OverSim
PubSubMMOG.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <NotifierConsts.h>
25 
26 #include "PubSubMMOG.h"
27 
28 #include "GlobalNodeListAccess.h"
29 #include <GlobalStatistics.h>
30 #include <BootstrapList.h>
31 
33 
34 using namespace std;
35 
37 {
38  // because of IPAddressResolver, we need to wait until interfaces are registered,
39  // address auto-assignment takes place etc.
40  if(stage != MIN_STAGE_OVERLAY) return;
41 
42  state = INIT;
43  setBootstrapedIcon();
44  // TODO: use BootstrapList instead (but this currently preferes
45  // nodes from the same partition)
46  lobbyServer = globalNodeList->getBootstrapNode();
47 
48  joinTimer = new cMessage("join timer");
49  simtime_t joinTime = ceil(simTime() + (simtime_t) par("joinDelay"));
50  scheduleAt( joinTime, joinTimer );
51 
52  movementRate = par("movementRate");
53  eventDeliveryTimer = new PubSubTimer("event delivery timer");
54  eventDeliveryTimer->setType( PUBSUB_EVENTDELIVERY );
55  scheduleAt( joinTime + 1.0/(2*movementRate), eventDeliveryTimer );
56 
57  numSubspaces = par("numSubspaces");
58  subspaceSize = (int) ( (unsigned int) par("areaDimension") / numSubspaces);
59  thisNode.setKey( OverlayKey::random() );
60 
61  maxChildren = par("maxChildren");
63 
64  AOIWidth = par("AOIWidth");
65  maxMoveDelay = par("maxMoveDelay");
66 
67  parentTimeout = par("parentTimeout");
68  heartbeatTimer = new PubSubTimer("HeartbeatTimer");
69  heartbeatTimer->setType( PUBSUB_HEARTBEAT );
70  startTimer( heartbeatTimer );
71  childPingTimer = new PubSubTimer("ChildPingTimer");
72  childPingTimer->setType( PUBSUB_CHILDPING );
73  startTimer( childPingTimer );
74 
75  allowOldMoveMessages = par("allowOldMoveMessages");
76 
77  numEventsWrongTimeslot = numEventsCorrectTimeslot = 0;
78  numPubSubSignalingMessages = 0;
79  pubSubSignalingMessagesSize = 0;
80  numMoveMessages = 0;
81  moveMessagesSize = 0;
82  numMoveListMessages = 0;
83  moveListMessagesSize = 0;
84  respMoveListMessagesSize = 0;
85  lostMovementLists = 0;
86  receivedMovementLists = 0;
87  WATCH( numPubSubSignalingMessages );
88  WATCH( pubSubSignalingMessagesSize );
89  WATCH( numMoveMessages );
90  WATCH( moveMessagesSize );
91  WATCH( numMoveListMessages );
92  WATCH( moveListMessagesSize );
93  WATCH( numEventsWrongTimeslot );
94  WATCH( numEventsCorrectTimeslot );
95  WATCH( lostMovementLists );
96  WATCH( receivedMovementLists );
97  WATCH_LIST( subscribedSubspaces );
98  WATCH_MAP( responsibleSubspaces );
99  WATCH_MAP( backupSubspaces );
100  WATCH_MAP( intermediateSubspaces );
101 }
102 
104 {
105  // delegate messages
106  RPC_SWITCH_START( msg )
107  RPC_DELEGATE( PubSubSubscription, handleSubscriptionCall );
108  RPC_DELEGATE( PubSubTakeOverSubspace, handleTakeOver );
109  RPC_DELEGATE( PubSubBackup, handleBackupCall );
110  RPC_DELEGATE( PubSubIntermediate, handleIntermediateCall );
111  RPC_DELEGATE( PubSubAdoptChild, handleAdoptChildCall );
112  RPC_DELEGATE( PubSubPing, handlePingCall );
113  RPC_SWITCH_END( )
114 
115  return RPC_HANDLED;
116 
117 }
118 
120  cPolymorphic* context, int rpcId,
121  simtime_t rtt)
122 {
123  RPC_SWITCH_START(msg);
124  RPC_ON_RESPONSE( PubSubJoin ) {
125  handleJoinResponse( _PubSubJoinResponse );
126  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
127  << " (" << thisNode.getKey().toString(16) << ")]\n"
128  << " Received a PubSubJoin RPC Response: id=" << rpcId << "\n"
129  << " msg=" << *_PubSubJoinResponse << " rtt=" << rtt
130  << endl;
131  break;
132  }
133  RPC_ON_RESPONSE( PubSubSubscription ) {
134  handleSubscriptionResponse( _PubSubSubscriptionResponse );
135  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
136  << " (" << thisNode.getKey().toString(16) << ")]\n"
137  << " Received a PubSubSubscription RPC Response: id=" << rpcId << "\n"
138  << " msg=" << *_PubSubSubscriptionResponse << " rtt=" << rtt
139  << endl;
140  break;
141  }
142  RPC_ON_RESPONSE( PubSubResponsibleNode ) {
143  handleResponsibleNodeResponse( _PubSubResponsibleNodeResponse );
144  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
145  << " (" << thisNode.getKey().toString(16) << ")]\n"
146  << " Received a PubSubResponsibleNode RPC Response: id=" << rpcId << "\n"
147  << " msg=" << *_PubSubResponsibleNodeResponse << " rtt=" << rtt
148  << endl;
149  break;
150  }
151  RPC_ON_RESPONSE( PubSubHelp ) {
152  handleHelpResponse( _PubSubHelpResponse );
153  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
154  << " (" << thisNode.getKey().toString(16) << ")]\n"
155  << " Received a PubSubHelp RPC Response: id=" << rpcId << "\n"
156  << " msg=" << *_PubSubHelpResponse << " rtt=" << rtt
157  << endl;
158  break;
159  }
160  RPC_ON_RESPONSE( PubSubBackup ) {
161  handleBackupResponse( _PubSubBackupResponse );
162  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
163  << " (" << thisNode.getKey().toString(16) << ")]\n"
164  << " Received a PubSubBackup RPC Response: id=" << rpcId << "\n"
165  << " msg=" << *_PubSubBackupResponse << " rtt=" << rtt
166  << endl;
167  break;
168  }
169  RPC_ON_RESPONSE( PubSubIntermediate ) {
170  handleIntermediateResponse( _PubSubIntermediateResponse );
171  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
172  << " (" << thisNode.getKey().toString(16) << ")]\n"
173  << " Received a PubSubIntermediate RPC Response: id=" << rpcId << "\n"
174  << " msg=" << *_PubSubIntermediateResponse << " rtt=" << rtt
175  << endl;
176  break;
177  }
178  RPC_ON_RESPONSE( PubSubAdoptChild ) {
179  handleAdoptChildResponse( _PubSubAdoptChildResponse );
180  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
181  << " (" << thisNode.getKey().toString(16) << ")]\n"
182  << " Received a PubSubAdoptChild RPC Response: id=" << rpcId << "\n"
183  << " msg=" << *_PubSubAdoptChildResponse << " rtt=" << rtt
184  << endl;
185  break;
186  }
187  RPC_ON_RESPONSE( PubSubPing ) {
188  handlePingResponse( _PubSubPingResponse );
189  EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
190  << " (" << thisNode.getKey().toString(16) << ")]\n"
191  << " Received a PubSubPing RPC Response: id=" << rpcId << "\n"
192  << " msg=" << *_PubSubPingResponse << " rtt=" << rtt
193  << endl;
194  break;
195  }
196  RPC_SWITCH_END( );
197 }
198 
200  const TransportAddress &dest,
201  cPolymorphic* context, int rpcId,
202  const OverlayKey &destKey)
203 {
204  RPC_SWITCH_START(msg)
205  RPC_ON_CALL( PubSubBackup ) {
206  handleBackupCallTimeout( _PubSubBackupCall, dest );
207  EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
208  << " (" << thisNode.getKey().toString(16) << ")]\n"
209  << " Backup RPC Call timed out: id=" << rpcId << "\n"
210  << " msg=" << *_PubSubBackupCall
211  << " oldNode=" << dest
212  << endl;
213  break;
214  }
215  RPC_ON_CALL( PubSubPing ) {
216  handlePingCallTimeout( _PubSubPingCall, dest );
217  EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
218  << " (" << thisNode.getKey().toString(16) << ")]\n"
219  << " Ping RPC Call timed out: id=" << rpcId << "\n"
220  << " msg=" << *_PubSubPingCall
221  << " oldNode=" << dest
222  << endl;
223  break;
224  }
225  RPC_ON_CALL( PubSubSubscription ) {
226  handleSubscriptionCallTimeout( _PubSubSubscriptionCall, dest );
227  EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
228  << " (" << thisNode.getKey().toString(16) << ")]\n"
229  << " Subscription RPC Call timed out: id=" << rpcId << "\n"
230  << " msg=" << *_PubSubSubscriptionCall
231  << " oldNode=" << dest
232  << endl;
233  break;
234  }
235  RPC_SWITCH_END( )
236 
237  // FIXME:
238  // AdoptCall missing!
239  // IntermediateCall missing!
240  // (ResponsibleNodeCall missing)
241  // (HelpCall missing)
242 }
243 
245 {
246  if( PubSubMoveListMessage* moveMsg = dynamic_cast<PubSubMoveListMessage*>(msg) ){
247  handleMoveListMessage( moveMsg );
248  delete moveMsg;
249  } else if( PubSubMoveMessage* moveMsg = dynamic_cast<PubSubMoveMessage*>(msg) ){
250  handleMoveMessage( moveMsg );
251  } else if( PubSubUnsubscriptionMessage* unsMsg = dynamic_cast<PubSubUnsubscriptionMessage*>(msg) ){
252  handleUnsubscriptionMessage( unsMsg );
253  delete unsMsg;
254  } else if( PubSubNodeLeftMessage* leftMsg = dynamic_cast<PubSubNodeLeftMessage*>(msg) ){
255  handleNodeLeftMessage( leftMsg );
256  delete leftMsg;
257  } else if( PubSubReplacementMessage* replaceMsg = dynamic_cast<PubSubReplacementMessage*>(msg) ){
258  handleReplacementMessage( replaceMsg );
259  delete replaceMsg;
260  } else if( PubSubBackupSubscriptionMessage* backupMsg = dynamic_cast<PubSubBackupSubscriptionMessage*>(msg) ){
261  handleSubscriptionBackup( backupMsg );
262  delete backupMsg;
263  } else if( PubSubBackupUnsubscribeMessage* backupMsg = dynamic_cast<PubSubBackupUnsubscribeMessage*>(msg) ){
264  handleUnsubscribeBackup( backupMsg );
265  delete backupMsg;
266  } else if( PubSubBackupIntermediateMessage* backupMsg = dynamic_cast<PubSubBackupIntermediateMessage*>(msg) ){
267  handleIntermediateBackup( backupMsg );
268  delete backupMsg;
269  } else if( PubSubReleaseIntermediateMessage* releaseMsg = dynamic_cast<PubSubReleaseIntermediateMessage*>(msg) ){
270  handleReleaseIntermediate( releaseMsg );
271  delete releaseMsg;
272  }
273 }
274 
275 void PubSubMMOG::handleTimerEvent(cMessage* msg)
276 {
277  if( PubSubTimer* timer = dynamic_cast<PubSubTimer*>(msg) ) {
278  switch( timer->getType() ) {
279  case PUBSUB_HEARTBEAT:
280  sendHearbeatToChildren();
281  startTimer( timer );
282  break;
283  case PUBSUB_CHILDPING:
284  sendPingToChildren();
285  startTimer( timer );
286  break;
288  handleParentTimeout( timer );
289  break;
291  publishEvents();
292  startTimer( timer );
293  break;
294  }
295  } else if( msg == joinTimer ) {
296  // send a fake ready message to app to get initial position
297  // Note: This is not consistent to the paper, where the lobby server
298  // positions player. But it is needed for consistency with other MMOG protocols
299  CompReadyMessage* msg = new CompReadyMessage("fake READY");
300  msg->setReady(true);
301  msg->setComp(getThisCompType());
302  send( msg, "appOut");
303  delete joinTimer;
304  joinTimer = NULL;
305  // send initial AOI size to the application
306  // Note: This is not consistent to the paper.
307  // But it is needed for this nodes movement generation within the application layer.
308  GameAPIResizeAOIMessage* gameMsg = new GameAPIResizeAOIMessage("RESIZE_AOI");
309  gameMsg->setCommand(RESIZE_AOI);
310  gameMsg->setAOIsize(AOIWidth);
311  send( gameMsg, "appOut");
312  }
313 }
314 
315 void PubSubMMOG::handleAppMessage(cMessage* msg)
316 {
317  if( GameAPIPositionMessage *posMsg = dynamic_cast<GameAPIPositionMessage*>(msg) ) {
318  if( state == READY ) {
319  handleMove( posMsg );
320  } else if ( state == JOIN ) {
321  // We are not connected to our responsible node, inform app
322  CompReadyMessage* msg = new CompReadyMessage("Overlay not READY!");
323  msg->setReady(false);
324  msg->setComp(getThisCompType());
325  send( msg, "appOut");
326  } else if ( state == INIT ) {
327  // This is only called for the first MOVE message
328  // Trigger login
329  PubSubJoinCall* joinMsg = new PubSubJoinCall("Login");
330  joinMsg->setPosition( posMsg->getPosition() );
331  // FIXME: Ressource handling not yet supported!
332  joinMsg->setRessources( 4 );
333  sendUdpRpcCall( lobbyServer, joinMsg );
334 
335  state = JOIN;
336  setBootstrapedIcon();
337 
338  // tell app to wait until login is confirmed...
339  CompReadyMessage* readyMsg = new CompReadyMessage("Overlay not READY!");
340  readyMsg->setReady(false);
341  readyMsg->setComp(getThisCompType());
342  send( readyMsg, "appOut");
343 
344  currentRegionX = (unsigned int) (posMsg->getPosition().x/subspaceSize);
345  currentRegionY = (unsigned int) (posMsg->getPosition().y/subspaceSize);
346  }
347  delete msg;
348  }
349 }
350 
352 {
353  if( subResp->getFailed() ) {
354  // TODO: get new resp node...
355  } else {
356  if( state != READY ){
357  state = READY;
358  setBootstrapedIcon();
359  CompReadyMessage* readyMsg = new CompReadyMessage("Overlay READY!");
360  readyMsg->setReady(true);
361  readyMsg->setComp(getThisCompType());
362  sendDelayed( readyMsg, ceil(simTime()) - simTime(), "appOut" );
363  }
364  }
365 }
366 
368 {
369  state = JOIN;
370  setBootstrapedIcon();
371  PubSubSubspaceId region( currentRegionX, currentRegionY, numSubspaces);
372 
373  NodeHandle respNode = joinResp->getResponsibleNode();
374  PubSubSubspace sub(region);
375  sub.setResponsibleNode( respNode );
376  subscribedSubspaces.push_back( sub );
377  if( respNode.isUnspecified() ) {
378  PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
379  respCall->setSubspacePos( Vector2D(currentRegionX, currentRegionY) );
380  respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ) );
381  RECORD_STATS(
382  ++numPubSubSignalingMessages;
383  pubSubSignalingMessagesSize+= respCall->getByteLength()
384  );
385  sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
386  } else {
387  PubSubSubscriptionCall* subCall = new PubSubSubscriptionCall("JoinSubspace");
388  subCall->setSubspaceId( region.getId() );
389  subCall->setBitLength( PUBSUB_SUBSCRIPTIONCALL_L( subCall ));
390  RECORD_STATS(
391  ++numPubSubSignalingMessages;
392  pubSubSignalingMessagesSize+= subCall->getByteLength()
393  );
394  sendUdpRpcCall( respNode, subCall );
395  }
396 }
397 
399 {
400  int subspaceId = subResp->getSubspaceId();
401  NodeHandle respNode = subResp->getResponsibleNode();
402 
403  std::list<PubSubSubspace>::iterator it = subscribedSubspaces.begin();
404  while( it != subscribedSubspaces.end() ) {
405  if( it->getId().getId() == subspaceId) break;
406  ++it;
407  }
408  if( it != subscribedSubspaces.end() ) {
409  it->setResponsibleNode( respNode );
410 
411  PubSubSubscriptionCall* subCall = new PubSubSubscriptionCall("JoinSubspace");
412  subCall->setSubspaceId( subspaceId );
413  subCall->setBitLength( PUBSUB_SUBSCRIPTIONCALL_L( subCall ));
414  RECORD_STATS(
415  ++numPubSubSignalingMessages;
416  pubSubSignalingMessagesSize+= subCall->getByteLength()
417  );
418  sendUdpRpcCall( respNode, subCall );
419  }
420 }
421 
423 {
424  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
425  it = responsibleSubspaces.find( PubSubSubspaceId(unsMsg->getSubspaceId(), numSubspaces) );
426 
427  if( it != responsibleSubspaces.end() ) {
428  unsubscribeChild( unsMsg->getSrc(), it->second );
429  }
430 }
431 
433 {
434  std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
435  it = intermediateSubspaces.find( PubSubSubspaceId(leftMsg->getSubspaceId(), numSubspaces) );
436 
437  if( it == intermediateSubspaces.end() ) return;
438 
439  it->second.removeChild( leftMsg->getNode() );
440 }
441 
443 {
444  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
445  it = responsibleSubspaces.find( PubSubSubspaceId(subCall->getSubspaceId(), numSubspaces) );
446 
447  PubSubBackupSubscriptionMessage* backupMsg = 0;
449  if( it == responsibleSubspaces.end() ) {
450  resp = new PubSubSubscriptionResponse("Subscription failed");
451  resp->setFailed( true );
452  } else {
453  resp = new PubSubSubscriptionResponse("Subscription successful");
454  backupMsg = new PubSubBackupSubscriptionMessage("Backup: new subscription");
455  backupMsg->setSubspaceId( subCall->getSubspaceId() );
456  backupMsg->setChild( subCall->getSrcNode() );
457 
458  if( it->second.addChild( subCall->getSrcNode() )) {
459  // We have still room for the child
460  backupMsg->setParent( thisNode );
461  } else {
462  // Child has to go to an intermediate node...
463  if( PubSubSubspaceResponsible::IntermediateNode* iNode = it->second.getNextFreeIntermediate() ){
464  // find intermediate node with free slots
465  PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt child");
466  adoptCall->setChild( subCall->getSrcNode() );
467  adoptCall->setSubspaceId( subCall->getSubspaceId() );
468  adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
469  sendUdpRpcCall( iNode->node, adoptCall );
470  RECORD_STATS(
471  ++numPubSubSignalingMessages;
472  pubSubSignalingMessagesSize+= adoptCall->getByteLength()
473  );
474  iNode->waitingChildren++;
475  } else {
476  // no free slots available, create new intermediate node
477  // FIXME: when getting two subscriptions at once, we're requesting too many intermediates
478  PubSubHelpCall* helpCall = new PubSubHelpCall("I need an intermediate node");
479  helpCall->setHelpType( PUBSUB_INTERMEDIATE );
480  helpCall->setSubspaceId( subCall->getSubspaceId() );
481  helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
482  sendUdpRpcCall( lobbyServer, helpCall );
483  RECORD_STATS(
484  ++numPubSubSignalingMessages;
485  pubSubSignalingMessagesSize+= helpCall->getByteLength()
486  );
487  }
488  }
489  }
490  resp->setBitLength( PUBSUB_SUBSCRIPTIONRESPONSE_L( resp ));
491  sendRpcResponse( subCall, resp );
492  RECORD_STATS(
493  ++numPubSubSignalingMessages;
494  pubSubSignalingMessagesSize+= resp->getByteLength()
495  );
496 
497  if( it == responsibleSubspaces.end() ) return;
498 
499 // FIXME: just for testing
500 PubSubSubspaceResponsible& subspace = it->second;
501 int iii = subspace.getTotalChildrenCount();
502 subspace.fixTotalChildrenCount();
503 if( iii != subspace.getTotalChildrenCount() ){
504  opp_error("Huh?");
505 }
506 
507  if( !it->second.getBackupNode().isUnspecified() ){
508  backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
509  RECORD_STATS(
510  ++numPubSubSignalingMessages;
511  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
512  );
513  sendMessageToUDP( it->second.getBackupNode(), backupMsg );
514  } else {
515  delete backupMsg;
516  }
517 }
518 
520 {
521  PubSubSubspaceId region((int) toCall->getSubspacePos().x, (int) toCall->getSubspacePos().y, numSubspaces);
522 
523  takeOverNewSubspace( region );
524 
525  PubSubTakeOverSubspaceResponse* toResp = new PubSubTakeOverSubspaceResponse("Accept subspace responsibility");
526  toResp->setSubspacePos( toCall->getSubspacePos() );
527  toResp->setBitLength( PUBSUB_TAKEOVERSUBSPACERESPONSE_L( toResp ));
528  RECORD_STATS(
529  ++numPubSubSignalingMessages;
530  pubSubSignalingMessagesSize+= toResp->getByteLength()
531  );
532  sendRpcResponse( toCall, toResp );
533 }
534 
535 void PubSubMMOG::receiveChangeNotification(int category, const cPolymorphic *details)
536 {
537  if(category == NF_OVERLAY_NODE_GRACEFUL_LEAVE && state == READY) {
538  }
539 }
540 
542 {
543  currentRegionX = (unsigned int) (posMsg->getPosition().x/subspaceSize);
544  currentRegionY = (unsigned int) (posMsg->getPosition().y/subspaceSize);
545 
546  PubSubSubspaceId region( currentRegionX, currentRegionY, numSubspaces);
547 
548  set<PubSubSubspaceId> expectedRegions;
549  int minX = (int) ((posMsg->getPosition().x - AOIWidth)/subspaceSize);
550  if( minX < 0 ) minX = 0;
551  int maxX = (int) ((posMsg->getPosition().x + AOIWidth)/subspaceSize);
552  if( maxX >= numSubspaces ) maxX = numSubspaces -1;
553  int minY = (int) ((posMsg->getPosition().y - AOIWidth)/subspaceSize);
554  if( minY < 0 ) minY = 0;
555  int maxY = (int) ((posMsg->getPosition().y + AOIWidth)/subspaceSize);
556  if( maxY >= numSubspaces ) maxY = numSubspaces -1;
557 
558  // FIXME: make parameter: unsubscription size
559  int minUnsubX = (int) ((posMsg->getPosition().x - 1.5*AOIWidth)/subspaceSize);
560  if( minUnsubX < 0 ) minUnsubX = 0;
561  int maxUnsubX = (int) ((posMsg->getPosition().x + 1.5*AOIWidth)/subspaceSize);
562  if( maxUnsubX >= numSubspaces ) maxUnsubX = numSubspaces -1;
563  int minUnsubY = (int) ((posMsg->getPosition().y - 1.5*AOIWidth)/subspaceSize);
564  if( minUnsubY < 0 ) minUnsubY = 0;
565  int maxUnsubY = (int) ((posMsg->getPosition().y + 1.5+AOIWidth)/subspaceSize);
566  if( maxUnsubY >= numSubspaces ) maxUnsubY = numSubspaces -1;
567 
568  for( int x = minX; x <= maxX; ++x ){
569  for( int y = minY; y <= maxY; ++y ){
570  expectedRegions.insert( PubSubSubspaceId( x, y, numSubspaces ));
571  }
572  }
573 
574  list<PubSubSubspace>::iterator subIt = subscribedSubspaces.begin();
575  PubSubSubspace* subspace = NULL;
576  while( subIt != subscribedSubspaces.end() ){
577  if( subIt->getId() == region ){
578  subspace = &*subIt;
579  }
580  expectedRegions.erase( subIt->getId() );
581 
582  // unsubscribe region if to far away
583  if( subIt->getId().getX() < minX || subIt->getId().getX() > maxX ||
584  subIt->getId().getY() < minY || subIt->getId().getY() > maxY ){
585  if( !subIt->getResponsibleNode().isUnspecified() ){
586  PubSubUnsubscriptionMessage* unsubMsg = new PubSubUnsubscriptionMessage("Unsubscribe from subspace");
587  unsubMsg->setSubspaceId( subIt->getId().getId() );
588  unsubMsg->setSrc( thisNode );
589  unsubMsg->setBitLength( PUBSUB_UNSUBSCRIPTION_L( unsubMsg ));
590  RECORD_STATS(
591  ++numPubSubSignalingMessages;
592  pubSubSignalingMessagesSize+= unsubMsg->getByteLength()
593  );
594  sendMessageToUDP( subIt->getResponsibleNode(), unsubMsg );
595  }
596  // Erase subspace from subscribedList and increase iterator
597  subscribedSubspaces.erase( subIt++ );
598  } else {
599  ++subIt;
600  }
601  }
602 
603  // if any "near" region is not yet subscribed, subscribe
604  for( set<PubSubSubspaceId>::iterator regionIt = expectedRegions.begin(); regionIt != expectedRegions.end(); ++regionIt ){
605  PubSubSubspace sub( *regionIt );
606  subscribedSubspaces.push_back( sub );
607  PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
608  respCall->setSubspacePos( Vector2D(regionIt->getX(), regionIt->getY()) );
609  respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ));
610  RECORD_STATS(
611  ++numPubSubSignalingMessages;
612  pubSubSignalingMessagesSize+= respCall->getByteLength()
613  );
614  sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
615  }
616 
617  if( subspace && !subspace->getResponsibleNode().isUnspecified() ){
618  PubSubMoveMessage* moveMsg = new PubSubMoveMessage("Player move");
619  moveMsg->setSubspaceId( region.getId() );
620  moveMsg->setPlayer( thisNode );
621  moveMsg->setPosition( posMsg->getPosition() );
622  moveMsg->setTimestamp( simTime() );
623  moveMsg->setBitLength( PUBSUB_MOVE_L( moveMsg ));
624  RECORD_STATS(
625  ++numMoveMessages;
626  moveMessagesSize+= moveMsg->getByteLength()
627  );
628  sendMessageToUDP( subspace->getResponsibleNode(), moveMsg );
629  } else {
630  // trying to move to not-yet subscribed region
631  // FIXME: change state to JOIN?
632  }
633 }
634 
636 {
637  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
638  it = responsibleSubspaces.find( PubSubSubspaceId(moveMsg->getSubspaceId(), numSubspaces) );
639  if( it == responsibleSubspaces.end() ){
640  EV << "[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
641  << " (" << thisNode.getKey().toString(16) << ")]\n"
642  << " received moveMessage for unknown subspace" << moveMsg->getSubspaceId() << "\n"
643  << endl;
644  return;
645  }
646 
647  // If message arrived in the correct timeslot, store move message until deadline
648  // Note: This assumes, we get no messages with future timestamps. At least in
649  // the simulation, this assumption will hold.
650  // The allowOldMoveMessages parameter allows overriding the timeslot barriers and forward all
651  // messages.
652  if( allowOldMoveMessages || moveMsg->getTimestamp() >= eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) ){
653  it->second.waitingMoveMessages.push_back( moveMsg );
654  ++numEventsCorrectTimeslot;
655  } else {
656  EV << "[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
657  << " (" << thisNode.getKey().toString(16) << ")]\n"
658  << " received moveMesage with Timestamp: " << moveMsg->getTimestamp() << "\n"
659  << " deadline was: " << eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) << "\n"
660  << endl;
661  ++numEventsWrongTimeslot;
662  cancelAndDelete( moveMsg );
663  }
664 }
665 
667 {
668  simtime_t timestamp = moveMsg->getTimestamp();
669 
670  // If I'm intermediate node for this subspace, forward message to children
671  std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
672  it = intermediateSubspaces.find( PubSubSubspaceId(moveMsg->getSubspaceId(), numSubspaces) );
673  if( it != intermediateSubspaces.end() ){
674  // Forward only if the message has not already been forwarded
675  if( it->second.getLastTimestamp() < moveMsg->getTimestamp() ){
676  set<NodeHandle>::iterator childIt;
677  for( childIt = it->second.children.begin(); childIt != it->second.children.end(); ++childIt ){
678  sendMessageToUDP( *childIt, (BaseOverlayMessage*) moveMsg->dup() );
679  RECORD_STATS(
680  ++numMoveListMessages;
681  moveListMessagesSize+= moveMsg->getByteLength()
682  );
683  }
684  it->second.setTimestamp( timestamp );
685  }
686  }
687 
688  // If I'm subscribed to the subspace, transfer a GameAPIMoveList to app
689  std::list<PubSubSubspace>::iterator subIt;
690  for( subIt = subscribedSubspaces.begin(); subIt != subscribedSubspaces.end(); ++subIt ){
691  if( subIt->getId().getId() == moveMsg->getSubspaceId() ){
692  if( subIt->getLastTimestamp() < moveMsg->getTimestamp() ){
693  GameAPIListMessage* moveList = new GameAPIListMessage("player position update");
694  moveList->setCommand( NEIGHBOR_UPDATE );
695  moveList->setAddNeighborArraySize( moveMsg->getPlayerArraySize() );
696  moveList->setNeighborPositionArraySize( moveMsg->getPositionArraySize() );
697  for( unsigned int i = 0; i < moveMsg->getPlayerArraySize(); ++i ){
698  moveList->setAddNeighbor( i, moveMsg->getPlayer(i) );
699  moveList->setNeighborPosition( i, moveMsg->getPosition(i) );
700  RECORD_STATS(
701  globalStatistics->addStdDev("PubSubMMOG: MoveDelay",
702  SIMTIME_DBL(simTime() - timestamp + moveMsg->getPositionAge(i)) );
703  );
704  }
705  send( moveList, "appOut" );
706  RECORD_STATS(
707  if( timestamp < simTime() - maxMoveDelay ){
708  ++lostMovementLists;
709  } else {
710  ++receivedMovementLists;
711  }
712  if( subIt->getLastTimestamp() != 0) lostMovementLists += (int)(SIMTIME_DBL(timestamp - subIt->getLastTimestamp())*movementRate -1);
713 
714  );
715  subIt->setTimestamp( timestamp );
716  }
717  return;
718  }
719  }
720 }
721 
723 {
724  // lobby server answered our call for help
725  // (i.e. he sends us a candidate for backup/intermediate nodes
726  if( helpResp->getHelpType() == PUBSUB_BACKUP ){
727  PubSubBackupCall* backupCall = new PubSubBackupCall("Become my backup node!");
728  backupCall->setSubspaceId( helpResp->getSubspaceId() );
729 
730  // Find the subspace in the subspace map
731  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
732  it = responsibleSubspaces.find( PubSubSubspaceId(helpResp->getSubspaceId(), numSubspaces) );
733  if( it == responsibleSubspaces.end() ){
734  EV << "[PubSubMMOG::handleHelpResponse() @ " << thisNode.getIp()
735  << " (" << thisNode.getKey().toString(16) << ")]\n"
736  << " received helpResponse for unknown subspace" << helpResp->getSubspaceId() << "\n"
737  << endl;
738  return;
739  }
740  PubSubSubspaceResponsible& subspace = it->second;
741 
742  // Assume the new backup will not refuse his task
743  subspace.setBackupNode( helpResp->getNode() );
744 
745 // FIXME: just for testing
746 int iii = subspace.getTotalChildrenCount();
747 subspace.fixTotalChildrenCount();
748 if( iii != subspace.getTotalChildrenCount() ){
749  opp_error("Huh?");
750 }
751 
752  // backup the load balancing tree
753  backupCall->setChildrenArraySize( subspace.getTotalChildrenCount() );
754  backupCall->setChildrenPosArraySize( subspace.getTotalChildrenCount() );
755  backupCall->setIntermediatesArraySize( subspace.intermediateNodes.size() );
756 
757  set<NodeHandle>::iterator childIt;
758  map<NodeHandle, bool>::iterator childMapIt;
759  unsigned int i = 0;
760  for( childMapIt = subspace.cachedChildren.begin(); childMapIt != subspace.cachedChildren.end(); ++childMapIt ){
761  backupCall->setChildren(i, childMapIt->first);
762  backupCall->setChildrenPos(i, -2);
763  ++i;
764  }
765  for( childIt = subspace.children.begin(); childIt != subspace.children.end(); ++childIt ){
766  backupCall->setChildren(i, *childIt);
767  backupCall->setChildrenPos(i, -1);
768  ++i;
769  }
770  for( unsigned int ii = 0; ii < subspace.intermediateNodes.size(); ++ii ){
772  backupCall->setIntermediates(ii, iNode.node);
773  for( childIt = iNode.children.begin(); childIt != iNode.children.end(); ++childIt ){
774  backupCall->setChildren(i, *childIt);
775  backupCall->setChildrenPos(i, ii);
776  ++i;
777  }
778  }
779 
780  backupCall->setBitLength( PUBSUB_BACKUPCALL_L( backupCall ));
781  RECORD_STATS(
782  ++numPubSubSignalingMessages;
783  pubSubSignalingMessagesSize+= backupCall->getByteLength()
784  );
785  sendUdpRpcCall( helpResp->getNode(), backupCall );
786 
787  } else if( helpResp->getHelpType() == PUBSUB_INTERMEDIATE ){
788  PubSubIntermediateCall* intermediateCall = new PubSubIntermediateCall("Become my intermediate node!");
789  intermediateCall->setSubspaceId( helpResp->getSubspaceId() );
790  intermediateCall->setBitLength( PUBSUB_INTERMEDIATECALL_L( intermediateCall ));
791  RECORD_STATS(
792  ++numPubSubSignalingMessages;
793  pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
794  );
795  sendUdpRpcCall( helpResp->getNode(), intermediateCall );
796  }
797 }
798 
800 {
801  int intId = backupCall->getSubspaceId();
802  PubSubSubspaceId subspaceId(intId, numSubspaces);
803 
804  // Start Heartbeat Timer
805  PubSubTimer* parentTimeout = new PubSubTimer("ParentTimeout");
806  parentTimeout->setType( PUBSUB_PARENT_TIMEOUT );
807  parentTimeout->setSubspaceId( intId );
808  startTimer( parentTimeout );
809 
810  // insert subspace into responsible list
811  PubSubSubspaceResponsible subspace( subspaceId );
812  subspace.setResponsibleNode( backupCall->getSrcNode() );
813  subspace.setHeartbeatTimer( parentTimeout );
814 
815  // recounstruct load balancing tree
816  for( unsigned int i = 0; i < backupCall->getIntermediatesArraySize(); ++i ){
818  iNode.node = backupCall->getIntermediates(i);
819  subspace.intermediateNodes.push_back( iNode );
820  }
821  for( unsigned int i = 0; i < backupCall->getChildrenArraySize(); ++i ){
822  int pos = backupCall->getChildrenPos( i );
823  if( pos == -2 ){
824  subspace.cachedChildren.insert( make_pair( backupCall->getChildren(i), false ));
825  } else if( pos == -1 ){
826  subspace.children.insert( backupCall->getChildren(i) );
827  } else {
828  subspace.intermediateNodes[pos].children.insert( backupCall->getChildren(i) );
829  }
830  }
831 
832  backupSubspaces.insert( make_pair(subspaceId, subspace) );
833 
834  PubSubBackupResponse* backupResp = new PubSubBackupResponse("I'll be your backup");
835  backupResp->setSubspaceId( intId );
836  backupResp->setBitLength( PUBSUB_BACKUPRESPONSE_L( backupResp ));
837  RECORD_STATS(
838  ++numPubSubSignalingMessages;
839  pubSubSignalingMessagesSize+= backupResp->getByteLength()
840  );
841  sendRpcResponse( backupCall, backupResp );
842 }
843 
845 {
846  // Nothing to be done
847  // HandleHelpResponse() already did everything important
848 }
849 
851 {
852  // insert subspace into intermediate list
853  PubSubSubspaceId subspaceId(intermediateCall->getSubspaceId(), numSubspaces);
854  PubSubSubspaceIntermediate subspace( subspaceId );
855  subspace.setResponsibleNode( intermediateCall->getSrcNode() );
856  subspace.setTimestamp(0);
857  intermediateSubspaces.insert( make_pair(subspaceId, subspace) );
858 
859  PubSubIntermediateResponse* iResp = new PubSubIntermediateResponse("I'll be your intermediate node");
860  iResp->setSubspaceId( intermediateCall->getSubspaceId() );
861  iResp->setBitLength( PUBSUB_INTERMEDIATERESPONSE_L( iResp ));
862  RECORD_STATS(
863  ++numPubSubSignalingMessages;
864  pubSubSignalingMessagesSize+= iResp->getByteLength()
865  );
866  sendRpcResponse( intermediateCall, iResp );
867 }
868 
870 {
871  // we found a new intermediate node for a subspace
872  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
873  it = responsibleSubspaces.find( PubSubSubspaceId(intermediateResp->getSubspaceId(), numSubspaces) );
874  if( it == responsibleSubspaces.end() ) {
875  EV << "[PubSubMMOG::handleIntermediateResponse() @ " << thisNode.getIp()
876  << " (" << thisNode.getKey().toString(16) << ")]\n"
877  << " Received Intermediate Response for unknown Subspace!\n"
878  << endl;
879  return;
880  }
881  PubSubSubspaceResponsible& subspace = it->second;
883  iNode.node = intermediateResp->getSrcNode();
884 
885  // if there is any broken intermediate node in list, replace it
886  bool newIntermediate = true;
887  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
888  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
889  if( iit->node.isUnspecified() ){
890  iit->node = iNode.node;
891  newIntermediate = false;
892  break;
893  }
894  }
895  if( iit == subspace.intermediateNodes.end() ){
896  subspace.intermediateNodes.push_back( iNode );
897  iit = subspace.intermediateNodes.end() - 1;
898  }
899 
900  // inform Backup
901  if( !subspace.getBackupNode().isUnspecified() ){
902  PubSubBackupIntermediateMessage* backupMsg = new PubSubBackupIntermediateMessage("Backup: new Intermediate");
903  backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
904  backupMsg->setNode( iNode.node );
905  backupMsg->setPos( iit - subspace.intermediateNodes.begin() );
906  backupMsg->setBitLength( PUBSUB_BACKUPINTERMEDIATE_L( backupMsg ));
907  RECORD_STATS(
908  ++numPubSubSignalingMessages;
909  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
910  );
911  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
912  }
913 
914  // if needed, send adopt to parent
915  int intermediatePos = iit - subspace.intermediateNodes.begin();
916  int parentPos = intermediatePos/maxChildren -1;
917  if( parentPos >= 0 && !subspace.intermediateNodes[parentPos].node.isUnspecified() ){
918  PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt (intermediate) Node");
919  adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
920  adoptCall->setChild( iit->node );
921  adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
922  RECORD_STATS(
923  ++numPubSubSignalingMessages;
924  pubSubSignalingMessagesSize+= adoptCall->getByteLength()
925  );
926  sendUdpRpcCall( subspace.intermediateNodes[parentPos].node, adoptCall );
927  }
928 
929  if( newIntermediate ){
930  // move one child from iNodes's parent to cache
931  if( parentPos >= 0 ) {
932  // parent is an intermediate node
934  if( parent.children.begin() != parent.children.end() ){
935  bool fixNeeded = false;
936  if( !subspace.cachedChildren.insert( make_pair( *(parent.children.begin()), false )).second ){
937  fixNeeded = true;
938  }
939  if( !subspace.getBackupNode().isUnspecified() ){
940  PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
941  backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
942  backupMsg->setChild( *(parent.children.begin()) );
943  backupMsg->setOldParent( parent.node );
944  backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
945  RECORD_STATS(
946  ++numPubSubSignalingMessages;
947  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
948  );
949  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
950  }
951  PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left: moved");
952  goneMsg->setNode( *(parent.children.begin()) );
953  goneMsg->setSubspaceId( intermediateResp->getSubspaceId() );
954  goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
955  RECORD_STATS(
956  ++numPubSubSignalingMessages;
957  pubSubSignalingMessagesSize+= goneMsg->getByteLength()
958  );
959  sendMessageToUDP( parent.node, goneMsg );
960  parent.children.erase( parent.children.begin() );
961  if( fixNeeded ){
962  subspace.fixTotalChildrenCount();
963  }
964  }
965 
966  } else {
967  // we are parent
968  if( subspace.children.begin() != subspace.children.end() ){
969  bool fixNeeded = false;
970  if( !subspace.cachedChildren.insert( make_pair( *(subspace.children.begin()), false )).second ){
971  fixNeeded = true;
972  }
973  if( !subspace.getBackupNode().isUnspecified() ){
974  PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
975  backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
976  backupMsg->setChild( *(subspace.children.begin()) );
977  backupMsg->setOldParent( thisNode );
978  backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
979  RECORD_STATS(
980  ++numPubSubSignalingMessages;
981  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
982  );
983  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
984  }
985  subspace.children.erase( *(subspace.children.begin()) );
986  if( fixNeeded ){
987  subspace.fixTotalChildrenCount();
988  }
989  }
990  }
991  } else {
992  // send adopt for all children intermediates
993  for( int pos = (intermediatePos+1) * maxChildren; pos < (int) subspace.intermediateNodes.size() &&
994  pos < (intermediatePos+2) * maxChildren; ++pos ){
995  if( subspace.intermediateNodes[pos].node.isUnspecified() ) continue;
996  PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt (intermediate) Node");
997  adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
998  adoptCall->setChild( subspace.intermediateNodes[pos].node );
999  adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
1000  RECORD_STATS(
1001  ++numPubSubSignalingMessages;
1002  pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1003  );
1004  sendUdpRpcCall( iit->node, adoptCall );
1005  }
1006  }
1007 
1008  // move as many cached children to the new node as possible
1009  std::map<NodeHandle,bool>::iterator childIt;
1010  for( childIt = subspace.cachedChildren.begin(); childIt != subspace.cachedChildren.end(); ++childIt ){
1011  if( childIt->second ) continue;
1012  PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt Node");
1013  adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
1014  adoptCall->setChild( childIt->first );
1015  adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
1016  RECORD_STATS(
1017  ++numPubSubSignalingMessages;
1018  pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1019  );
1020  sendUdpRpcCall( intermediateResp->getSrcNode(), adoptCall );
1021  childIt->second = true;
1022  if( (unsigned int) maxChildren == ++(iit->waitingChildren) ) break;
1023  }
1024 }
1025 
1027 {
1028  std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
1029  it = intermediateSubspaces.find( PubSubSubspaceId(adoptCall->getSubspaceId(), numSubspaces) );
1030  if( it == intermediateSubspaces.end() ) {
1031  EV << "[PubSubMMOG::handleAdoptChildCall() @ " << thisNode.getIp()
1032  << " (" << thisNode.getKey().toString(16) << ")]\n"
1033  << " Received Adopt Child Call for unknown Subspace!\n"
1034  << endl;
1035  cancelAndDelete( adoptCall );
1036  return;
1037  }
1038 
1039  it->second.addChild( adoptCall->getChild() );
1040  PubSubAdoptChildResponse* adoptResp = new PubSubAdoptChildResponse("I adopted child");
1041  adoptResp->setSubspaceId( adoptCall->getSubspaceId() );
1042  adoptResp->setChild( adoptCall->getChild() );
1043  adoptResp->setBitLength( PUBSUB_ADOPTCHILDRESPONSE_L( adoptResp ));
1044  RECORD_STATS(
1045  ++numPubSubSignalingMessages;
1046  pubSubSignalingMessagesSize+= adoptResp->getByteLength()
1047  );
1048  sendRpcResponse( adoptCall, adoptResp );
1049 }
1050 
1052 {
1053  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1054  it = responsibleSubspaces.find( PubSubSubspaceId(adoptResp->getSubspaceId(), numSubspaces) );
1055  if( it == responsibleSubspaces.end() ) {
1056  EV << "[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
1057  << " (" << thisNode.getKey().toString(16) << ")]\n"
1058  << " Received AdoptChild Response for unknown Subspace!\n"
1059  << endl;
1060  return;
1061  }
1062 
1063 // FIXME: just for testing
1064 PubSubSubspaceResponsible& subspace = it->second;
1065 int iii = subspace.getTotalChildrenCount();
1066 subspace.fixTotalChildrenCount();
1067 if( iii != subspace.getTotalChildrenCount() ){
1068  opp_error("Huh?");
1069 }
1070 
1071  // Find intermediate node in subspace
1072  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1073  for( iit = it->second.intermediateNodes.begin(); iit != it->second.intermediateNodes.end(); ++iit ){
1074  if( !iit->node.isUnspecified() && iit->node == adoptResp->getSrcNode() ){
1075 
1076  // if adoption was for a child intermediate node, nothing is to be done
1077  int intermediatePos = iit - it->second.intermediateNodes.begin();
1078  for( int pos = (intermediatePos+1) * maxChildren; pos < (int) it->second.intermediateNodes.size() &&
1079  pos < (intermediatePos+2) * maxChildren; ++pos )
1080  {
1081  if( !it->second.intermediateNodes[pos].node.isUnspecified() &&
1082  adoptResp->getChild() == it->second.intermediateNodes[pos].node ){
1083  return;
1084  }
1085  }
1086 
1087  // child is a "real" child->remove it from cache
1088  if( !it->second.cachedChildren.erase( adoptResp->getChild() ) ){
1089  // if node got deleted in the meantime, inform parent...
1090  PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left Subspace");
1091  goneMsg->setNode( adoptResp->getChild() );
1092  goneMsg->setSubspaceId( it->second.getId().getId() );
1093  goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
1094  RECORD_STATS(
1095  ++numPubSubSignalingMessages;
1096  pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1097  );
1098  sendMessageToUDP( adoptResp->getSrcNode(), goneMsg );
1099  return;
1100  }
1101 
1102  // move child to intermediate node's childrenlist
1103  if( !iit->children.insert( adoptResp->getChild() ).second ){
1104  // Node was already in children list, fix children count
1105  subspace.fixTotalChildrenCount();
1106  }
1107  iit->waitingChildren--;
1108 
1109 // FIXME: just for testing
1110 PubSubSubspaceResponsible& subspace = it->second;
1111 int iii = subspace.getTotalChildrenCount();
1112 subspace.fixTotalChildrenCount();
1113 if( iii != subspace.getTotalChildrenCount() ){
1114  opp_error("Huh?");
1115 }
1116 
1117  // Inform Backup
1118  if( !it->second.getBackupNode().isUnspecified() ){
1119  PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: node got a new parent");
1120  backupMsg->setSubspaceId( adoptResp->getSubspaceId() );
1121  backupMsg->setChild( adoptResp->getChild() );
1122  backupMsg->setParent( adoptResp->getSrcNode() );
1123  backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
1124  RECORD_STATS(
1125  ++numPubSubSignalingMessages;
1126  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1127  );
1128  sendMessageToUDP( it->second.getBackupNode(), backupMsg );
1129  return;
1130  }
1131  }
1132  }
1133 
1134  EV << "[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
1135  << " (" << thisNode.getKey().toString(16) << ")]\n"
1136  << " Received AdoptChild Response for unknown child!\n"
1137  << endl;
1138 }
1139 
1141 {
1142  int subspaceId = pingCall->getSubspaceId();
1143 
1144  if( pingCall->getPingType() == PUBSUB_PING_BACKUP ){
1145  // reset heartbeat timer
1146  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1147  it = backupSubspaces.find( PubSubSubspaceId(pingCall->getSubspaceId(), numSubspaces) );
1148  if( it == backupSubspaces.end() ) {
1149  EV << "[PubSubMMOG::handlePingCall() @ " << thisNode.getIp()
1150  << " (" << thisNode.getKey().toString(16) << ")]\n"
1151  << " Received PingCall for unknown Subspace!\n"
1152  << endl;
1153  // FIXME: Somebody thinks we are his backup. What shall we do?
1154  } else {
1155  it->second.resetHeartbeatFailCount();
1156  startTimer( it->second.getHeartbeatTimer() );
1157  }
1158  }
1159 
1160  PubSubPingResponse* pingResp = new PubSubPingResponse("PingResponse");
1161  pingResp->setSubspaceId( subspaceId );
1162  pingResp->setBitLength( PUBSUB_PINGRESPONSE_L( pingResp ));
1163  RECORD_STATS(
1164  ++numPubSubSignalingMessages;
1165  pubSubSignalingMessagesSize+= pingResp->getByteLength()
1166  );
1167  sendRpcResponse( pingCall, pingResp );
1168 }
1169 
1171 {
1172 }
1173 
1175 {
1176  // create a new subspace
1177  PubSubSubspaceResponsible subspace( subspaceId );
1178  takeOverSubspace( subspace, true );
1179 }
1180 
1181 void PubSubMMOG::takeOverSubspace( PubSubSubspaceResponsible& subspace, bool isNew = false )
1182 {
1183  const PubSubSubspaceId& subspaceId = subspace.getId();
1184  int intId = subspaceId.getId();
1185 
1186  subspace.fixTotalChildrenCount();
1187 
1188  NodeHandle oldNode = subspace.getResponsibleNode();
1189 
1190  // insert subspace into responsible list
1191  subspace.setResponsibleNode( thisNode );
1192  responsibleSubspaces.insert( make_pair(subspaceId, subspace) );
1193 
1194  // request backup
1195  PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
1196  helpCall->setHelpType( PUBSUB_BACKUP );
1197  helpCall->setSubspaceId( intId );
1198  helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
1199  RECORD_STATS(
1200  ++numPubSubSignalingMessages;
1201  pubSubSignalingMessagesSize+= helpCall->getByteLength()
1202  );
1203  sendUdpRpcCall( lobbyServer, helpCall );
1204 
1205  if( !isNew ) {
1206  PubSubReplacementMessage* repMsg = new PubSubReplacementMessage("I replaced the responsible node");
1207  repMsg->setSubspaceId( intId );
1208  repMsg->setNewResponsibleNode( thisNode );
1209  repMsg->setBitLength( PUBSUB_REPLACEMENT_L( repMsg ));
1210 
1211  // Inform children and lobbyserver about takeover
1212  sendMessageToChildren( subspace, repMsg, NULL, repMsg );
1213  sendMessageToUDP( lobbyServer, repMsg );
1214 
1215  // inform lobby server over failed node
1216  PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
1217  failedNode->setFailedNode( oldNode );
1218  failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
1219  RECORD_STATS(
1220  ++numPubSubSignalingMessages;
1221  pubSubSignalingMessagesSize+= failedNode->getByteLength()
1222  );
1223  sendMessageToUDP( lobbyServer, failedNode );
1224  }
1225 }
1226 
1228 {
1229  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1230  for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
1231  PubSubPingCall* bHeartbeat = new PubSubPingCall("Heartbeat Backup");
1232  bHeartbeat->setPingType( PUBSUB_PING_BACKUP );
1233  bHeartbeat->setSubspaceId( it->second.getId().getId() );
1234  bHeartbeat->setBitLength( PUBSUB_PINGCALL_L( bHeartbeat ));
1235 
1236  PubSubPingCall* iHeartbeat = new PubSubPingCall("Heartbeat Intermediate");
1237  iHeartbeat->setPingType( PUBSUB_PING_INTERMEDIATE );
1238  iHeartbeat->setSubspaceId( it->second.getId().getId() );
1239  iHeartbeat->setBitLength( PUBSUB_PINGCALL_L( iHeartbeat ));
1240 
1241  sendMessageToChildren( it->second, iHeartbeat, bHeartbeat, NULL);
1242  delete bHeartbeat;
1243  delete iHeartbeat;
1244  }
1245 }
1246 
1248 {
1249  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1250  for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
1251  PubSubPingCall* heartbeat = new PubSubPingCall("Ping");
1252  heartbeat->setPingType( PUBSUB_PING_CHILD );
1253  heartbeat->setSubspaceId( it->second.getId().getId() );
1254  heartbeat->setBitLength( PUBSUB_PINGCALL_L( heartbeat ));
1255  sendMessageToChildren( it->second, NULL, NULL, heartbeat );
1256  delete heartbeat;
1257  }
1258 }
1259 
1261 {
1262  // our parent timed out. we have to take over the subspace...
1263  PubSubSubspaceId subspaceId(timer->getSubspaceId(), numSubspaces);
1264  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1265  it = backupSubspaces.find( subspaceId );
1266  if( it == backupSubspaces.end() ) {
1267  delete timer;
1268  return;
1269  }
1270 
1271  // increase fail count; if to high, take over subspace
1272  it->second.incHeartbeatFailCount();
1273  if( it->second.getHeartbeatFailCount() > 1 ) { // FIXME: make it a parameter
1274 
1275  // Delete Timer
1276  cancelAndDelete( timer );
1277  it->second.setHeartbeatTimer( NULL );
1278 
1279  // Take over Subspace
1280  takeOverSubspace( it->second );
1281  backupSubspaces.erase( it );
1282 
1283  } else {
1284  startTimer( timer );
1285  }
1286 }
1287 
1289 {
1290  // FIXME: cast oldNode to NodeHandle
1291  // Inform Lobbyserver over failed node
1292  PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
1293  failedNode->setFailedNode( oldNode );
1294  failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
1295  RECORD_STATS(
1296  ++numPubSubSignalingMessages;
1297  pubSubSignalingMessagesSize+= failedNode->getByteLength()
1298  );
1299  sendMessageToUDP( lobbyServer, failedNode );
1300 
1301  // Request new Backup
1302  PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
1303  helpCall->setHelpType( PUBSUB_BACKUP );
1304  helpCall->setSubspaceId( backupCall->getSubspaceId() );
1305  helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
1306  RECORD_STATS(
1307  ++numPubSubSignalingMessages;
1308  pubSubSignalingMessagesSize+= helpCall->getByteLength()
1309  );
1310  sendUdpRpcCall( lobbyServer, helpCall );
1311 
1312  // find appropriate subspace and mark backup as failed
1313  PubSubSubspaceId subspaceId(backupCall->getSubspaceId(), numSubspaces);
1314  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1315  it = responsibleSubspaces.find( subspaceId );
1316  if( it == responsibleSubspaces.end() ) {
1317  return;
1318  }
1319  it->second.setBackupNode( NodeHandle::UNSPECIFIED_NODE );
1320 }
1321 
1323 {
1324  // Inform Lobbyserver over failed node
1325  const NodeHandle& oldNodeHandle = dynamic_cast<const NodeHandle&>(oldNode);
1326  // FIXME: use oldNodeHandle instead of oldNode
1327  PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
1328  failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
1329  RECORD_STATS(
1330  ++numPubSubSignalingMessages;
1331  pubSubSignalingMessagesSize+= failedNode->getByteLength()
1332  );
1333  failedNode->setFailedNode( oldNode );
1334  sendMessageToUDP( lobbyServer, failedNode );
1335 
1336  // find appropriate subspace
1337  PubSubSubspaceId subspaceId(pingCall->getSubspaceId(), numSubspaces);
1338  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1339  it = responsibleSubspaces.find( subspaceId );
1340  if( it == responsibleSubspaces.end() ) {
1341  return;
1342  }
1343  PubSubSubspaceResponsible& subspace = it->second;
1344 
1345  if( pingCall->getPingType() == PUBSUB_PING_CHILD ){
1346  // remove child
1347  unsubscribeChild( oldNodeHandle, subspace );
1348 
1349  } else if( pingCall->getPingType() == PUBSUB_PING_BACKUP ){
1350 
1351  // if we didn't already have (or asked for) a new backup
1352  if( !subspace.getBackupNode().isUnspecified() &&
1353  subspace.getBackupNode() == oldNodeHandle )
1354  {
1355  // our backup timed out. we have to request a new one...
1356  // delete old backup entry
1358 
1359  // Request new Backup
1360  PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
1361  helpCall->setHelpType( PUBSUB_BACKUP );
1362  helpCall->setSubspaceId( pingCall->getSubspaceId() );
1363  helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
1364  RECORD_STATS(
1365  ++numPubSubSignalingMessages;
1366  pubSubSignalingMessagesSize+= helpCall->getByteLength()
1367  );
1368  sendUdpRpcCall( lobbyServer, helpCall );
1369  }
1370 
1371  } else if( pingCall->getPingType() == PUBSUB_PING_INTERMEDIATE ){
1372  // one intermediate node timed out. we have to request a new one...
1373  // delete old intermediate entry
1374  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1375  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1376  if( !iit->node.isUnspecified() && oldNode == iit->node ) break;
1377  }
1378  if( iit == subspace.intermediateNodes.end() ) return;
1379 
1380  NodeHandle oldNode = iit->node;
1381  iit->node = NodeHandle::UNSPECIFIED_NODE;
1382 
1383  // inform Backup
1384  if( !subspace.getBackupNode().isUnspecified() ){
1385  PubSubBackupIntermediateMessage* backupMsg = new PubSubBackupIntermediateMessage("Backup: Intermediate failed");
1386  backupMsg->setSubspaceId( pingCall->getSubspaceId() );
1387  backupMsg->setPos( iit - it->second.intermediateNodes.begin() );
1388  backupMsg->setBitLength( PUBSUB_BACKUPINTERMEDIATE_L( backupMsg ));
1389  RECORD_STATS(
1390  ++numPubSubSignalingMessages;
1391  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1392  );
1393  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
1394  }
1395 
1396  bool fixNeeded = false;
1397  // Take over all children until new intermediate is found.
1398  set<NodeHandle>::iterator childIt;
1399  for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1400  if( !subspace.cachedChildren.insert( make_pair(*childIt, false)).second ){
1401  fixNeeded = true;
1402  }
1403 
1404  // Inform Backup
1405  if( !subspace.getBackupNode().isUnspecified() ){
1406  PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
1407  backupMsg->setSubspaceId( pingCall->getSubspaceId() );
1408  backupMsg->setChild( *childIt );
1409  backupMsg->setOldParent( oldNodeHandle );
1410  backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
1411  RECORD_STATS(
1412  ++numPubSubSignalingMessages;
1413  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1414  );
1415  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
1416  }
1417  }
1418  iit->children.clear();
1419  if( fixNeeded ) subspace.fixTotalChildrenCount();
1420 
1421  // inform parent of intermediate node
1422  int parentPos = (iit - subspace.intermediateNodes.begin())/maxChildren -1;
1423  if( parentPos >= 0 ){
1424  PubSubSubspaceResponsible::IntermediateNode& parent = subspace.intermediateNodes[parentPos];
1425  if( !parent.node.isUnspecified() ){
1426  PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Intermediate left Subspace");
1427  goneMsg->setNode( oldNodeHandle );
1428  goneMsg->setSubspaceId( subspace.getId().getId() );
1429  goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
1430  RECORD_STATS(
1431  ++numPubSubSignalingMessages;
1432  pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1433  );
1434  sendMessageToUDP( parent.node, goneMsg );
1435  }
1436  }
1437 
1438  // Request new Intermediate
1439  PubSubHelpCall* helpCall = new PubSubHelpCall("I need an intermediate node");
1440  helpCall->setHelpType( PUBSUB_INTERMEDIATE );
1441  helpCall->setSubspaceId( pingCall->getSubspaceId() );
1442  helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
1443  RECORD_STATS(
1444  ++numPubSubSignalingMessages;
1445  pubSubSignalingMessagesSize+= helpCall->getByteLength()
1446  );
1447  sendUdpRpcCall( lobbyServer, helpCall );
1448  }
1449 // FIXME: just for testing
1450 int iii = subspace.getTotalChildrenCount();
1451 subspace.fixTotalChildrenCount();
1452 if( iii != subspace.getTotalChildrenCount() ){
1453  opp_error("Huh?");
1454 }
1455 
1456 }
1457 
1459 {
1460  // FIXME: cast oldNode to NodeHandle
1461  // our subscription call timed out. This means the responsible node is dead...
1462  // Inform Lobbyserver over failed node
1463  PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
1464  failedNode->setFailedNode( oldNode );
1465  failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
1466  RECORD_STATS(
1467  ++numPubSubSignalingMessages;
1468  pubSubSignalingMessagesSize+= failedNode->getByteLength()
1469  );
1470  sendMessageToUDP( lobbyServer, failedNode );
1471 
1472  // Ask for new responsible node
1473  PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
1474  PubSubSubspaceId subspaceId( subscriptionCall->getSubspaceId(), numSubspaces);
1475  respCall->setSubspacePos( Vector2D(subspaceId.getX(), subspaceId.getY()) );
1476  respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ));
1477  RECORD_STATS(
1478  ++numPubSubSignalingMessages;
1479  pubSubSignalingMessagesSize+= respCall->getByteLength()
1480  );
1481  sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
1482 }
1483 
1485 {
1486  PubSubSubspaceId subspaceId(replaceMsg->getSubspaceId(), numSubspaces);
1487 
1488  // There's a new responsible node for a subspace
1489  // Replace the old one in the intermediateSubspaces map...
1490  std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
1491  it = intermediateSubspaces.find( subspaceId );
1492  if( it != intermediateSubspaces.end() ) {
1493  it->second.setResponsibleNode( replaceMsg->getNewResponsibleNode() );
1494  }
1495 
1496  // ... and in the subsribed subspaces list
1497  std::list<PubSubSubspace>::iterator iit;
1498  for( iit = subscribedSubspaces.begin(); iit != subscribedSubspaces.end(); ++iit ){
1499  if( iit->getId() == subspaceId ) {
1500  iit->setResponsibleNode( replaceMsg->getNewResponsibleNode() );
1501  return;
1502  }
1503  }
1504 }
1505 
1507 {
1508  PubSubSubspaceId subspaceId(releaseMsg->getSubspaceId(), numSubspaces);
1509  intermediateSubspaces.erase( subspaceId );
1510 }
1511 
1513 {
1514  // find appropriate subspace
1515  PubSubSubspaceId subspaceId(backupMsg->getSubspaceId(), numSubspaces);
1516  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1517  it = backupSubspaces.find( subspaceId );
1518  if( it == backupSubspaces.end() ) {
1519  return;
1520  }
1521 
1522  if( backupMsg->getPos() >= (int) it->second.intermediateNodes.size() ){
1523  it->second.intermediateNodes.resize( backupMsg->getPos() + 1 );
1524  }
1525  it->second.intermediateNodes[ backupMsg->getPos() ].node = backupMsg->getNode();
1526 }
1527 
1529 {
1530  // Note: this funktion may break subspace's tatalChildrenCall
1531  // You have to use fixTotalChildrenCount before using the subspace
1532  // find appropriate subspace
1533  PubSubSubspaceId subspaceId(backupMsg->getSubspaceId(), numSubspaces);
1534  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1535  it = backupSubspaces.find( subspaceId );
1536  if( it == backupSubspaces.end() ) {
1537  return;
1538  }
1539  PubSubSubspaceResponsible& subspace = it->second;
1540 
1541  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1542 
1543  if( !backupMsg->getOldParent().isUnspecified() ){
1544  // oldParent set -> move child
1545  if( backupMsg->getOldParent() == subspace.getResponsibleNode() ){
1546  // direct child -> cache
1547  subspace.removeChild( backupMsg->getChild() );
1548  subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
1549 
1550  } else {
1551  // from I -> chache
1552  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1553 // if( !iit->node.isUnspecified() && iit->node == backupMsg->getOldParent() ){
1554  iit->children.erase( backupMsg->getChild() );
1555 // }
1556  }
1557  subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
1558  }
1559  } else if( backupMsg->getParent().isUnspecified() ){
1560  // parent not set -> new child to chache
1561  subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
1562 
1563  } else if( backupMsg->getParent() == subspace.getResponsibleNode() ){
1564  // new direct child
1565  subspace.addChild( backupMsg->getChild() );
1566  } else {
1567  // move child from cache to intermediate
1568  subspace.cachedChildren.erase( backupMsg->getChild() );
1569 
1570  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1571  if( !iit->node.isUnspecified() && iit->node == backupMsg->getParent() ){
1572  iit->children.insert( backupMsg->getChild() );
1573  }
1574  }
1575  // FIXME: check for errors
1576  }
1577 }
1578 
1580 {
1581  // Note: this funktion may break subspace's tatalChildrenCall
1582  // You have to use fixTotalChildrenCount before using the subspace
1583  // find appropriate subspace
1584  PubSubSubspaceId subspaceId(backupMsg->getSubspaceId(), numSubspaces);
1585  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1586  it = backupSubspaces.find( subspaceId );
1587  if( it == backupSubspaces.end() ) {
1588  return;
1589  }
1590  PubSubSubspaceResponsible& subspace = it->second;
1591 
1592  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1593  set<NodeHandle>::iterator childIt;
1594 
1595  if( !subspace.removeChild(backupMsg->getChild()) && !subspace.cachedChildren.erase( backupMsg->getChild()) ){
1596  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1597  iit->children.erase( backupMsg->getChild() );
1598  }
1599  }
1600  if( !backupMsg->getIntermediate().isUnspecified() ){
1601  // remove intermediate
1602  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1603  if( !iit->node.isUnspecified() && iit->node == backupMsg->getIntermediate() ){
1604  for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1605  // FIXME: note really stable. let the resp node inform us about child moves
1606  // remove children of last intermediate
1607  if( subspace.getNumChildren() + subspace.getNumIntermediates() < maxChildren ){
1608  // we have room for the child
1609  subspace.children.insert( *childIt );
1610  } else {
1611  // Node has to go to some intermediate
1612  // cache it
1613  subspace.cachedChildren.insert( make_pair(*childIt, true) );
1614  }
1615  }
1616  subspace.intermediateNodes.erase( iit );
1617  break;
1618  }
1619  }
1620  }
1621 }
1622 
1624 {
1625  PubSubBackupUnsubscribeMessage* backupMsg = new PubSubBackupUnsubscribeMessage("Backup: node left subspace");
1626  backupMsg->setChild( node );
1627  backupMsg->setSubspaceId( subspace.getId().getId() );
1629  if( iNode && !iNode->node.isUnspecified() ) {
1630  // Node is handled by an intermediate node, inform him
1631  PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left Subspace");
1632  goneMsg->setNode( node );
1633  goneMsg->setSubspaceId( subspace.getId().getId() );
1634  goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
1635  RECORD_STATS(
1636  ++numPubSubSignalingMessages;
1637  pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1638  );
1639  sendMessageToUDP( iNode->node, goneMsg );
1640  }
1641  if ( subspace.getTotalChildrenCount() < ( maxChildren - 1) * subspace.getNumIntermediates()){// FIXME: parameter when to start cleanup?
1642  // Too many "free" slots, remove one intermediate node
1644  if( !liNode.node.isUnspecified() ){
1645  // Inform node + lobby about release from intermediate node status
1646  PubSubReleaseIntermediateMessage* releaseMsg = new PubSubReleaseIntermediateMessage("I don't need you anymore as intermediate");
1647  releaseMsg->setSubspaceId( subspace.getId().getId() );
1648  releaseMsg->setBitLength( PUBSUB_RELEASEINTERMEDIATE_L( releaseMsg ));
1649  RECORD_STATS(
1650  ++numPubSubSignalingMessages;
1651  pubSubSignalingMessagesSize+= releaseMsg->getByteLength()
1652  );
1653  sendMessageToUDP( liNode.node, releaseMsg );
1654 
1655  PubSubHelpReleaseMessage* helpRMsg = new PubSubHelpReleaseMessage("node is not my intermediate anymore");
1656  helpRMsg->setSubspaceId( subspace.getId().getId() );
1657  helpRMsg->setNode( liNode.node );
1658  helpRMsg->setBitLength( PUBSUB_HELPRELEASE_L( helpRMsg ));
1659  RECORD_STATS(
1660  ++numPubSubSignalingMessages;
1661  pubSubSignalingMessagesSize+= helpRMsg->getByteLength()
1662  );
1663  sendMessageToUDP( lobbyServer, helpRMsg );
1664 
1665  // inform parent of intermediate node
1666  int parentPos = (subspace.intermediateNodes.size()-1)/maxChildren -1;
1667  if( parentPos >= 0 ){
1668  PubSubSubspaceResponsible::IntermediateNode& parent = subspace.intermediateNodes[parentPos];
1669  if( !parent.node.isUnspecified() ){
1670  PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Intermediate left Subspace");
1671  goneMsg->setNode( liNode.node );
1672  goneMsg->setSubspaceId( subspace.getId().getId() );
1673  goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
1674  RECORD_STATS(
1675  ++numPubSubSignalingMessages;
1676  pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1677  );
1678  sendMessageToUDP( parent.node, goneMsg );
1679  }
1680  }
1681  }
1682 
1683  bool fixNeeded = false;
1684  set<NodeHandle>::iterator childIt;
1685  for( childIt = liNode.children.begin(); childIt != liNode.children.end(); ++childIt ){
1686  // remove children of last intermediate
1687  if( subspace.getNumChildren() + subspace.getNumIntermediates() < maxChildren ){
1688  // we have room for the child
1689  if( !subspace.children.insert( *childIt ).second ) fixNeeded = true;
1690 
1691  //FIXME: send backup new->toMe
1692  } else {
1693  // Node has to go to some intermediate
1694  // find intermediate with free capacities
1696  newINode = subspace.getNextFreeIntermediate();
1697 
1698  if( newINode && newINode->node != liNode.node ){
1699  // cache it
1700  if( !subspace.cachedChildren.insert( make_pair(*childIt, true) ).second ) fixNeeded = true;
1701  //FIXME: send backup new->toCache
1702 
1703  ++(newINode->waitingChildren);
1704 
1705  // let him adopt the child
1706  PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt Node");
1707  adoptCall->setSubspaceId( subspace.getId().getId() );
1708  adoptCall->setChild( *childIt );
1709  adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
1710  RECORD_STATS(
1711  ++numPubSubSignalingMessages;
1712  pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1713  );
1714  sendUdpRpcCall( newINode->node, adoptCall );
1715  } else {
1716  // no intermediate found
1717  // just move child to cache and wait for a new one
1718  if( !subspace.cachedChildren.insert( make_pair(*childIt, false) ).second ) fixNeeded = true;
1719  }
1720  }
1721  }
1722  // delete node from subspace's intermediate node list
1723  subspace.intermediateNodes.pop_back();
1724  // inform backup about deleted intermediate
1725  backupMsg->setIntermediate( liNode.node );
1726 
1727  if( fixNeeded ) subspace.fixTotalChildrenCount();
1728  }
1729 
1730 // FIXME: just for testing
1731 int iii = subspace.getTotalChildrenCount();
1732 subspace.fixTotalChildrenCount();
1733 if( iii != subspace.getTotalChildrenCount() ){
1734  opp_error("Huh?");
1735 }
1736 
1737  if( !subspace.getBackupNode().isUnspecified() ){
1738  backupMsg->setBitLength( PUBSUB_BACKUPUNSUBSCRIBE_L( backupMsg ));
1739  RECORD_STATS(
1740  ++numPubSubSignalingMessages;
1741  pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1742  );
1743  sendMessageToUDP( subspace.getBackupNode(), backupMsg );
1744  } else {
1745  delete backupMsg;
1746  }
1747 }
1748 
1750  BaseOverlayMessage* toIntermediates,
1751  BaseOverlayMessage* toBackup,
1752  BaseOverlayMessage* toPlayers )
1753 {
1754  BaseCallMessage* intermediateCall = dynamic_cast<BaseCallMessage*>(toIntermediates);
1755  BaseCallMessage* backupCall = dynamic_cast<BaseCallMessage*>(toBackup);
1756  BaseCallMessage* playerCall = dynamic_cast<BaseCallMessage*>(toPlayers);
1757 
1758  std::set<NodeHandle>::iterator childIt;
1759 
1760  if( toPlayers ) {
1761  // Inform all children ...
1762  for( childIt = subspace.children.begin(); childIt != subspace.children.end(); ++childIt ) {
1763  if( playerCall ){
1764  RECORD_STATS(
1765  ++numPubSubSignalingMessages;
1766  pubSubSignalingMessagesSize+= playerCall->getByteLength()
1767  );
1768  sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->dup()) );
1769  } else {
1770  RECORD_STATS(
1771  ++numPubSubSignalingMessages;
1772  pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1773  );
1774  sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
1775  }
1776  }
1777  // ... and all cached children ...
1778  std::map<NodeHandle, bool>::iterator cacheChildIt;
1779  for( cacheChildIt = subspace.cachedChildren.begin(); cacheChildIt != subspace.cachedChildren.end(); ++cacheChildIt ) {
1780  if( playerCall ){
1781  RECORD_STATS(
1782  ++numPubSubSignalingMessages;
1783  pubSubSignalingMessagesSize+= playerCall->getByteLength()
1784  );
1785  sendUdpRpcCall( cacheChildIt->first, static_cast<BaseCallMessage*>(playerCall->dup()) );
1786  } else {
1787  RECORD_STATS(
1788  ++numPubSubSignalingMessages;
1789  pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1790  );
1791  sendMessageToUDP( cacheChildIt->first, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
1792  }
1793  }
1794  }
1795  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1796  // ... all intermediate nodes ...
1797  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
1798  if( toIntermediates && !iit->node.isUnspecified() ){
1799  if( intermediateCall ){
1800  RECORD_STATS(
1801  ++numPubSubSignalingMessages;
1802  pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
1803  );
1804  sendUdpRpcCall( iit->node, static_cast<BaseCallMessage*>(intermediateCall->dup()) );
1805  } else {
1806  RECORD_STATS(
1807  ++numPubSubSignalingMessages;
1808  pubSubSignalingMessagesSize+= toIntermediates->getByteLength()
1809  );
1810  sendMessageToUDP( iit->node, static_cast<BaseOverlayMessage*>(toIntermediates->dup()) );
1811  }
1812  }
1813  if( toPlayers ) {
1814  // .. and all intermediate node's children ...
1815  for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1816  if( playerCall ){
1817  RECORD_STATS(
1818  ++numPubSubSignalingMessages;
1819  pubSubSignalingMessagesSize+= playerCall->getByteLength()
1820  );
1821  sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->dup()) );
1822  } else {
1823  RECORD_STATS(
1824  ++numPubSubSignalingMessages;
1825  pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1826  );
1827  sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
1828  }
1829  }
1830  }
1831  }
1832  // ... and the backup node
1833  if( toBackup && !subspace.getBackupNode().isUnspecified() ) {
1834  if( backupCall ){
1835  RECORD_STATS(
1836  ++numPubSubSignalingMessages;
1837  pubSubSignalingMessagesSize+= backupCall->getByteLength()
1838  );
1839  sendUdpRpcCall( subspace.getBackupNode(), static_cast<BaseCallMessage*>(backupCall->dup()) );
1840  } else {
1841  RECORD_STATS(
1842  ++numPubSubSignalingMessages;
1843  pubSubSignalingMessagesSize+= toBackup->getByteLength()
1844  );
1845  sendMessageToUDP( subspace.getBackupNode(), static_cast<BaseOverlayMessage*>(toBackup->dup()) );
1846  }
1847  }
1848 }
1849 
1850 
1852 {
1853  // FOr all (responsible) subspaces
1854  int numRespSubspaces = responsibleSubspaces.size();
1855  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1856  for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it ){
1857  PubSubSubspaceResponsible& subspace = it->second;
1858 
1859  // Prepare a movement list message aggregating all stored move messages
1860  PubSubMoveListMessage* moveList = new PubSubMoveListMessage("Movement list");
1861  moveList->setTimestamp( simTime() );
1862  moveList->setSubspaceId( subspace.getId().getId() );
1863  moveList->setPlayerArraySize( subspace.waitingMoveMessages.size() );
1864  moveList->setPositionArraySize( subspace.waitingMoveMessages.size() );
1865  moveList->setPositionAgeArraySize( subspace.waitingMoveMessages.size() );
1866 
1867  std::deque<PubSubMoveMessage*>::iterator msgIt;
1868  int pos = 0;
1869  for( msgIt = subspace.waitingMoveMessages.begin(); msgIt != subspace.waitingMoveMessages.end(); ++msgIt ){
1870  moveList->setPlayer( pos, (*msgIt)->getPlayer() );
1871  moveList->setPosition( pos, (*msgIt)->getPosition() );
1872  moveList->setPositionAge( pos, simTime() - (*msgIt)->getCreationTime() );
1873  pos++;
1874  cancelAndDelete( *msgIt );
1875  }
1876  subspace.waitingMoveMessages.clear();
1877 
1878  moveList->setBitLength( PUBSUB_MOVELIST_L( moveList ));
1879  // Send message to all direct children...
1880  for( set<NodeHandle>::iterator childIt = subspace.children.begin();
1881  childIt != subspace.children.end(); ++childIt )
1882  {
1883  RECORD_STATS(
1884  ++numMoveListMessages;
1885  moveListMessagesSize+= moveList->getByteLength();
1886  respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
1887  );
1888  sendMessageToUDP( *childIt, (BaseOverlayMessage*) moveList->dup() );
1889  }
1890 
1891  //... all cached children (if messages are not too big) ...
1892  if( moveList->getByteLength() < 1024 ){ // FIXME: magic number. make it a parameter, or dependant on the available bandwidth
1893  for( map<NodeHandle, bool>::iterator childIt = subspace.cachedChildren.begin();
1894  childIt != subspace.cachedChildren.end(); ++childIt )
1895  {
1896  RECORD_STATS(
1897  ++numMoveListMessages;
1898  moveListMessagesSize+= moveList->getByteLength();
1899  respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
1900  );
1901  sendMessageToUDP( childIt->first, (BaseOverlayMessage*) moveList->dup() );
1902  // ... but don't send msgs to too many cached children, as this would exhaust our bandwidth
1903  }
1904  }
1905 
1906  // ... all direct intermediates and intermediates with broken parent
1907  deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1908  for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit )
1909  {
1910  int intermediatePos = iit - subspace.intermediateNodes.begin();
1911  if( intermediatePos >= maxChildren &&
1912  !subspace.intermediateNodes[intermediatePos/maxChildren -1].node.isUnspecified() ) continue;
1913  if( !iit->node.isUnspecified() ) {
1914  RECORD_STATS(
1915  ++numMoveListMessages;
1916  moveListMessagesSize+= moveList->getByteLength();
1917  respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
1918  );
1919  sendMessageToUDP( iit->node, (BaseOverlayMessage*) moveList->dup() );
1920  }
1921  }
1922 
1923  delete moveList;
1924  }
1925 }
1926 
1928 {
1929  if(ev.isGUI()) {
1930  if(state == READY) {
1931  getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "green");
1932  getDisplayString().setTagArg("i", 1, "green");
1933  }
1934  else if(state == JOIN) {
1935  getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "yellow");
1936  getDisplayString().setTagArg("i", 1, "yellow");
1937  }
1938  else {
1939  getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "red");
1940  getDisplayString().setTagArg("i", 1, "red");
1941  }
1942  }
1943 }
1944 
1946 {
1947  if( !timer ) {
1948  EV << "[PubSubMMOG::startTimer() @ " << thisNode.getIp()
1949  << " (" << thisNode.getKey().toString(16) << ")]\n"
1950  << " WARNING! Trying to start NULL timer @ " << thisNode << "\n"
1951  << endl;
1952  return;
1953  }
1954 
1955  if( timer->isScheduled() ) {
1956  cancelEvent( timer );
1957  }
1958 
1959  simtime_t duration = 0;
1960  switch( timer->getType() ) {
1961  case PUBSUB_HEARTBEAT:
1962  duration = parentTimeout/2;
1963  break;
1964  case PUBSUB_CHILDPING:
1965  duration = parentTimeout*10; // FIXME: make it a parameter
1966  break;
1967  case PUBSUB_PARENT_TIMEOUT:
1968  duration = parentTimeout;
1969  break;
1970  case PUBSUB_EVENTDELIVERY:
1971  duration = 1.0/movementRate;
1972  break;
1973  }
1974  scheduleAt(simTime() + duration, timer );
1975 }
1976 
1978 {
1979  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
1980  if (time < GlobalStatistics::MIN_MEASURED) return;
1981 
1982  globalStatistics->addStdDev("PubSubMMOG: Sent Signaling Messages/s",
1983  numPubSubSignalingMessages / time);
1984  globalStatistics->addStdDev("PubSubMMOG: Sent Signaling bytes/s",
1985  pubSubSignalingMessagesSize / time);
1986  globalStatistics->addStdDev("PubSubMMOG: Sent Move Messages/s",
1987  numMoveMessages / time);
1988  globalStatistics->addStdDev("PubSubMMOG: Sent Move bytes/s",
1989  moveMessagesSize / time);
1990  globalStatistics->addStdDev("PubSubMMOG: Sent MoveList Messages/s",
1991  numMoveListMessages / time);
1992  globalStatistics->addStdDev("PubSubMMOG: Sent MoveList bytes/s",
1993  moveListMessagesSize / time);
1994  globalStatistics->addStdDev("PubSubMMOG: Received Move Events (correct timeslot)/s",
1995  numEventsCorrectTimeslot / time);
1996  globalStatistics->addStdDev("PubSubMMOG: Received Move Events (wrong timeslot)/s",
1997  numEventsWrongTimeslot / time);
1998  globalStatistics->addStdDev("PubSubMMOG: Responsible Nodes: Send MoveList Bytes/s",
1999  respMoveListMessagesSize / time);
2000  globalStatistics->addStdDev("PubSubMMOG: Lost or too long delayed MoveLists/s",
2001  lostMovementLists / time);
2002  globalStatistics->addStdDev("PubSubMMOG: Received valid MoveLists/s",
2003  receivedMovementLists / time);
2004 }
2005 
2007 {
2008  // Delete all waiting move messages
2009  std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
2010  for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
2011  deque<PubSubMoveMessage*>::iterator msgIt;
2012  for( msgIt = it->second.waitingMoveMessages.begin(); msgIt != it->second.waitingMoveMessages.end(); ++msgIt ){
2013  cancelAndDelete( *msgIt );
2014  }
2015  it->second.waitingMoveMessages.clear();
2016  }
2017 
2018  cancelAndDelete(heartbeatTimer);
2019 }