00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
// MSVC predefines _MSC_VER (single leading underscore); the previous test on
// __MSC_VER was never true, so the pragma below was never applied.
// C4786: "identifier was truncated in the debug information", emitted by
// older MSVC versions for long template instantiations.
#ifdef _MSC_VER
#pragma warning ( disable : 4786 )
#endif
00024
00025 #include <cassert>
00026 #include <cabal_channel.h>
00027 #include <cabal_socket.h>
00028 #include <cabal_conn.h>
00029 #include <cabal_trans.h>
00030 #include <cabal_recpt.h>
00031
00032 #include <iostream>
00033 using namespace std;
00034
00035 namespace Cabal
00036 {
00037
00038 void Channel::addSocket( Socket *skt )
00039 {
00040 skt->incref();
00041 m_sockets.push_back( skt );
00042 }
00043
00044 void Channel::removeSocket( Socket *skt )
00045 {
00046 SocketList::iterator it = m_sockets.begin();
00047 while( it != m_sockets.end() ){
00048 if ( *it == skt ) {
00049 m_sockets.erase( it );
00050 skt->decref();
00051 return;
00052 }
00053 it++;
00054 }
00055
00056 }
00057
00058 void Channel::schedule( Connection *con, const Priority prio )
00059 {
00060 ConList *shedList = &m_cons[static_cast<int>(prio)];
00061
00062 ConList::reverse_iterator rit = shedList->rbegin();
00063 MSECS conSched = con->nextSchedule();
00064
00065 while( rit != shedList->rend() )
00066 {
00067 if ( (*rit)->nextSchedule() <= conSched )
00068 {
00069 shedList->insert( rit.base(), con );
00070 break;
00071 }
00072 rit ++;
00073 }
00074
00075
00076 if ( rit == shedList->rend() ) {
00077 shedList->insert( shedList->begin(), con );
00078 m_nextSchedule = conSched;
00079 }
00080 }
00081
00082 void Channel::deschedule( const Connection *con, const Priority prio, const bool complete )
00083 {
00084
00085 ConList *lst = &m_cons[static_cast<int>(prio)];
00086 ConList::iterator it = lst->begin();
00087
00088 while( it != lst->end() )
00089 {
00090 if ( *it == con ) {
00091 it = lst->erase( it );
00092 if ( ! complete )
00093 return;
00094 }
00095 else
00096 it++;
00097 }
00098 }
00099
00100
00101 bool Channel::process( MSECS msecs )
00102 {
00103 RawSocket maxfd;
00104 fd_set wr_set, rd_set;
00105 fd_set *wr_set_point, *rd_set_point;
00106 struct timeval tv, *wait_tv;
00107 ConList *clist_used;
00108 SocketList::iterator sit;
00109
00110
00111 MSECS curTime = Net::timeOfDay();
00112 MSECS spanTime = curTime + msecs;
00113 MSECS cleanTime;
00114
00115 if ( m_bandwidth > 0 ) {
00116 cleanTime = m_lastSentTime + (m_lastSentSize * 1000 / m_bandwidth);
00117 }
00118 else
00119 cleanTime = -1;
00120
00121
00122
00123 MSECS readTime;
00124
00125
00126 if ( m_nextSchedule > 0 )
00127 {
00128
00129
00130 if ( m_nextSchedule < spanTime && ( cleanTime < m_nextSchedule || cleanTime < curTime ) ) {
00131
00132 readTime = ( m_nextSchedule > cleanTime ? m_nextSchedule : cleanTime ) - curTime;
00133
00134
00135
00136
00137
00138 }
00139 else
00140
00141 readTime = msecs == 0 ? 1: msecs;
00142 }
00143 else {
00144
00145 readTime = msecs;
00146 }
00147
00148 if ( readTime > 0 )
00149 {
00150 bool readDone = false;
00151
00152 if ( readTime > msecs ) readTime = msecs;
00153 FD_ZERO( &rd_set );
00154 maxfd = 0;
00155 sit = m_sockets.begin();
00156 rd_set_point = 0;
00157 while( sit != m_sockets.end() )
00158 {
00159 Socket *skt = (*sit);
00160
00161 if ( skt->closed() ) {
00162 sit = m_sockets.erase( sit );
00163 skt->decref();
00164 continue;
00165 }
00166
00167 if ( skt->reception() )
00168 {
00169 RawSocket fd = skt->rawSocket();
00170 FD_SET( fd, &rd_set );
00171 rd_set_point = &rd_set;
00172 if ( maxfd < fd ) maxfd = fd;
00173 }
00174 sit++;
00175 }
00176
00177
00178 if ( rd_set_point != 0 || msecs > 0 )
00179 {
00180
00181
00182
00183 tv.tv_sec = readTime/1000;
00184 tv.tv_usec = (readTime % 1000) * 1000;
00185
00186 int result;
00187 Net::resetError();
00188 result = select( maxfd + 1, rd_set_point, 0, 0, &tv );
00189
00190
00191 if ( result > 0 )
00192 {
00193 sit = m_sockets.begin();
00194 readDone = true;
00195
00196 while( result > 0 && sit != m_sockets.end() )
00197 {
00198 Socket *skt = (*sit);
00199 if ( skt->reception() != 0 && FD_ISSET( skt->rawSocket(), rd_set_point ) )
00200 {
00201 bool cont = skt->reception()->receive( skt );
00202 result--;
00203
00204 if ( skt->osError() != 0 || skt->closed() || ! cont )
00205 {
00206 sit = m_sockets.erase( sit );
00207 skt->decref();
00208
00209
00210 Net::resetError();
00211 continue;
00212 }
00213 }
00214 sit ++;
00215 }
00216 curTime = Net::timeOfDay();
00217
00218
00219
00220 if ( curTime > spanTime ) {
00221
00222 recptRoundRobin();
00223 return readDone;
00224 }
00225 }
00226 else if ( msecs == readTime )
00227 return false;
00228 }
00229 else if( msecs == readTime )
00230 return false;
00231 }
00232
00233
00234
00235
00236
00237
00238 FD_ZERO( &rd_set );
00239 maxfd = 0;
00240 sit = m_sockets.begin();
00241 rd_set_point = 0;
00242 while( sit != m_sockets.end() )
00243 {
00244 Socket *skt = (*sit);
00245
00246 if ( skt->closed() ) {
00247 sit = m_sockets.erase( sit );
00248 skt->decref();
00249 continue;
00250 }
00251
00252 if ( skt->reception() )
00253 {
00254 RawSocket fd = skt->rawSocket();
00255 FD_SET( fd, &rd_set );
00256
00257 rd_set_point = &rd_set;
00258 if ( maxfd < fd ) maxfd = fd;
00259 }
00260 sit++;
00261 }
00262
00263
00264
00265 FD_ZERO( &wr_set );
00266
00267
00268 wr_set_point = 0;
00269
00270
00271 for( int queueId = 0; queueId < CABAL_PRIORITIES ; queueId++ )
00272 {
00273 if ( ! m_cons[ queueId ].empty() )
00274 {
00275 ConList::iterator it = m_cons[queueId].begin();
00276
00277 MSECS sched = (*it)->nextSchedule();
00278
00279 if ( sched > 0 && sched <= curTime )
00280 {
00281
00282 do {
00283
00284 Socket *skt = (*it)->socket();
00285 if ( ! skt->closed() ) {
00286 RawSocket fd = skt->rawSocket();
00287
00288 FD_SET( fd, &wr_set );
00289 wr_set_point = &wr_set;
00290 if ( maxfd < fd ) maxfd = fd;
00291 }
00292 it++;
00293 } while ( it != m_cons[ queueId ].end() && (*it)->nextSchedule() <= curTime );
00294
00295
00296 clist_used = & m_cons[ queueId ];
00297
00298
00299 if ( it != clist_used->end() )
00300 {
00301 m_nextSchedule = (*it)->nextSchedule();
00302 }
00303 else {
00304 m_nextSchedule = -1;
00305 for ( int qid = queueId+1; qid < CABAL_PRIORITIES; qid ++ )
00306 {
00307 if ( ! m_cons[ qid ].empty() )
00308 {
00309 m_nextSchedule = m_cons[ qid ].front()->nextSchedule();
00310 break;
00311 }
00312 }
00313 }
00314
00315 break;
00316 }
00317
00318 else if ( m_nextSchedule > sched || m_nextSchedule == -1 )
00319 m_nextSchedule = sched;
00320 }
00321 }
00322
00323
00324 if( msecs >= 0 )
00325 {
00326
00327 spanTime -= curTime;
00328 if ( spanTime < 0 ) spanTime = 0;
00329
00330 tv.tv_sec = spanTime/1000;
00331 tv.tv_usec = (spanTime % 1000) * 1000;
00332 wait_tv = &tv;
00333 }
00334 else
00335 {
00336
00337 wait_tv = 0;
00338 }
00339
00340
00341
00342
00343 if ( (wait_tv == 0 || ( wait_tv != 0 && wait_tv->tv_sec == 0 && wait_tv->tv_usec == 0 ) )
00344 && rd_set_point == 0 && wr_set_point == 0 )
00345 {
00346 return false;
00347 }
00348
00349 int result;
00350 Net::resetError();
00351 result = select( maxfd + 1, rd_set_point, wr_set_point, 0, wait_tv );
00352
00353
00354
00355 if ( result <= 0 )
00356 return false;
00357
00358
00359 if ( rd_set_point )
00360 {
00361 sit = m_sockets.begin();
00362
00363 while( result > 0 && sit != m_sockets.end() )
00364 {
00365 Socket *skt = (*sit);
00366 if ( skt->reception() != 0 && FD_ISSET( skt->rawSocket(), rd_set_point ) )
00367 {
00368 bool cont = skt->reception()->receive( skt );
00369 result--;
00370
00371 if ( skt->osError() != 0 || skt->closed() || ! cont )
00372 {
00373 sit = m_sockets.erase( sit );
00374 skt->decref();
00375 Net::resetError();
00376 continue;
00377 }
00378 }
00379 sit ++;
00380 }
00381 }
00382
00383
00384 recptRoundRobin();
00385
00386
00387 if ( wr_set_point && result )
00388 {
00389
00390
00391 ConList::iterator clit = clist_used->begin();
00392
00393 while( result > 0 && clit != clist_used->end() )
00394 {
00395 Connection *con = (*clit);
00396 RawSocket fd = con->socket()->rawSocket();
00397
00398 if ( FD_ISSET( fd, wr_set_point ) )
00399 {
00400 bool cont = con->transmission()->transmit( con );
00401 result--;
00402
00403 clit = clist_used->erase( clit );
00404
00405 FD_CLR( fd, wr_set_point );
00406
00407
00408 if ( ! cont )
00409
00410 con->line()->remove( con );
00411
00412 Net::resetError();
00413
00414
00415
00416 }
00417 else
00418 clit++;
00419 }
00420 }
00421
00422
00423 return true;
00424 }
00425
00426 }
00427
00428