Main Page | Modules | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

cabal_channel.cpp

00001 /*
00002    CABAL - CAched BAlanced LAN
00003 
00004    Channel class.
00005 
00006    $Id: cabal_channel.cpp,v 1.7 2004/04/05 21:54:18 jonnymind Exp $
00007 ---------------------------------------------
00008    Begin      : 2004-01-08 19:53 UTC+0100
00009    Author     : Giancarlo Niccolai
00010 
00011    Last modified because:
00012 
00013 */
00014 
// Silence MSVC warning C4786 (identifier truncated in the debug information),
// which older Microsoft compilers emit for long template-instantiation names.
// The compiler's predefined macro is _MSC_VER (single leading underscore);
// the previous test on __MSC_VER was never true, so the pragma never applied.
#ifdef _MSC_VER
   #pragma warning ( disable : 4786 )
#endif
00024 
00025 #include <cassert>
00026 #include <cabal_channel.h>
00027 #include <cabal_socket.h>
00028 #include <cabal_conn.h>
00029 #include <cabal_trans.h>
00030 #include <cabal_recpt.h>
00031 
00032 #include <iostream>
00033 using namespace std;
00034 
00035 namespace Cabal
00036 {
00037 
00038 void Channel::addSocket( Socket *skt )
00039 {
00040    skt->incref();
00041    m_sockets.push_back( skt );
00042 }
00043    
00044 void Channel::removeSocket( Socket *skt ) 
00045 {
00046    SocketList::iterator it = m_sockets.begin();
00047    while( it != m_sockets.end() ){
00048       if ( *it == skt ) {
00049          m_sockets.erase( it );
00050          skt->decref();
00051          return;
00052       }
00053       it++;
00054    }
00055    //oops, it wasent ours
00056 }
00057 
00058 void Channel::schedule( Connection *con, const Priority prio )
00059 {
00060    ConList *shedList = &m_cons[static_cast<int>(prio)];
00061    
00062    ConList::reverse_iterator rit = shedList->rbegin();
00063    MSECS conSched = con->nextSchedule();
00064    
00065    while( rit != shedList->rend() )
00066    {
00067       if ( (*rit)->nextSchedule() <= conSched ) 
00068       {
00069          shedList->insert( rit.base(), con );
00070          break;
00071       }
00072       rit ++;
00073    }
00074    
00075    // it's the most urgent! reprioritize ourself.
00076    if ( rit == shedList->rend() ) {
00077       shedList->insert( shedList->begin(), con );
00078       m_nextSchedule = conSched;
00079    }
00080 }
00081    
00082 void Channel::deschedule( const Connection *con, const Priority prio, const bool complete )
00083 {
00084    // a descheduled connection is more likely near the bottom.
00085    ConList *lst = &m_cons[static_cast<int>(prio)];
00086    ConList::iterator it = lst->begin();
00087    
00088    while( it != lst->end() )
00089    {
00090       if ( *it == con ) {
00091          it = lst->erase( it );
00092          if ( ! complete )
00093             return;  
00094       }
00095       else
00096          it++;
00097    }
00098 }
00099    
00100 
/**
 * Runs one processing slice of the channel.
 *
 * The function works in up to two select() phases:
 *
 *  1. Read-only wait. When the computed readTime is positive, the sockets
 *     that have a reception are select()ed for reading for at most readTime
 *     milliseconds and every ready socket is handed to its reception. If
 *     the slice is used up by this phase the function returns.
 *
 *  2. Read/write. Readers are subscribed again, together with the sockets of
 *     the due connections found in the first priority queue whose head is
 *     due. After select(), ready readers are served first, the receiver list
 *     is rotated through recptRoundRobin(), and then each ready writer
 *     transmits once and is removed from its queue.
 *
 * Closed sockets, sockets reporting an OS error and sockets whose reception
 * returns false are dropped from m_sockets (and decref'ed) along the way.
 * m_nextSchedule is recomputed while the priority queues are scanned.
 *
 * @param msecs Length of the time slice in milliseconds. 0 polls without
 *              waiting; a negative value makes the second select() wait
 *              without a timeout (unless there is nothing to watch, in
 *              which case the function returns at once).
 * @return true when at least one descriptor was reported ready and handled;
 *         false on timeout, on select() failure or when there was nothing
 *         to wait for.
 */
00101 bool Channel::process( MSECS msecs )
00102 {
00103    RawSocket maxfd;
00104    fd_set wr_set, rd_set;
00105    fd_set *wr_set_point, *rd_set_point;
00106    struct timeval tv, *wait_tv;
00107    ConList *clist_used;
00108    SocketList::iterator sit;
00109    
00110    // temporal references.
00111    MSECS curTime = Net::timeOfDay();
00112    MSECS spanTime = curTime + msecs;
00113    MSECS cleanTime;
00114    
         // cleanTime is the last send time plus the time the last sent size
         // takes at m_bandwidth; -1 when no bandwidth is configured.
         // NOTE(review): presumably m_bandwidth is in size units per second,
         // given the * 1000 -- confirm.
00115    if ( m_bandwidth > 0 ) {
00116       cleanTime = m_lastSentTime + (m_lastSentSize * 1000 / m_bandwidth);
00117    }
00118    else 
00119       cleanTime = -1;
00120       
00121    // get the maximum time that writers must wait.
00122    
00123    MSECS readTime;
00124 
00125    // have we a schedule?
00126    if ( m_nextSchedule > 0 ) 
00127    {
00128       // is the first next schedule in this time slice? 
00129       // Have we enough bandwidth for this time slice ?
00130       if ( m_nextSchedule < spanTime && ( cleanTime < m_nextSchedule || cleanTime < curTime ) ) {
00131          // then, wait for the highest time between next schedule and clean (bandwidth) time.
00132          readTime = ( m_nextSchedule > cleanTime ? m_nextSchedule : cleanTime ) - curTime;
00133          // readTime <= 0 means immediately go to read/write phase.
00134          /*MSECS minWait = minimalWait(); // else, temper with minimal wait.
00135          if ( readTime < minWait ) {
00136             readTime = msecs < minWait ? msecs : minWait;
00137          }*/
00138       }
00139       else
00140          // use all the timeslice
00141          readTime = msecs == 0 ? 1: msecs; // set a fake read idle wait
00142    }
00143    else {
00144       // dedicates all the time for reading.
00145       readTime = msecs;
00146    }
00147    
         // ---- Phase 1: read-only wait (skipped when readTime <= 0) ----
00148    if ( readTime > 0 )
00149    {           
00150       bool readDone = false;      
00151       // reads all subscribed sockets.
            // Never wait longer than the caller's slice; with msecs == 0 this
            // also brings the fake 1 ms wait set above back to zero.
00152       if ( readTime > msecs ) readTime = msecs;
00153       FD_ZERO( &rd_set );
00154       maxfd = 0;
00155       sit = m_sockets.begin();
00156       rd_set_point = 0;
00157       while( sit != m_sockets.end() )
00158       {
00159          Socket *skt = (*sit);
00160          // remove closed sockets in the meanwhile
00161          if ( skt->closed() ) {
00162             sit = m_sockets.erase( sit );
00163             skt->decref();
00164             continue;
00165          }
00166          
00167          if ( skt->reception() ) 
00168          {
00169             RawSocket fd = skt->rawSocket();
00170             FD_SET( fd, &rd_set );
00171             rd_set_point = &rd_set;
00172             if ( maxfd < fd ) maxfd = fd;
00173          }
00174          sit++;
00175       }
00176       
00177       // WAIT EVEN if there are no things to wait for, unless timeout is zero.
00178       if ( rd_set_point != 0 || msecs > 0 )
00179       {      
00180    
00181          // this time is completely dedicated to reading; so start reading.
00182          // Wait for the read time.
00183          tv.tv_sec = readTime/1000;
00184          tv.tv_usec = (readTime % 1000) * 1000;
00185          
00186          int result;
00187          Net::resetError();
00188          result = select( maxfd + 1, rd_set_point, 0, 0, &tv );
00189          
00190          // now, parse incoming traffic, if present.
00191          if ( result > 0 ) 
00192          {
00193             sit = m_sockets.begin();
00194             readDone = true;
00195             
                  // result counts the ready descriptors still to be served,
                  // so the walk stops as soon as all of them were handled.
00196             while( result > 0 && sit != m_sockets.end() )
00197             {
00198                Socket *skt = (*sit);
00199                if ( skt->reception() != 0 && FD_ISSET( skt->rawSocket(), rd_set_point ) )
00200                {
00201                   bool cont = skt->reception()->receive( skt );
00202                   result--;
00203                   // in case of error on the socket, remove it.
00204                   if ( skt->osError() != 0 || skt->closed() || ! cont )
00205                   {
00206                      sit = m_sockets.erase( sit );
00207                      skt->decref();
00208                      // important; if you don't do this, it's possible that all
00209                      // the receivers are just killed.
00210                      Net::resetError();
00211                      continue;
00212                   }
00213                }
00214                sit ++;
00215             }
00216             curTime = Net::timeOfDay();
00217          
00218             // Too late? Next time will be better; and if we have received early reads,
00219             // return if the next schedule was not available in current timeslice (msecs)      
00220             if ( curTime > spanTime ) { 
00221                //TRICK: round-robinize the receiver list.
00222                recptRoundRobin();
00223                return readDone; // we did something?
00224             }
00225          }
00226          else if ( msecs == readTime ) // all timeslice elapsed in waiting for nothing; return.
00227             return false;
00228       }
00229       else if( msecs == readTime )
00230          return false;
00231    }
00232    
         // ---- Phase 2: combined read/write select ----
00233    // ok, now it's time for read/writing.
00234    // if we are here, part (or all) the schedule is dedicated to writing
00235    // Again (even if already done), resubscribe for reading.
00236    // Prepare sockets with readers to subscribe the select
00237    
00238    FD_ZERO( &rd_set );
00239    maxfd = 0;
00240    sit = m_sockets.begin();
00241    rd_set_point = 0;
00242    while( sit != m_sockets.end() )
00243    {
00244       Socket *skt = (*sit);
00245       // remove closed sockets in the meanwhile
00246       if ( skt->closed() ) {
00247          sit = m_sockets.erase( sit );
00248          skt->decref();
00249          continue;
00250       }
00251       
00252       if ( skt->reception() ) 
00253       {
00254          RawSocket fd = skt->rawSocket();
00255          FD_SET( fd, &rd_set );
00256          // uses as a flag; it's a 32bit copy, as a bool...
00257          rd_set_point = &rd_set;
00258          if ( maxfd < fd ) maxfd = fd;
00259       }
00260       sit++;
00261    }
00262       
00263    // prepare the connection willing to send to enter the select for writing.
00264    
00265    FD_ZERO( &wr_set );   
00266    // we know we must write, or we would have dropped the function before.
00267    //... unless some writer scheduled to write has been removed.
00268    wr_set_point = 0;
00269       
00270    // Get the first non empty priority queue
00271    for( int queueId = 0; queueId < CABAL_PRIORITIES ; queueId++ )
00272    {
00273       if ( ! m_cons[ queueId ].empty() ) 
00274       {
00275          ConList::iterator it = m_cons[queueId].begin();
00276          // if the object is scheduled to go, put it in the select set.
00277          MSECS sched = (*it)->nextSchedule();
00278                      
00279          if ( sched > 0 && sched <= curTime ) 
00280          {  
00281             // get at least one.
                  // NOTE(review): the loop condition below only tests
                  // nextSchedule() <= curTime; unlike the head test above it
                  // does not require the schedule to be > 0 -- confirm.
00282             do {
00283                // ignore closed sockets in the meanwhile
00284                Socket *skt = (*it)->socket();
00285                if ( ! skt->closed() ) {
00286                   RawSocket fd = skt->rawSocket();
00287                   //FD_SETting a socket more than one time has no effect... so we do it!
00288                   FD_SET( fd, &wr_set );
00289                   wr_set_point = &wr_set;  // using the pointer as a flag.
00290                   if ( maxfd < fd ) maxfd = fd;
00291                }
00292                it++;
00293             } while ( it != m_cons[ queueId ].end() && (*it)->nextSchedule() <= curTime );
00294             
00295             // get only the first queue that has scheduled things
00296             clist_used = & m_cons[ queueId ];
00297             
00298             // reprioritize next schedule
                  // 'it' now designates the first entry of this queue that is
                  // not due yet, if any; otherwise fall back to the head of
                  // the next non-empty lower-priority queue, or -1 if none.
00299             if ( it != clist_used->end() ) 
00300             {
00301                m_nextSchedule = (*it)->nextSchedule();
00302             }
00303             else {
00304                m_nextSchedule = -1;
00305                for ( int qid = queueId+1; qid < CABAL_PRIORITIES; qid ++ ) 
00306                {
00307                   if ( ! m_cons[ qid ].empty() ) 
00308                   {
00309                      m_nextSchedule = m_cons[ qid ].front()->nextSchedule();
00310                      break;
00311                   }
00312                }
00313             }
00314                      
00315             break;  
00316          }
00317          // else, adjust the future schedule.
00318          else if ( m_nextSchedule > sched || m_nextSchedule == -1 )
00319             m_nextSchedule = sched;
00320       }
00321    }
00322    
00323    
00324    if( msecs >= 0 ) 
00325    {
00326       // how much span time remained?   
00327       spanTime -= curTime; // curtime is updated by the reader only loop, if it happens..
00328       if ( spanTime < 0 ) spanTime = 0;
00329 
00330       tv.tv_sec = spanTime/1000;
00331       tv.tv_usec = (spanTime % 1000) * 1000;
00332       wait_tv = &tv;
00333    }
00334    else
00335    {
00336       // prepare for infinite wait
00337       wait_tv = 0;
00338    }
00339 
00340    
00341    // manage a special case. If we don't have to wait and don't have any set,
00342    // return immediately
         // Note that a null wait_tv (negative msecs) with nothing to watch
         // also returns here instead of blocking in select().
00343    if ( (wait_tv == 0 || ( wait_tv != 0 && wait_tv->tv_sec == 0 && wait_tv->tv_usec == 0 ) )
00344          && rd_set_point == 0 && wr_set_point == 0 )
00345    {
00346       return false;
00347    }
00348 
00349    int result;
00350    Net::resetError();
00351    result = select( maxfd + 1, rd_set_point, wr_set_point, 0, wait_tv );
00352 
00353    // if there aren't news, return immediately
00354    //TODO: error management;
         // A negative result (select() failure) is currently handled exactly
         // like a timeout.
00355    if ( result <= 0 ) 
00356       return false;
00357       
00358    //parse first the receivers.
00359    if ( rd_set_point ) 
00360    {
00361       sit = m_sockets.begin();
00362       
00363       while( result > 0 && sit != m_sockets.end() )
00364       {
00365          Socket *skt = (*sit);
00366          if ( skt->reception() != 0 && FD_ISSET( skt->rawSocket(), rd_set_point ) )
00367          {
00368             bool cont = skt->reception()->receive( skt );
00369             result--;
00370             // in case of error on the socket, remove it.
00371             if ( skt->osError() != 0 || skt->closed() || ! cont )
00372             {
00373                sit = m_sockets.erase( sit );
00374                skt->decref();
00375                Net::resetError();
00376                continue;
00377             }
00378          }
00379          sit ++;
00380       }
00381    }
00382    
00383    //TRICK: round-robinize the receiver list.
00384    recptRoundRobin();
00385    
00386    // now for writing.
         // clist_used is assigned only in the queue scan above, in the same
         // branch that can set wr_set_point, so it is valid whenever
         // wr_set_point is non-null.
00387    if ( wr_set_point && result )
00388    {
00389      // reset also schedule, to allow other writers to set their own.
00390      
00391      ConList::iterator clit = clist_used->begin();
00392       
00393       while( result > 0 && clit != clist_used->end() )
00394       {
00395          Connection *con = (*clit);
00396          RawSocket fd = con->socket()->rawSocket();
00397          
00398          if ( FD_ISSET( fd, wr_set_point ) )
00399          {
00400             bool cont = con->transmission()->transmit( con );
00401             result--;
00402             // always remove the transmission from scheduled list.
00403             clit = clist_used->erase( clit );
00404             // and always reset the socket status: only one write per socket.
00405             FD_CLR( fd, wr_set_point );
00406             
00407             // requested to terminate?
00408             if ( ! cont )
00409                // a funny grammar, but it's right.
00410                con->line()->remove( con ); 
00411                
00412             Net::resetError();
00413             // do not erase in case of error on socket; this is an application side
00414             // business we are not interested in. If the transmission wants to
00415             // be removed it just return false, the rest it's its business.
00416          }
00417          else
00418             clit++;
00419       }
00420    }
00421    
00422    // signal that we had some processing
00423    return true;
00424 }
00425 
00426 } // namespace
00427 
00428 /* End of cabal_channel.cpp */

Generated on Sat Apr 10 17:41:48 2004 for Cabal by doxygen 1.3.5