Main Page | Modules | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

balser.cpp

00001 /*
00002    CABAL - CAched BAlanced LAN
00003    Test for balance and priorities
00004    
00005    Tests also pseudo connections on a single UDP socket
00006 
00007    (single thread version)
00008 
00009    $Id: balser.cpp,v 1.3 2004/03/29 14:47:55 jonnymind Exp $
00010 ---------------------------------------------
00011    Begin      :
00012    Author     : Giancarlo Niccolai
00013 
00014    Last modified because:
00015 
00016 */
00017 
00018 #ifdef _MSC_VER
00019    #pragma warning ( disable : 4786 )
00020 #endif
00021 
00022 
00023 #include <map>
00024 #include "balser.h"
00025 
00026 using namespace std;
00027 using namespace Cabal;
00028 
00029 
00030 static ClientHash clients;
00031 static unsigned long s_clientCount = 0;
00032 
00033 
00034 bool DflTransmit::transmit( Connection *con )
00035 {
00042    UDPSocket *skt = reinterpret_cast<UDPSocket *>(con->socket());
00043    PACKET pkt;
00044    unsigned long to_send;
00045    
00046    // pending resends?
00047    if( !m_resend.empty() ) 
00048    {
00049        to_send = * m_resend.begin();
00050        m_resend.pop_front();  
00051    }
00052    else if ( m_packetId < m_size ) {
00053       to_send = m_packetId;
00054       m_packetId ++;
00055    }
00056    else {
00057       // we are done: we've been called a last time to check for resends,
00058       // and we have none to do.
00059       cout << "Client " << m_userId <<"(" << m_dest.ip() <<":"<< m_dest.port() <<"): " <<
00060              "Transmission complete." << endl;
00061       
00062       // send a last goodbye transmission
00063       pkt.userId = m_userId;
00064       pkt.id = END_CONNECT; 
00065       pkt.timestamp = Net::timeOfDay(); 
00066       // for simplicity, we send all the packets of the same size.
00067       skt->sendTo( &pkt, sizeof( pkt ), m_dest );
00068    
00069       // remove client from active list.
00070       clients.erase( m_userId );
00071       return false;
00072    }
00073       
00074    pkt.userId = m_userId;
00075    pkt.id = to_send; 
00076    // in real app, we may have a global 
00077    // "tick" thread that provides all who needs with pre-cached
00078    // timestamp.
00079    pkt.timestamp = Net::timeOfDay(); 
00080    
00081    // just a data to be different from client to client.
00082    memset( pkt.data, static_cast<unsigned char>( to_send ), sizeof( pkt.data ) );
00083    
00084    // the real send operation; it (more or less) translates into a very fast inline.
00085    int iSent = skt->sendTo( &pkt, sizeof( pkt ), m_dest );
00086    
00087    if ( iSent > 0 ) 
00088    {
00089       con->account( iSent );
00090       
00091       // sent all packets?
00092       if ( m_packetId == m_size ) 
00093          // reschedule in timeout seconds to allow client to ask for last resends.
00094          con->schedule( 1500, m_prio );
00095       else
00096          // require fastest reschedulation possible; line and channel bandwidth will
00097          // limit us.
00098          con->schedule( 0, m_prio );
00099          
00100       return true;
00101    }
00102    else {
00103        // we had an error
00104       cout << "Client " << m_userId <<"(" << m_dest.ip() <<":"<< m_dest.port() <<"): " <<
00105            "lost on sendto error" << skt->osError() << ": "  << Net::errorDescription( skt->osError() ) << endl;
00106       return false;     
00107    }
00108 }
00109 
00110 
00111 
00112          
00113 bool DflReception::receive( Socket *skt )
00114 {
00115    //we know we are fed with an UDP socket; 
00116    //recv() will put the remote address in its remote() field
00117    char buffer[1500];
00118    PACKET *pkt;
00119    
00120    // read all the data in the datagram indipendently of the final casting
00121    int len = skt->recv( buffer, 1500 );
00122    
00123    if ( len == 0 ) {
00124       cout << "Reception terminating: Socket closed." << endl;
00125       return false;
00126    }
00127    else if ( len < 0 ) {
00128       cout << "Reception terminating: Socket had error " << skt->osError() << " (" << 
00129          Net::errorDescription( skt->osError() ) << ")" << endl;
00130       return false;
00131    }
00132    // not conforming with datagram we want to have ?
00133    else if ( len != CLIENT_REP_SIZE ) { 
00134       cout << "Reception warning: Spurious data on socket: "<< len << "bytes" <<endl;
00135       // discard, but don't give up.
00136       return true;
00137    }
00138    
00139    // let's see who's the sender.
00140    pkt = reinterpret_cast< PACKET *>( buffer );
00141    if ( pkt->userId == START_CONNECT ) // connection request!
00142    {
00143       s_clientCount++;
00144       if ( s_clientCount == 0xf0000000 ) 
00145          s_clientCount = 1;
00146          
00147       // prepare to contact-back our newcomer.
00148       DflTransmit *trs = new DflTransmit( s_clientCount, 60, skt->remote(), medium_prio );
00149       Connection *con = new Connection( skt, trs, this ); // last field optional, only for reference.
00150       Line *line = new Line( LINE_BANDWITH );
00151       line->set( "filetransfer", con );
00152       con->schedule( 10 ); // prepare to start fast.
00153       // save our client.
00154       clients[ s_clientCount ] = new Client( s_clientCount, line );
00155    }
00156    else {
00157       // a real application may use also address/port to ckeck for
00158       // client identity.
00159       ClientHash::iterator incoming = clients.find( pkt->userId );
00160       if ( incoming == clients.end() ) {
00161          cout << "Reception warning: Invalid client ID: "<< pkt->userId << endl;
00162          // discard, but don't give up.
00163          return true;
00164       }
00165       
00166       Client *remote = incoming->second;
00167       //Now we have our client; a real app would account ping stats using the echoed timestamp.
00168       CLI_REQ *request = reinterpret_cast< CLI_REQ *>( pkt->data ); 
00169       
00170       switch( request->type ) 
00171       {
00172          case REQ_TYPE_RESEND:
00173          {
00174             // filetransfer connection must be open.
00175             Connection *remcon = remote->m_comLine->get( "filetransfer" ); 
00176             // we know what kind of transmission are dealing with what kind of transfers.
00177             DflTransmit *dfltrans = reinterpret_cast< DflTransmit *>( remcon->transmission() );
00178             dfltrans->resend( request->value );
00179             // anticipates resend;
00180             remcon->schedule( 0, high_prio );
00181             // do nothing; we are just happy, and we signal the client is alive.
00182          }
00183          // no break;
00184          
00185          case REQ_TYPE_ACK:
00186             // do nothing; we are just happy, and we signal the client is alive.
00187             remote->m_lastHail = Net::timeOfDay();
00188             remote->m_packetId = pkt->id;
00189             remote->m_rtt = remote->m_lastHail < pkt->timestamp ? 
00190                   CABAL_DAY_LENGTH - pkt->timestamp + remote->m_lastHail: remote->m_lastHail - pkt->timestamp;
00191       }
00192    }
00193    
00194    // continuing operations.
00195    return true;
00196 }
00197 
00198 
00199 
00200 /*******************************************************************
00201 * Main procedure: create a server that accepts incoming connections
00202 * and produces a cacher for each new socket.
00203 * Cachers are destroyed as remote connections are closed.
00204 ********************************************************************/
00205 
00206 int main ( int argc, char *argv[] )
00207 {
00208    Net::init();
00209    int n_port;
00210    Channel *channel;
00211 
00212    cout << "Welcome from CABAL Balanced Server test." << endl;
00213    cout << "Channel Bandwith set to "<< CHANNEL_BANDWIDTH <<" bytes/sec." << endl;
00214    cout << "Client Bandwith set to "<< LINE_BANDWITH << " bytes/sec." << endl;
00215    cout << "Waiting loop set at 50ms." << endl;
00216    
00217    if ( argc > 1 ) {
00218       n_port = atoi( argv[1] );
00219    }
00220    else {
00221       n_port = DEFAULT_PORT;
00222    }
00223 
00224    // creates a channel for our server socket;
00225    channel = new Channel( CHANNEL_BANDWIDTH );
00226    
00227    try 
00228    {
00229       UDPSocket *usock = new UDPSocket( "0.0.0.0", n_port );
00230       cout << "Server waiting on port "<< n_port << endl;
00231       DflReception *rec = new DflReception();
00232       
00233       // assign the socket to a channel, with a given reception.
00234       usock->channel( channel, rec );
00235       // drop our reference for the socket.
00236       usock->decref();
00237       
00238       while( true )  // for now.
00239       {
00240          // sleep maximum 50ms
00241          bool res = channel->process( 50 );
00242          while( res ) res = channel->process( 0 );
00243          
00244          // now prune idle clients.
00245          ClientHash::iterator it = clients.begin();
00246          MSECS now = Net::timeOfDay();
00247          cout << "\r";
00248          while( it != clients.end() )
00249          {
00250             Client *cli = it->second;
00251             if( cli->m_lastHail < (now - 2000) && cli->m_lastHail > 2000 ) 
00252             {
00253                cout << cli->m_userId <<":K,";
00254                ClientHash::iterator old = it;
00255                it++;
00256                clients.erase( old );
00257                delete cli; // also deletes lines, connections and transmissions.
00258             }
00259             else {
00260                cout << cli->m_userId <<":" << cli->m_packetId <<  ":"<< cli->m_rtt << ",";
00261                it++;
00262             }
00263          }
00264          
00265          cout << "          " << flush;
00266       }
00267       
00268    }
00269    catch (Error e ) {
00270       cout << e <<"\n";
00271    }
00272    Net::exit();
00273 
00274    return 0;
00275 }

Generated on Sat Apr 10 17:41:48 2004 for Cabal by doxygen 1.3.5