38 subscriptionTimer =
new ScribeTimer(
"Subscription timer");
49 numSubscriptionRefresh = 0;
50 subscriptionRefreshBytes = 0;
56 cancelAndDelete(subscriptionTimer);
62 if( stage != (numInitStages()-1))
73 WATCH(heartbeatBytes);
74 WATCH(numSubscriptionRefresh);
75 WATCH(subscriptionRefreshBytes);
76 WATCH(numChildTimeout);
77 WATCH(numParentTimeout);
79 childTimeout = par(
"childTimeout");
80 parentTimeout = par(
"parentTimeout");
86 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
89 globalStatistics->addStdDev(
"Scribe: Received JOIN Messages/s",
91 globalStatistics->addStdDev(
"Scribe: Forwarded Multicast Messages/s",
93 globalStatistics->addStdDev(
"Scribe: Forwarded Multicast Bytes/s",
95 globalStatistics->addStdDev(
"Scribe: Received Multicast Messages/s (subscribed groups only)",
97 globalStatistics->addStdDev(
"Scribe: Received Multicast Bytes/s (subscribed groups only)",
98 receivedBytes / time);
99 globalStatistics->addStdDev(
"Scribe: Send Heartbeat Messages/s",
100 numHeartbeat / time);
101 globalStatistics->addStdDev(
"Scribe: Send Heartbeat Bytes/s",
102 heartbeatBytes / time);
103 globalStatistics->addStdDev(
"Scribe: Send Subscription Refresh Messages/s",
104 numSubscriptionRefresh / time);
105 globalStatistics->addStdDev(
"Scribe: Send Subscription Refresh Bytes/s",
106 subscriptionRefreshBytes / time);
107 globalStatistics->addStdDev(
"Scribe: Number of Child Timeouts/s",
108 numChildTimeout / time);
109 globalStatistics->addStdDev(
"Scribe: Number of Parent Timeouts/s",
110 numParentTimeout / time);
117 if( joinMsg == NULL ) {
122 if( joinMsg->
getSrcNode() == overlay->getThisNode() )
return;
124 handleJoinMessage( joinMsg,
false );
132 for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ){
134 if( !it->second.getParent().isUnspecified()
135 && it->second.getParent() == overlay->getThisNode() ) {
138 if( comp.compare(node.
getKey(), overlay->getThisNode().getKey()) < 0){
159 cPolymorphic* context,
int rpcId,
164 handleJoinResponse( _ScribeJoinResponse );
165 EV <<
"[Scribe::handleRpcResponse() @ " << overlay->getThisNode().getIp()
166 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
167 <<
" Received a ScribeJoin RPC Response: id=" << rpcId <<
"\n"
168 <<
" msg=" << *_ScribeJoinResponse <<
" rtt=" << rtt
173 handlePublishResponse( _ScribePublishResponse );
181 dynamic_cast<ScribeSubscriptionRefreshMessage*>(msg) ){
183 refreshChildTimer( refreshMsg->getSrc(), refreshMsg->getGroupId() );
186 deliverALMDataToGroup( data );
188 handleLeaveMessage( leaveMsg );
195 subscribeToGroup( subscribeMsg->getGroupId() );
197 }
else if(
ALMLeaveMessage* leaveMsg = dynamic_cast<ALMLeaveMessage*>(msg)){
198 leaveGroup( leaveMsg->getGroupId() );
201 deliverALMDataToRoot( mcastMsg );
202 }
else if(
ALMAnycastMessage* acastMsg = dynamic_cast<ALMAnycastMessage*>(msg) ){
204 EV <<
"[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
205 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
206 <<
" Anycast message for group " << acastMsg->getGroupId() <<
"\n"
207 <<
" ignored: Not implemented yet!"
210 }
else if(
ALMCreateMessage* createMsg = dynamic_cast<ALMCreateMessage*>(msg) ){
211 EV <<
"[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
212 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
213 <<
" Create message for group " << createMsg->getGroupId() <<
"\n"
214 <<
" ignored: Scribe has implicit create on SUBSCRIBE"
217 }
else if(
ALMDeleteMessage* deleteMsg = dynamic_cast<ALMDeleteMessage*>(msg) ){
218 EV <<
"[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
219 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
220 <<
" Delete message for group " << deleteMsg->getGroupId() <<
"\n"
221 <<
" ignored: Scribe has implicit delete on LEAVE"
235 readyMsg->
setComp( getThisCompType() );
237 send( readyMsg,
"to_upperTier" );
239 startTimer( subscriptionTimer );
251 for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ) {
255 refreshMsg->
setGroupId( it->second.getGroupId() );
256 refreshMsg->
setSrc( overlay->getThisNode() );
260 subscriptionRefreshBytes += refreshMsg->getByteLength()
265 startTimer( subscriptionTimer );
271 GroupList::iterator groupIt = groupList.find( timer->
getGroup() );
272 if( groupIt == groupList.end() ) {
277 for( set<NodeHandle>::iterator it = groupIt->second.getChildrenBegin();
278 it != groupIt->second.getChildrenEnd(); ++it ) {
284 RECORD_STATS(++numHeartbeat; heartbeatBytes += heartbeatMsg->getByteLength());
293 removeChildFromGroup( timer );
300 EV <<
"[Scribe::handleTimerEvent() @ " << overlay->getThisNode().getIp()
301 <<
" (" << overlay->getThisNode().getKey().
toString(16) <<
")]\n"
302 <<
" Parent of group " << key <<
"\n"
303 <<
" failed to send heartbeat, trying to rejoin"
311 GroupList::iterator groupIt = groupList.find( timer->
getGroup() );
312 if( groupIt == groupList.end() ) {
317 groupIt->second.setParentTimer( NULL );
318 cancelAndDelete( timer );
326 handleJoinMessage( joinMsg,
true );
334 EV <<
"[Scribe::handleJoinMessage() @ " << overlay->getThisNode().getIp()
335 <<
" (" << overlay->getThisNode().getKey().
toString(16) <<
")]\n"
336 <<
" Received a ScribeJoin for group " << key <<
"\n"
337 <<
" msg=" << joinMsg
341 pair<GroupList::iterator, bool> groupInserter;
342 groupInserter = groupList.insert( make_pair(key,
ScribeGroup(key)) );
345 if ( !amIRoot && ( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) ) {
353 if( groupInserter.first->second.numChildren() == 0 ) {
356 heartbeat->
setGroup( groupInserter.first->second.getGroupId() );
357 startTimer( heartbeat );
358 if(
ScribeTimer* t = groupInserter.first->second.getHeartbeatTimer() ){
360 if( t ) cancelAndDelete( t );
362 groupInserter.first->second.setHeartbeatTimer( heartbeat );
366 addChildToGroup( joinMsg->
getSrcNode(), groupInserter.first->second );
372 sendRpcResponse( joinMsg, joinResponse );
378 GroupList::iterator it = groupList.find( publishCall->
getGroupId() );
379 if( it == groupList.end() ||
380 it->second.getParent().isUnspecified() ||
381 it->second.getParent() != overlay->getThisNode() ){
388 sendRpcResponse( publishCall, msg );
395 sendRpcResponse( publishCall, msg );
399 EV <<
"[Scribe::handlePublishCall() @ " << overlay->getThisNode().getIp()
400 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
401 <<
" PublishCall for group " << msg->
getGroupId()
402 <<
" does not contain a calid ALM data message!\n"
406 deliverALMDataToGroup( data );
412 GroupList::iterator it = groupList.find( joinResponse->
getGroupId() );
413 if( it == groupList.end() ) {
414 EV <<
"[Scribe::handleJoinResponse() @ " << overlay->getThisNode().getIp()
415 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
416 <<
"Getting join response for an unknown group!\n";
419 it->second.setParent( joinResponse->
getSrcNode() );
424 parentTimeout->
setGroup( it->second.getGroupId() );
425 startTimer( parentTimeout );
426 if(
ScribeTimer* t = it->second.getParentTimer() ){
428 if( t ) cancelAndDelete( t );
430 it->second.setParentTimer( parentTimeout );
435 GroupList::iterator it = groupList.find( publishResponse->
getGroupId() );
436 if( it == groupList.end() ) {
437 EV <<
"[Scribe::handlePublishResponse() @ " << overlay->getThisNode().getIp()
438 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
439 <<
"Getting publish response for an unknown group!\n";
446 it->second.setRendezvousPoint( publishResponse->
getSrcNode() );
452 GroupList::iterator it = groupList.find( leaveMsg->
getGroupId() );
453 if( it != groupList.end() ){
454 removeChildFromGroup( leaveMsg->
getSrc(), it->second );
461 EV <<
"[Scribe::subscribeToGroup() @ " << overlay->getThisNode().getIp()
462 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
463 <<
" Trying to join group " << groupId <<
"\n";
466 pair<GroupList::iterator, bool> groupInserter;
467 groupInserter = groupList.insert( make_pair(groupId,
ScribeGroup(groupId)) );
470 groupInserter.first->second.setSubscription(
true);
473 if( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) {
483 GroupList::iterator it = groupList.find( group );
484 if( it != groupList.end() ){
485 it->second.setSubscription(
false );
486 checkGroupEmpty( it->second );
492 if( child == overlay->getThisNode() ) {
498 pair<set<NodeHandle>::iterator,
bool> inserter =
501 if( inserter.second ) {
507 timeoutMsg->
setChild( *inserter.first );
510 startTimer( timeoutMsg );
511 childTimeoutList.insert( make_pair(child, timeoutMsg) );
519 pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
520 childTimeoutList.equal_range( child );
521 if( ret.first != childTimeoutList.end() ){
522 for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
523 if( group == it->second->getGroup() ) {
525 childTimeoutList.erase( it );
526 cancelAndDelete( timer );
535 checkGroupEmpty( group );
542 GroupList::iterator groupIt = groupList.find( timer->
getGroup() );
543 if( groupIt != groupList.end() ) {
548 checkGroupEmpty( group );
552 pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
553 childTimeoutList.equal_range( child );
554 if( ret.first != childTimeoutList.end() ) {
555 for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
556 if( it->second == timer ) {
557 childTimeoutList.erase( it );
558 cancelAndDelete( timer );
573 msg->
setSrc( overlay->getThisNode() );
587 pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
588 childTimeoutList.equal_range( child );
590 if( ret.first == childTimeoutList.end() )
return;
593 for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
594 if( it->first == child && it->second->getGroup() == groupId ) {
595 startTimer( it->second );
602 if( timer->isScheduled() ) {
603 cancelEvent( timer );
609 duration = parentTimeout/2;
612 duration = childTimeout/2;
615 duration = parentTimeout;
618 duration = childTimeout;
621 scheduleAt(simTime() + duration, timer );
627 pair<GroupList::iterator, bool> groupInserter;
631 if( groupInserter.second ) {
632 groupInserter.first->second.setAmISource(
true );
644 dataMsg->encapsulate( mcastMsg->decapsulate() );
650 msg->encapsulate( dataMsg );
652 if( !groupInserter.first->second.getRendezvousPoint().isUnspecified() ) {
654 sendRouteRpcCall(
TIER1_COMP, groupInserter.first->second.getRendezvousPoint(), msg);
667 GroupList::iterator it = groupList.find( dataMsg->
getGroupId() );
668 if( it == groupList.end() ) {
669 EV <<
"[Scribe::deliverALMDataToGroup() @ " << overlay->getThisNode().getIp()
670 <<
"Getting ALM data message response for an unknown group!\n";
678 if( timer ) startTimer( timer );
687 for( set<NodeHandle>::iterator cit = it->second.getChildrenBegin();
688 cit != it->second.getChildrenEnd(); ++cit ) {
690 RECORD_STATS(++numForward; forwardBytes += newMsg->getByteLength());
695 if( it->second.getSubscription() ) {
698 mcastMsg->encapsulate( dataMsg->decapsulate() );
699 RECORD_STATS(++numReceived; receivedBytes += dataMsg->getByteLength());
700 send( mcastMsg,
"to_upperTier" );