OverSim
Scribe.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <assert.h>
25 
26 #include <BaseApp.h>
27 #include "Scribe.h"
28 #include <GlobalStatistics.h>
29 
30 #include "Comparator.h"
31 
33 
34 using namespace std;
35 
// Body of what is presumably the Scribe constructor (the signature line is
// not part of this listing -- TODO confirm against Scribe.h).
// Allocates the subscription-refresh timer and resets every statistics
// counter to zero.
37 {
38  subscriptionTimer = new ScribeTimer("Subscription timer");
39  subscriptionTimer->setTimerType( SCRIBE_SUBSCRIPTION_REFRESH );
40  numJoins = 0;
41  numChildTimeout = 0;
42  numParentTimeout = 0;
43  numForward = 0;
44  forwardBytes = 0;
45  numReceived = 0;
46  receivedBytes = 0;
47  numHeartbeat = 0;
48  heartbeatBytes = 0;
49  numSubscriptionRefresh = 0;
50  subscriptionRefreshBytes = 0;
51 }
52 
// Body of what is presumably the Scribe destructor (signature line not shown
// in this listing -- TODO confirm).
// Empties the group list and cancels/deletes the subscription timer.
// NOTE(review): the timers stored in childTimeoutList are not released here
// (see the TODO below); whether the per-group timers are freed by
// groupList.clear() depends on ScribeGroup, which is not visible here.
54 {
55  groupList.clear();
56  cancelAndDelete(subscriptionTimer);
57  // TODO: clear childTimeoutList
58 }
59 
60 void Scribe::initializeApp(int stage)
61 {
62  if( stage != (numInitStages()-1))
63  {
64  return;
65  }
66  WATCH(groupList);
67  WATCH(numJoins);
68  WATCH(numForward);
69  WATCH(forwardBytes);
70  WATCH(numReceived);
71  WATCH(receivedBytes);
72  WATCH(numHeartbeat);
73  WATCH(heartbeatBytes);
74  WATCH(numSubscriptionRefresh);
75  WATCH(subscriptionRefreshBytes);
76  WATCH(numChildTimeout);
77  WATCH(numParentTimeout);
78 
79  childTimeout = par("childTimeout");
80  parentTimeout = par("parentTimeout");
81 
82 }
83 
// Body of the statistics-reporting routine (presumably Scribe::finishApp();
// the signature line is not part of this listing -- TODO confirm).
// Obtains the measured lifetime from globalStatistics and, unless it is below
// GlobalStatistics::MIN_MEASURED, reports every counter divided by that
// lifetime (i.e. as a per-second rate) via addStdDev().
85 {
86  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
// too short a measurement period: report nothing
87  if (time < GlobalStatistics::MIN_MEASURED) return;
88 
89  globalStatistics->addStdDev("Scribe: Received JOIN Messages/s",
90  numJoins / time);
91  globalStatistics->addStdDev("Scribe: Forwarded Multicast Messages/s",
92  numForward / time);
93  globalStatistics->addStdDev("Scribe: Forwarded Multicast Bytes/s",
94  forwardBytes / time);
95  globalStatistics->addStdDev("Scribe: Received Multicast Messages/s (subscribed groups only)",
96  numReceived / time);
97  globalStatistics->addStdDev("Scribe: Received Multicast Bytes/s (subscribed groups only)",
98  receivedBytes / time);
99  globalStatistics->addStdDev("Scribe: Send Heartbeat Messages/s",
100  numHeartbeat / time);
101  globalStatistics->addStdDev("Scribe: Send Heartbeat Bytes/s",
102  heartbeatBytes / time);
103  globalStatistics->addStdDev("Scribe: Send Subscription Refresh Messages/s",
104  numSubscriptionRefresh / time);
105  globalStatistics->addStdDev("Scribe: Send Subscription Refresh Bytes/s",
106  subscriptionRefreshBytes / time);
107  globalStatistics->addStdDev("Scribe: Number of Child Timeouts/s",
108  numChildTimeout / time);
109  globalStatistics->addStdDev("Scribe: Number of Parent Timeouts/s",
110  numParentTimeout / time);
111 }
112 
113 void Scribe::forward(OverlayKey* key, cPacket** msg,
114  NodeHandle* nextHopNode)
115 {
116  ScribeJoinCall* joinMsg = dynamic_cast<ScribeJoinCall*> (*msg);
117  if( joinMsg == NULL ) {
118  // nothing to be done
119  return;
120  }
121 
122  if( joinMsg->getSrcNode() == overlay->getThisNode() ) return;
123 
124  handleJoinMessage( joinMsg, false );
125 
126  *msg = NULL;
127 }
128 
// Neighbourhood-change hook. For every group whose recorded parent is this
// node itself (the "I'm root" condition below), checks with a
// KeyDistanceComparator whether `node` is closer to the group id than this
// node; if so, a join call for that group is sent to `node`.
// The `joined` flag is not inspected.
// NOTE(review): listing line 140, which must declare `m`, is missing from
// this listing (presumably a `new ScribeJoinCall`) -- restore from the
// original file before compiling.
129 void Scribe::update( const NodeHandle& node, bool joined )
130 {
131  // if node is closer to any group i'm root of, subscribe
132  for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ){
133  // if I'm root ...
134  if( !it->second.getParent().isUnspecified()
135  && it->second.getParent() == overlay->getThisNode() ) {
136  KeyDistanceComparator<KeyRingMetric> comp( it->second.getGroupId() );
137  // ... and new node is closer to groupId
138  if( comp.compare(node.getKey(), overlay->getThisNode().getKey()) < 0){
139  // then the new node is new group root, so send him a subscribe
141  m->setGroupId( it->second.getGroupId() );
142  m->setBitLength( SCRIBE_JOINCALL_L(m) );
143  sendRouteRpcCall(TIER1_COMP, node, m);
144  }
145  }
146  }
147 }
148 
// Body of the incoming-RPC dispatcher (presumably Scribe::handleRpcCall();
// the signature line is not part of this listing -- TODO confirm).
// ScribeJoin calls are delegated to handleJoinCall, ScribePublish calls to
// handlePublishCall; the function returns RPC_HANDLED.
150 {
151  RPC_SWITCH_START(msg);
152  RPC_DELEGATE(ScribeJoin, handleJoinCall);
153  RPC_DELEGATE(ScribePublish, handlePublishCall);
154  RPC_SWITCH_END();
155  return RPC_HANDLED;
156 }
157 
// Tail of the Scribe::handleRpcResponse() definition (the first signature
// line is missing from this listing). Dispatches RPC responses:
//  - ScribeJoin responses go to handleJoinResponse() and are logged together
//    with the rpc id and round-trip time;
//  - ScribePublish responses go to handlePublishResponse().
// NOTE(review): the ScribePublish branch has no `break`, unlike the ScribeJoin
// branch; it is the last branch, so this is presumably harmless -- confirm
// against the RPC_ON_RESPONSE / RPC_SWITCH_END macro definitions.
159  cPolymorphic* context, int rpcId,
160  simtime_t rtt)
161 {
162  RPC_SWITCH_START(msg);
163  RPC_ON_RESPONSE( ScribeJoin ) {
164  handleJoinResponse( _ScribeJoinResponse );
165  EV << "[Scribe::handleRpcResponse() @ " << overlay->getThisNode().getIp()
166  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
167  << " Received a ScribeJoin RPC Response: id=" << rpcId << "\n"
168  << " msg=" << *_ScribeJoinResponse << " rtt=" << rtt
169  << endl;
170  break;
171  }
172  RPC_ON_RESPONSE( ScribePublish ) {
173  handlePublishResponse( _ScribePublishResponse );
174  }
175  RPC_SWITCH_END( );
176 }
177 
178 void Scribe::deliver(OverlayKey& key, cMessage* msg)
179 {
180  if( ScribeSubscriptionRefreshMessage* refreshMsg =
181  dynamic_cast<ScribeSubscriptionRefreshMessage*>(msg) ){
182  // reset child timeout
183  refreshChildTimer( refreshMsg->getSrc(), refreshMsg->getGroupId() );
184  delete refreshMsg;
185  } else if( ScribeDataMessage* data = dynamic_cast<ScribeDataMessage*>(msg) ){
186  deliverALMDataToGroup( data );
187  } else if( ScribeLeaveMessage* leaveMsg = dynamic_cast<ScribeLeaveMessage*>(msg) ){
188  handleLeaveMessage( leaveMsg );
189  }
190 }
191 
192 void Scribe::handleUpperMessage( cMessage *msg )
193 {
194  if( ALMSubscribeMessage* subscribeMsg = dynamic_cast<ALMSubscribeMessage*>(msg)){
195  subscribeToGroup( subscribeMsg->getGroupId() );
196  delete msg;
197  } else if( ALMLeaveMessage* leaveMsg = dynamic_cast<ALMLeaveMessage*>(msg)){
198  leaveGroup( leaveMsg->getGroupId() );
199  delete msg;
200  } else if( ALMMulticastMessage* mcastMsg = dynamic_cast<ALMMulticastMessage*>(msg) ){
201  deliverALMDataToRoot( mcastMsg );
202  } else if( ALMAnycastMessage* acastMsg = dynamic_cast<ALMAnycastMessage*>(msg) ){
203  // FIXME: anycast not implemented yet
204  EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
205  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
206  << " Anycast message for group " << acastMsg->getGroupId() << "\n"
207  << " ignored: Not implemented yet!"
208  << endl;
209  delete msg;
210  } else if( ALMCreateMessage* createMsg = dynamic_cast<ALMCreateMessage*>(msg) ){
211  EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
212  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
213  << " Create message for group " << createMsg->getGroupId() << "\n"
214  << " ignored: Scribe has implicit create on SUBSCRIBE"
215  << endl;
216  delete msg;
217  } else if( ALMDeleteMessage* deleteMsg = dynamic_cast<ALMDeleteMessage*>(msg) ){
218  EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
219  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
220  << " Delete message for group " << deleteMsg->getGroupId() << "\n"
221  << " ignored: Scribe has implicit delete on LEAVE"
222  << endl;
223  delete msg;
224  }
225 }
226 
// Body of the ready-message handler (presumably Scribe::handleReadyMessage();
// the signature line is not part of this listing -- TODO confirm).
// When the tier directly below reports ready, a CompReadyMessage for this
// tier is sent upwards and the subscription timer is started. The incoming
// message is deleted in every case.
228 {
229  // process only ready messages from the tier below
230  if( (getThisCompType() - msg->getComp() == 1) && msg->getReady() ) {
231 
232  // Send a ready message to the tier above
233  CompReadyMessage* readyMsg = new CompReadyMessage;
234  readyMsg->setReady( true );
235  readyMsg->setComp( getThisCompType() );
236 
237  send( readyMsg, "to_upperTier" );
238 
239  startTimer( subscriptionTimer );
240  }
241  delete msg;
242 }
243 
// Timer dispatcher: casts the self-message to a ScribeTimer and acts on its
// timer type (subscription refresh, heartbeat, child timeout, parent
// timeout).
// NOTE(review): several lines are missing from this listing (listing lines
// 249, 254, 290 and 296): three `case` labels and the declaration of
// `refreshMsg`. The branch descriptions below rely on the comments inside each
// branch -- restore the missing lines from the original file before compiling.
244 void Scribe::handleTimerEvent( cMessage *msg )
245 {
246  ScribeTimer* timer = dynamic_cast<ScribeTimer*>(msg);
247  assert( timer );
248  switch( timer->getTimerType() ) {
// (case label missing here) subscription refresh branch: sends a refresh
// message to the parent of every group that has one, then restarts the timer
250  // renew subscriptions for all groups
251  for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ) {
252  NodeHandle parent = it->second.getParent();
253  if( !parent.isUnspecified() ){
255  refreshMsg->setGroupId( it->second.getGroupId() );
256  refreshMsg->setSrc( overlay->getThisNode() );
257 
258  refreshMsg->setBitLength(SCRIBE_SUBSCRIPTIONREFRESH_L(refreshMsg));
259  RECORD_STATS(++numSubscriptionRefresh;
260  subscriptionRefreshBytes += refreshMsg->getByteLength()
261  );
262  callRoute( OverlayKey::UNSPECIFIED_KEY, refreshMsg, parent );
263  }
264  }
265  startTimer( subscriptionTimer );
266  break;
267 
268  case SCRIBE_HEARTBEAT:
269  {
270  // Send heartbeat messages to all children in the group
271  GroupList::iterator groupIt = groupList.find( timer->getGroup() );
272  if( groupIt == groupList.end() ) {
273  // FIXME: should not happen
274  delete timer;
275  return;
276  }
// a heartbeat is a ScribeDataMessage flagged as empty
277  for( set<NodeHandle>::iterator it = groupIt->second.getChildrenBegin();
278  it != groupIt->second.getChildrenEnd(); ++it ) {
279  ScribeDataMessage* heartbeatMsg = new ScribeDataMessage("Heartbeat");
280  heartbeatMsg->setEmpty( true );
281  heartbeatMsg->setGroupId( timer->getGroup() );
282 
283  heartbeatMsg->setBitLength(SCRIBE_DATA_L(heartbeatMsg));
284  RECORD_STATS(++numHeartbeat; heartbeatBytes += heartbeatMsg->getByteLength());
285  callRoute( OverlayKey::UNSPECIFIED_KEY, heartbeatMsg, *it );
286  }
287  startTimer( timer );
288  break;
289  }
// (case label missing here) child timeout branch
291  // Child failed, remove it from group
292  RECORD_STATS(++numChildTimeout);
293  removeChildFromGroup( timer );
294  break;
295 
// (case label missing here) parent timeout branch: a new join call is routed
// towards the group key, then the group's parent timer is cleared and the
// expired timer is deleted
297  // Parent failed, send new join to rejoin group
298  RECORD_STATS(++numParentTimeout);
299  OverlayKey key = timer->getGroup();
300  EV << "[Scribe::handleTimerEvent() @ " << overlay->getThisNode().getIp()
301  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
302  << " Parent of group " << key << "\n"
303  << " failed to send heartbeat, trying to rejoin"
304  << endl;
305 
306  ScribeJoinCall* newJoin = new ScribeJoinCall;
307  newJoin->setGroupId( key );
308  newJoin->setBitLength( SCRIBE_JOINCALL_L(newJoin) );
309  sendRouteRpcCall(TIER1_COMP, key, newJoin);
310 
311  GroupList::iterator groupIt = groupList.find( timer->getGroup() );
312  if( groupIt == groupList.end() ) {
313  // FIXME: should not happen
314  delete timer;
315  return;
316  }
317  groupIt->second.setParentTimer( NULL );
318  cancelAndDelete( timer );
319  break;
320  }
321 
322 }
323 
// Body of the join-call handler (presumably Scribe::handleJoinCall(), the
// target of RPC_DELEGATE(ScribeJoin, ...); the signature line is not part of
// this listing -- TODO confirm).
// Processes the join with amIRoot = true.
325 {
326  handleJoinMessage( joinMsg, true );
327 }
328 
329 void Scribe::handleJoinMessage( ScribeJoinCall* joinMsg, bool amIRoot)
330 {
331  RECORD_STATS(++numJoins);
332  OverlayKey key = joinMsg->getGroupId();
333 
334  EV << "[Scribe::handleJoinMessage() @ " << overlay->getThisNode().getIp()
335  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
336  << " Received a ScribeJoin for group " << key << "\n"
337  << " msg=" << joinMsg
338  << endl;
339 
340  // Insert group into grouplist, if not already there
341  pair<GroupList::iterator, bool> groupInserter;
342  groupInserter = groupList.insert( make_pair(key, ScribeGroup(key)) );
343 
344  // If group is new or no parent is known, send join to parent (unless I am root, so there is no parent)
345  if ( !amIRoot && ( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) ) {
346  ScribeJoinCall* newJoin = new ScribeJoinCall;
347  newJoin->setGroupId( key );
348  newJoin->setBitLength( SCRIBE_JOINCALL_L(newJoin) );
349  sendRouteRpcCall(TIER1_COMP, key, newJoin);
350  }
351 
352  // If group had no children, start heartbeat timer for group
353  if( groupInserter.first->second.numChildren() == 0 ) {
354  ScribeTimer* heartbeat = new ScribeTimer("HeartbeatTimer");
355  heartbeat->setTimerType( SCRIBE_HEARTBEAT );
356  heartbeat->setGroup( groupInserter.first->second.getGroupId() );
357  startTimer( heartbeat );
358  if( ScribeTimer* t = groupInserter.first->second.getHeartbeatTimer() ){
359  // delete old timer, if any
360  if( t ) cancelAndDelete( t );
361  }
362  groupInserter.first->second.setHeartbeatTimer( heartbeat );
363  }
364 
365  // Add child to group
366  addChildToGroup( joinMsg->getSrcNode(), groupInserter.first->second );
367 
368  // Send joinResponse
369  ScribeJoinResponse* joinResponse = new ScribeJoinResponse;
370  joinResponse->setGroupId( key );
371  joinResponse->setBitLength( SCRIBE_JOINRESPONSE_L(joinResponse) );
372  sendRpcResponse( joinMsg, joinResponse );
373 }
374 
// Body of Scribe::handlePublishCall() (name taken from the log string below;
// the signature line is not part of this listing).
// If the group is unknown, or this node is not recorded as its own parent
// for that group, a "Wrong Root" response is returned. Otherwise the
// encapsulated data message is extracted, a success response is sent and the
// data is delivered to the group.
376 {
377  // find group
378  GroupList::iterator it = groupList.find( publishCall->getGroupId() );
379  if( it == groupList.end() ||
380  it->second.getParent().isUnspecified() ||
381  it->second.getParent() != overlay->getThisNode() ){
382  // if I don't know the group or I am not root, inform sender
383  // TODO: forward message when I'm not root but know the rendevous point?
384  ScribePublishResponse* msg = new ScribePublishResponse("Wrong Root");
385  msg->setGroupId( publishCall->getGroupId() );
386  msg->setWrongRoot( true );
387  msg->setBitLength( SCRIBE_PUBLISHRESPONSE_L(msg) );
388  sendRpcResponse( publishCall, msg );
389  } else {
// the payload is taken out of the call before the response is sent
390  ScribeDataMessage* data = dynamic_cast<ScribeDataMessage*>(publishCall->decapsulate());
391 
392  ScribePublishResponse* msg = new ScribePublishResponse("Publish Successful");
393  msg->setGroupId( publishCall->getGroupId() );
394  msg->setBitLength( SCRIBE_PUBLISHRESPONSE_L(msg) );
395  sendRpcResponse( publishCall, msg );
396 
397  if( !data ) {
398  // TODO: throw exception? this should never happen
// NOTE(review): msg is read below after it was handed to sendRpcResponse();
// confirm it is still valid at this point. If decapsulate() returned a packet
// of another type, that packet is not freed here.
399  EV << "[Scribe::handlePublishCall() @ " << overlay->getThisNode().getIp()
400  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
401  << " PublishCall for group " << msg->getGroupId()
402  << " does not contain a calid ALM data message!\n"
403  << endl;
404  return;
405  }
406  deliverALMDataToGroup( data );
407  }
408 }
409 
// Body of Scribe::handleJoinResponse() (name taken from the log string below;
// the signature line is not part of this listing).
// Records the responder as the parent of the group and (re)creates the
// parent-timeout timer for it. Responses for unknown groups are only logged.
411 {
412  GroupList::iterator it = groupList.find( joinResponse->getGroupId() );
413  if( it == groupList.end() ) {
414  EV << "[Scribe::handleJoinResponse() @ " << overlay->getThisNode().getIp()
415  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
416  << "Getting join response for an unknown group!\n";
417  return;
418  }
419  it->second.setParent( joinResponse->getSrcNode() );
420 
// The timer created here has type SCRIBE_PARENT_TIMEOUT. The local name
// `parentTimeout` is also the name assigned from par("parentTimeout") in
// initializeApp() and is shadowed inside this function.
421  // Create new heartbeat timer
422  ScribeTimer* parentTimeout = new ScribeTimer("ParentTimeoutTimer");
423  parentTimeout->setTimerType( SCRIBE_PARENT_TIMEOUT );
424  parentTimeout->setGroup( it->second.getGroupId() );
425  startTimer( parentTimeout );
426  if( ScribeTimer* t = it->second.getParentTimer() ){
427  // delete old timer, if any
428  if( t ) cancelAndDelete( t );
429  }
430  it->second.setParentTimer( parentTimeout );
431 }
432 
// Body of Scribe::handlePublishResponse() (name taken from the log string
// below; the signature line is not part of this listing).
// Updates the cached rendezvous point of the group: cleared when the
// responder reports "wrong root", otherwise set to the responding node.
// Responses for unknown groups are only logged.
434 {
435  GroupList::iterator it = groupList.find( publishResponse->getGroupId() );
436  if( it == groupList.end() ) {
437  EV << "[Scribe::handlePublishResponse() @ " << overlay->getThisNode().getIp()
438  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
439  << "Getting publish response for an unknown group!\n";
440  return;
441  }
442 
443  if( publishResponse->getWrongRoot() ) {
444  it->second.setRendezvousPoint( NodeHandle::UNSPECIFIED_NODE );
445  } else {
446  it->second.setRendezvousPoint( publishResponse->getSrcNode() );
447  }
448 }
449 
// Body of the leave-message handler (presumably Scribe::handleLeaveMessage();
// the signature line is not part of this listing -- TODO confirm).
// Removes the sender from the children of the named group, if that group is
// known, and deletes the message in every case.
451 {
452  GroupList::iterator it = groupList.find( leaveMsg->getGroupId() );
453  if( it != groupList.end() ){
454  removeChildFromGroup( leaveMsg->getSrc(), it->second );
455  }
456  delete leaveMsg;
457 }
458 
// Subscribes this node to a group: the group is inserted into the group list
// if necessary, its subscription flag is set, and a join call is routed
// towards the group id when the group is new or has no known parent.
// NOTE(review): listing line 474, which must declare `m`, is missing from
// this listing (presumably a `new ScribeJoinCall`) -- restore from the
// original file before compiling.
459 void Scribe::subscribeToGroup( const OverlayKey& groupId )
460 {
461  EV << "[Scribe::subscribeToGroup() @ " << overlay->getThisNode().getIp()
462  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
463  << " Trying to join group " << groupId << "\n";
464 
465  // Insert group into grouplist, if not known yet
466  pair<GroupList::iterator, bool> groupInserter;
467  groupInserter = groupList.insert( make_pair(groupId, ScribeGroup(groupId)) );
468 
469  // Set subscription status
470  groupInserter.first->second.setSubscription(true);
471 
472  // Send join call if I'm not already a forwarder of this group
473  if( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) {
475  m->setGroupId( groupId );
476  m->setBitLength( SCRIBE_JOINCALL_L(m) );
477  sendRouteRpcCall(TIER1_COMP, m->getGroupId(), m);
478  }
479 }
480 
481 void Scribe::leaveGroup( const OverlayKey& group )
482 {
483  GroupList::iterator it = groupList.find( group );
484  if( it != groupList.end() ){
485  it->second.setSubscription( false );
486  checkGroupEmpty( it->second );
487  }
488 }
489 
490 void Scribe::addChildToGroup( const NodeHandle& child, ScribeGroup& group )
491 {
492  if( child == overlay->getThisNode() ) {
493  // Join from ourself, ignore
494  return;
495  }
496 
497  // add child to group's children list
498  pair<set<NodeHandle>::iterator, bool> inserter =
499  group.addChild( child );
500 
501  if( inserter.second ) {
502  // if child has not been in the list, create new timeout msg
503  ScribeTimer* timeoutMsg = new ScribeTimer;
504  timeoutMsg->setTimerType( SCRIBE_CHILD_TIMEOUT );
505 
506  // Remember child and group
507  timeoutMsg->setChild( *inserter.first );
508  timeoutMsg->setGroup( group.getGroupId() );
509 
510  startTimer( timeoutMsg );
511  childTimeoutList.insert( make_pair(child, timeoutMsg) );
512  }
513 }
514 
// Body of a child-removal routine taking `child` and `group` (presumably
// Scribe::removeChildFromGroup(const NodeHandle&, ScribeGroup&); the signature
// line is not part of this listing -- TODO confirm).
// Cancels and deletes the first timeout timer stored for (child, group),
// removes the child from the group and then calls checkGroupEmpty().
516 {
517  // find timer
518  ScribeTimer* timer = NULL;
519  pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
520  childTimeoutList.equal_range( child );
521  if( ret.first != childTimeoutList.end() ){
522  for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
523  if( group == it->second->getGroup() ) {
524  timer = it->second;
// the loop is left right after erase(), so `it` is not used again
525  childTimeoutList.erase( it );
526  cancelAndDelete( timer );
527  break;
528  }
529  }
530  }
531 
532  // remove child from group's childrenlist
533  group.removeChild( child );
534 
535  checkGroupEmpty( group );
536 }
537 
// Body of a child-removal routine driven by an expired timer (presumably
// Scribe::removeChildFromGroup(ScribeTimer*); the signature line is not part
// of this listing -- TODO confirm).
// Removes the timer's child from the timer's group (if the group is still
// known), calls checkGroupEmpty(), then erases the timer from
// childTimeoutList and deletes it.
// NOTE(review): if the timer is not found in childTimeoutList it is not
// deleted here.
539 {
// `child` refers into the timer object, so it is only used before the timer
// is deleted at the end
540  NodeHandle& child = timer->getChild();
541 
542  GroupList::iterator groupIt = groupList.find( timer->getGroup() );
543  if( groupIt != groupList.end() ) {
544  ScribeGroup& group = groupIt->second;
545  // remove child from group's childrenlist
546  group.removeChild( child );
547 
548  checkGroupEmpty( group );
549  }
550 
551  // remove timer from timeoutlist
552  pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
553  childTimeoutList.equal_range( child );
554  if( ret.first != childTimeoutList.end() ) {
555  for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
556  if( it->second == timer ) {
557  childTimeoutList.erase( it );
558  cancelAndDelete( timer );
559  break;
560  }
561  }
562  }
563 }
564 
// Body of the group clean-up check (presumably Scribe::checkGroupEmpty(); the
// signature line is not part of this listing -- TODO confirm).
// When the group is neither forwarded, subscribed to, nor used as a source,
// a Leave message is sent to its parent (if one is known and it is not this
// node), the group's timers are cancelled and deleted, and the group is
// erased from groupList.
// NOTE(review): `group` appears to be a reference to an element of groupList
// (callers pass it->second); after the erase() below that reference must not
// be used any more.
566 {
567  if( !group.isForwarder() && !group.getSubscription() && !group.getAmISource()){
568 
569  if( !group.getParent().isUnspecified() && group.getParent() != overlay->getThisNode() ) {
570 
571  ScribeLeaveMessage* msg = new ScribeLeaveMessage("Leave");
572  msg->setGroupId( group.getGroupId() );
573  msg->setSrc( overlay->getThisNode() );
574  msg->setBitLength( SCRIBE_LEAVE_L(msg) );
575  callRoute( OverlayKey::UNSPECIFIED_KEY, msg, group.getParent() );
576  }
577 
578  if( group.getParentTimer() ) cancelAndDelete( group.getParentTimer() );
579  if( group.getHeartbeatTimer() ) cancelAndDelete( group.getHeartbeatTimer() );
580  groupList.erase( group.getGroupId() );
581  }
582 }
583 
// Body of the child-timer refresh routine (presumably
// Scribe::refreshChildTimer(); the signature line is not part of this
// listing -- TODO confirm).
// Restarts every timeout timer stored for `child` whose group equals
// `groupId`; does nothing if no timer is stored for that child.
585 {
586  // find timer
587  pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
588  childTimeoutList.equal_range( child );
589  // no timer yet?
590  if( ret.first == childTimeoutList.end() ) return;
591 
592  // restart timer
593  for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
594  if( it->first == child && it->second->getGroup() == groupId ) {
595  startTimer( it->second );
596  }
597  }
598 }
599 
// Body of the timer (re)start helper (presumably Scribe::startTimer(); the
// signature line is not part of this listing -- TODO confirm).
// Cancels the timer if it is currently scheduled, picks a duration from the
// timer type and schedules the timer at simTime() + duration.
// NOTE(review): three `case` labels (listing lines 611, 614 and 617) are
// missing from this listing; only SCRIBE_HEARTBEAT is visible. Restore them
// from the original file before compiling.
// NOTE(review): `duration` is an int, so a fractional timeout (or timeout/2)
// would be truncated -- confirm the types of childTimeout / parentTimeout.
601 {
602  if( timer->isScheduled() ) {
603  cancelEvent( timer );
604  }
605 
606  int duration = 0;
607  switch( timer->getTimerType() ) {
608  case SCRIBE_HEARTBEAT:
609  duration = parentTimeout/2;
610  break;
612  duration = childTimeout/2;
613  break;
615  duration = parentTimeout;
616  break;
618  duration = childTimeout;
619  break;
620  }
621  scheduleAt(simTime() + duration, timer );
622 }
623 
// Body of the publish routine (presumably Scribe::deliverALMDataToRoot(); the
// signature line is not part of this listing -- TODO confirm).
// Wraps the payload of the ALM multicast message into a ScribeDataMessage,
// wraps that into a ScribePublishCall and sends it either directly to the
// cached rendezvous point of the group or, if none is cached, routes it by
// the group id. The incoming multicast message is deleted at the end.
625 {
626  // find group
627  pair<GroupList::iterator, bool> groupInserter;
628  groupInserter = groupList.insert( make_pair(mcastMsg->getGroupId(), ScribeGroup(mcastMsg->getGroupId())) );
629 
630  // Group is not known yet
631  if( groupInserter.second ) {
632  groupInserter.first->second.setAmISource( true );
633  // TODO: Start/Restart timer to clean up cached groups
634  // If the timer expires, the flag should be cleared and checkGroupEmpty should be called
635  //
636  // FIXME: amISource should be set allways if app publishes messages to the group
637  // As the timer is not implemented yet, we only set the flag in "sender, but not receiver" mode
638  // to reduce the amount of unneccessary cached groups
639  }
640 
// the bit length is set before the payload is encapsulated
641  ScribeDataMessage* dataMsg = new ScribeDataMessage( mcastMsg->getName() );
642  dataMsg->setGroupId( mcastMsg->getGroupId() );
643  dataMsg->setBitLength( SCRIBE_DATA_L( dataMsg ));
644  dataMsg->encapsulate( mcastMsg->decapsulate() );
645 
646  // Send publish ...
647  ScribePublishCall* msg = new ScribePublishCall( "ScribePublish" );
648  msg->setGroupId( dataMsg->getGroupId() );
649  msg->setBitLength( SCRIBE_PUBLISHCALL_L(msg) );
650  msg->encapsulate( dataMsg );
651 
652  if( !groupInserter.first->second.getRendezvousPoint().isUnspecified() ) {
653  // ... directly to the rendevouz point, if known ...
654  sendRouteRpcCall(TIER1_COMP, groupInserter.first->second.getRendezvousPoint(), msg);
655  } else {
656  // ... else route it via KBR
657  sendRouteRpcCall(TIER1_COMP, msg->getGroupId(), msg);
658  }
659 
660  delete mcastMsg;
661 }
662 
663 
// Body of Scribe::deliverALMDataToGroup() (name taken from the log string
// below; the signature line is not part of this listing).
// Restarts the parent timer of the group, then -- unless the message is an
// empty heartbeat -- sends a copy to every child and, if this node is
// subscribed, passes the payload to the upper tier as an ALMMulticastMessage.
// The data message is deleted on every path.
665 {
666  // find group
667  GroupList::iterator it = groupList.find( dataMsg->getGroupId() );
668  if( it == groupList.end() ) {
669  EV << "[Scribe::deliverALMDataToGroup() @ " << overlay->getThisNode().getIp()
670  << "Getting ALM data message response for an unknown group!\n";
671  delete dataMsg;
672  return;
673  }
674  // FIXME: ignore message if not from designated parent to avoid duplicates
675 
676  // reset parent heartbeat Timer
677  ScribeTimer *timer = it->second.getParentTimer();
678  if( timer ) startTimer( timer );
679 
680  // Only empty heartbeat?
681  if( dataMsg->getEmpty() ) {
682  delete dataMsg;
683  return;
684  }
685 
686  // deliver data to children
687  for( set<NodeHandle>::iterator cit = it->second.getChildrenBegin();
688  cit != it->second.getChildrenEnd(); ++cit ) {
689  ScribeDataMessage* newMsg = new ScribeDataMessage( *dataMsg );
690  RECORD_STATS(++numForward; forwardBytes += newMsg->getByteLength());
691  callRoute( OverlayKey::UNSPECIFIED_KEY, newMsg, *cit );
692  }
693 
694  // deliver to myself if I'm subscribed to that group
695  if( it->second.getSubscription() ) {
696  ALMMulticastMessage* mcastMsg = new ALMMulticastMessage( dataMsg->getName() );
697  mcastMsg->setGroupId( dataMsg->getGroupId() );
698  mcastMsg->encapsulate( dataMsg->decapsulate() );
// NOTE(review): receivedBytes is read from dataMsg after its payload was
// decapsulated -- confirm whether the payload size is meant to be counted.
699  RECORD_STATS(++numReceived; receivedBytes += dataMsg->getByteLength());
700  send( mcastMsg, "to_upperTier" );
701  }
702 
703  delete dataMsg;
704 }
705