24 #include <NotifierConsts.h>
46 lobbyServer = globalNodeList->getBootstrapNode();
48 joinTimer =
new cMessage(
"join timer");
49 simtime_t joinTime = ceil(simTime() + (simtime_t) par(
"joinDelay"));
50 scheduleAt( joinTime, joinTimer );
52 movementRate = par(
"movementRate");
53 eventDeliveryTimer =
new PubSubTimer(
"event delivery timer");
55 scheduleAt( joinTime + 1.0/(2*movementRate), eventDeliveryTimer );
57 numSubspaces = par(
"numSubspaces");
58 subspaceSize = (int) ( (
unsigned int) par(
"areaDimension") / numSubspaces);
61 maxChildren = par(
"maxChildren");
64 AOIWidth = par(
"AOIWidth");
65 maxMoveDelay = par(
"maxMoveDelay");
67 parentTimeout = par(
"parentTimeout");
70 startTimer( heartbeatTimer );
73 startTimer( childPingTimer );
75 allowOldMoveMessages = par(
"allowOldMoveMessages");
77 numEventsWrongTimeslot = numEventsCorrectTimeslot = 0;
78 numPubSubSignalingMessages = 0;
79 pubSubSignalingMessagesSize = 0;
82 numMoveListMessages = 0;
83 moveListMessagesSize = 0;
84 respMoveListMessagesSize = 0;
85 lostMovementLists = 0;
86 receivedMovementLists = 0;
87 WATCH( numPubSubSignalingMessages );
88 WATCH( pubSubSignalingMessagesSize );
89 WATCH( numMoveMessages );
90 WATCH( moveMessagesSize );
91 WATCH( numMoveListMessages );
92 WATCH( moveListMessagesSize );
93 WATCH( numEventsWrongTimeslot );
94 WATCH( numEventsCorrectTimeslot );
95 WATCH( lostMovementLists );
96 WATCH( receivedMovementLists );
97 WATCH_LIST( subscribedSubspaces );
98 WATCH_MAP( responsibleSubspaces );
99 WATCH_MAP( backupSubspaces );
100 WATCH_MAP( intermediateSubspaces );
107 RPC_DELEGATE( PubSubSubscription, handleSubscriptionCall );
110 RPC_DELEGATE( PubSubIntermediate, handleIntermediateCall );
120 cPolymorphic* context,
int rpcId,
125 handleJoinResponse( _PubSubJoinResponse );
126 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
127 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
128 <<
" Received a PubSubJoin RPC Response: id=" << rpcId <<
"\n"
129 <<
" msg=" << *_PubSubJoinResponse <<
" rtt=" << rtt
134 handleSubscriptionResponse( _PubSubSubscriptionResponse );
135 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
136 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
137 <<
" Received a PubSubSubscription RPC Response: id=" << rpcId <<
"\n"
138 <<
" msg=" << *_PubSubSubscriptionResponse <<
" rtt=" << rtt
143 handleResponsibleNodeResponse( _PubSubResponsibleNodeResponse );
144 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
145 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
146 <<
" Received a PubSubResponsibleNode RPC Response: id=" << rpcId <<
"\n"
147 <<
" msg=" << *_PubSubResponsibleNodeResponse <<
" rtt=" << rtt
152 handleHelpResponse( _PubSubHelpResponse );
153 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
154 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
155 <<
" Received a PubSubHelp RPC Response: id=" << rpcId <<
"\n"
156 <<
" msg=" << *_PubSubHelpResponse <<
" rtt=" << rtt
161 handleBackupResponse( _PubSubBackupResponse );
162 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
163 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
164 <<
" Received a PubSubBackup RPC Response: id=" << rpcId <<
"\n"
165 <<
" msg=" << *_PubSubBackupResponse <<
" rtt=" << rtt
170 handleIntermediateResponse( _PubSubIntermediateResponse );
171 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
172 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
173 <<
" Received a PubSubIntermediate RPC Response: id=" << rpcId <<
"\n"
174 <<
" msg=" << *_PubSubIntermediateResponse <<
" rtt=" << rtt
179 handleAdoptChildResponse( _PubSubAdoptChildResponse );
180 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
181 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
182 <<
" Received a PubSubAdoptChild RPC Response: id=" << rpcId <<
"\n"
183 <<
" msg=" << *_PubSubAdoptChildResponse <<
" rtt=" << rtt
188 handlePingResponse( _PubSubPingResponse );
189 EV <<
"[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
190 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
191 <<
" Received a PubSubPing RPC Response: id=" << rpcId <<
"\n"
192 <<
" msg=" << *_PubSubPingResponse <<
" rtt=" << rtt
201 cPolymorphic* context,
int rpcId,
206 handleBackupCallTimeout( _PubSubBackupCall, dest );
207 EV <<
"[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
208 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
209 <<
" Backup RPC Call timed out: id=" << rpcId <<
"\n"
210 <<
" msg=" << *_PubSubBackupCall
211 <<
" oldNode=" << dest
216 handlePingCallTimeout( _PubSubPingCall, dest );
217 EV <<
"[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
218 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
219 <<
" Ping RPC Call timed out: id=" << rpcId <<
"\n"
220 <<
" msg=" << *_PubSubPingCall
221 <<
" oldNode=" << dest
226 handleSubscriptionCallTimeout( _PubSubSubscriptionCall, dest );
227 EV <<
"[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
228 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
229 <<
" Subscription RPC Call timed out: id=" << rpcId <<
"\n"
230 <<
" msg=" << *_PubSubSubscriptionCall
231 <<
" oldNode=" << dest
247 handleMoveListMessage( moveMsg );
250 handleMoveMessage( moveMsg );
252 handleUnsubscriptionMessage( unsMsg );
255 handleNodeLeftMessage( leftMsg );
258 handleReplacementMessage( replaceMsg );
261 handleSubscriptionBackup( backupMsg );
264 handleUnsubscribeBackup( backupMsg );
267 handleIntermediateBackup( backupMsg );
270 handleReleaseIntermediate( releaseMsg );
277 if(
PubSubTimer* timer = dynamic_cast<PubSubTimer*>(msg) ) {
278 switch( timer->getType() ) {
280 sendHearbeatToChildren();
284 sendPingToChildren();
288 handleParentTimeout( timer );
295 }
else if( msg == joinTimer ) {
301 msg->
setComp(getThisCompType());
302 send( msg,
"appOut");
311 send( gameMsg,
"appOut");
318 if( state == READY ) {
319 handleMove( posMsg );
320 }
else if ( state == JOIN ) {
324 msg->
setComp(getThisCompType());
325 send( msg,
"appOut");
326 }
else if ( state == INIT ) {
333 sendUdpRpcCall( lobbyServer, joinMsg );
336 setBootstrapedIcon();
341 readyMsg->
setComp(getThisCompType());
342 send( readyMsg,
"appOut");
344 currentRegionX = (
unsigned int) (posMsg->getPosition().x/subspaceSize);
345 currentRegionY = (
unsigned int) (posMsg->getPosition().y/subspaceSize);
356 if( state != READY ){
358 setBootstrapedIcon();
361 readyMsg->
setComp(getThisCompType());
362 sendDelayed( readyMsg, ceil(simTime()) - simTime(),
"appOut" );
370 setBootstrapedIcon();
376 subscribedSubspaces.push_back( sub );
382 ++numPubSubSignalingMessages;
383 pubSubSignalingMessagesSize+= respCall->getByteLength()
385 sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 );
391 ++numPubSubSignalingMessages;
392 pubSubSignalingMessagesSize+= subCall->getByteLength()
394 sendUdpRpcCall( respNode, subCall );
403 std::list<PubSubSubspace>::iterator it = subscribedSubspaces.begin();
404 while( it != subscribedSubspaces.end() ) {
405 if( it->getId().getId() == subspaceId)
break;
408 if( it != subscribedSubspaces.end() ) {
409 it->setResponsibleNode( respNode );
415 ++numPubSubSignalingMessages;
416 pubSubSignalingMessagesSize+= subCall->getByteLength()
418 sendUdpRpcCall( respNode, subCall );
424 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
427 if( it != responsibleSubspaces.end() ) {
428 unsubscribeChild( unsMsg->
getSrc(), it->second );
434 std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
437 if( it == intermediateSubspaces.end() )
return;
439 it->second.removeChild( leftMsg->
getNode() );
444 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
449 if( it == responsibleSubspaces.end() ) {
458 if( it->second.addChild( subCall->
getSrcNode() )) {
469 sendUdpRpcCall( iNode->node, adoptCall );
471 ++numPubSubSignalingMessages;
472 pubSubSignalingMessagesSize+= adoptCall->getByteLength()
474 iNode->waitingChildren++;
482 sendUdpRpcCall( lobbyServer, helpCall );
484 ++numPubSubSignalingMessages;
485 pubSubSignalingMessagesSize+= helpCall->getByteLength()
491 sendRpcResponse( subCall, resp );
493 ++numPubSubSignalingMessages;
494 pubSubSignalingMessagesSize+= resp->getByteLength()
497 if( it == responsibleSubspaces.end() )
return;
507 if( !it->second.getBackupNode().isUnspecified() ){
510 ++numPubSubSignalingMessages;
511 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
513 sendMessageToUDP( it->second.getBackupNode(), backupMsg );
523 takeOverNewSubspace( region );
529 ++numPubSubSignalingMessages;
530 pubSubSignalingMessagesSize+= toResp->getByteLength()
532 sendRpcResponse( toCall, toResp );
537 if(category == NF_OVERLAY_NODE_GRACEFUL_LEAVE && state == READY) {
543 currentRegionX = (
unsigned int) (posMsg->
getPosition().
x/subspaceSize);
544 currentRegionY = (
unsigned int) (posMsg->
getPosition().
y/subspaceSize);
548 set<PubSubSubspaceId> expectedRegions;
549 int minX = (int) ((posMsg->
getPosition().
x - AOIWidth)/subspaceSize);
550 if( minX < 0 ) minX = 0;
551 int maxX = (int) ((posMsg->
getPosition().
x + AOIWidth)/subspaceSize);
552 if( maxX >= numSubspaces ) maxX = numSubspaces -1;
553 int minY = (int) ((posMsg->
getPosition().
y - AOIWidth)/subspaceSize);
554 if( minY < 0 ) minY = 0;
555 int maxY = (int) ((posMsg->
getPosition().
y + AOIWidth)/subspaceSize);
556 if( maxY >= numSubspaces ) maxY = numSubspaces -1;
// Subspace-index bounds of the square reaching 1.5 * AOIWidth around the
// player's current position, each clamped to the valid index range
// [0, numSubspaces - 1].
// NOTE(review): judging by the names, these delimit the area outside of which
// subscriptions are dropped (a wider margin than the AOIWidth used for the
// subscription bounds) -- confirm against the code that consumes them.
// The (int) cast truncates toward zero; the explicit checks below handle the
// remaining out-of-range values.
559 int minUnsubX = (int) ((posMsg->
getPosition().
x - 1.5*AOIWidth)/subspaceSize);
560 if( minUnsubX < 0 ) minUnsubX = 0;
561 int maxUnsubX = (int) ((posMsg->
getPosition().
x + 1.5*AOIWidth)/subspaceSize);
562 if( maxUnsubX >= numSubspaces ) maxUnsubX = numSubspaces -1;
563 int minUnsubY = (int) ((posMsg->
getPosition().
y - 1.5*AOIWidth)/subspaceSize);
564 if( minUnsubY < 0 ) minUnsubY = 0;
// Fixed: the margin must be 1.5 * AOIWidth like the other three bounds
// (this previously read "1.5+AOIWidth", giving a margin of AOIWidth + 1.5).
565 int maxUnsubY = (int) ((posMsg->
getPosition().
y + 1.5*AOIWidth)/subspaceSize);
566 if( maxUnsubY >= numSubspaces ) maxUnsubY = numSubspaces -1;
568 for(
int x = minX; x <= maxX; ++x ){
569 for(
int y = minY; y <= maxY; ++y ){
574 list<PubSubSubspace>::iterator subIt = subscribedSubspaces.begin();
576 while( subIt != subscribedSubspaces.end() ){
577 if( subIt->getId() == region ){
580 expectedRegions.erase( subIt->getId() );
583 if( subIt->getId().getX() < minX || subIt->getId().getX() > maxX ||
584 subIt->getId().getY() < minY || subIt->getId().getY() > maxY ){
585 if( !subIt->getResponsibleNode().isUnspecified() ){
588 unsubMsg->
setSrc( thisNode );
591 ++numPubSubSignalingMessages;
592 pubSubSignalingMessagesSize+= unsubMsg->getByteLength()
594 sendMessageToUDP( subIt->getResponsibleNode(), unsubMsg );
597 subscribedSubspaces.erase( subIt++ );
604 for( set<PubSubSubspaceId>::iterator regionIt = expectedRegions.begin(); regionIt != expectedRegions.end(); ++regionIt ){
606 subscribedSubspaces.push_back( sub );
611 ++numPubSubSignalingMessages;
612 pubSubSignalingMessagesSize+= respCall->getByteLength()
614 sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 );
626 moveMessagesSize+= moveMsg->getByteLength()
637 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
639 if( it == responsibleSubspaces.end() ){
640 EV <<
"[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
641 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
642 <<
" received moveMessage for unknown subspace" << moveMsg->
getSubspaceId() <<
"\n"
652 if( allowOldMoveMessages || moveMsg->
getTimestamp() >= eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) ){
653 it->second.waitingMoveMessages.push_back( moveMsg );
654 ++numEventsCorrectTimeslot;
656 EV <<
"[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
657 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
658 <<
" received moveMesage with Timestamp: " << moveMsg->
getTimestamp() <<
"\n"
659 <<
" deadline was: " << eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) <<
"\n"
661 ++numEventsWrongTimeslot;
662 cancelAndDelete( moveMsg );
671 std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
673 if( it != intermediateSubspaces.end() ){
675 if( it->second.getLastTimestamp() < moveMsg->
getTimestamp() ){
676 set<NodeHandle>::iterator childIt;
677 for( childIt = it->second.children.begin(); childIt != it->second.children.end(); ++childIt ){
680 ++numMoveListMessages;
681 moveListMessagesSize+= moveMsg->getByteLength()
684 it->second.setTimestamp( timestamp );
689 std::list<PubSubSubspace>::iterator subIt;
690 for( subIt = subscribedSubspaces.begin(); subIt != subscribedSubspaces.end(); ++subIt ){
692 if( subIt->getLastTimestamp() < moveMsg->
getTimestamp() ){
701 globalStatistics->addStdDev(
"PubSubMMOG: MoveDelay",
702 SIMTIME_DBL(simTime() - timestamp + moveMsg->
getPositionAge(i)) );
705 send( moveList,
"appOut" );
707 if( timestamp < simTime() - maxMoveDelay ){
710 ++receivedMovementLists;
712 if( subIt->getLastTimestamp() != 0) lostMovementLists += (
int)(SIMTIME_DBL(timestamp - subIt->getLastTimestamp())*movementRate -1);
715 subIt->setTimestamp( timestamp );
731 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
733 if( it == responsibleSubspaces.end() ){
734 EV <<
"[PubSubMMOG::handleHelpResponse() @ " << thisNode.getIp()
735 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
736 <<
" received helpResponse for unknown subspace" << helpResp->
getSubspaceId() <<
"\n"
757 set<NodeHandle>::iterator childIt;
758 map<NodeHandle, bool>::iterator childMapIt;
765 for( childIt = subspace.
children.begin(); childIt != subspace.
children.end(); ++childIt ){
773 for( childIt = iNode.
children.begin(); childIt != iNode.
children.end(); ++childIt ){
782 ++numPubSubSignalingMessages;
783 pubSubSignalingMessagesSize+= backupCall->getByteLength()
785 sendUdpRpcCall( helpResp->
getNode(), backupCall );
792 ++numPubSubSignalingMessages;
793 pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
795 sendUdpRpcCall( helpResp->
getNode(), intermediateCall );
808 startTimer( parentTimeout );
825 }
else if( pos == -1 ){
832 backupSubspaces.insert( make_pair(subspaceId, subspace) );
838 ++numPubSubSignalingMessages;
839 pubSubSignalingMessagesSize+= backupResp->getByteLength()
841 sendRpcResponse( backupCall, backupResp );
855 subspace.setResponsibleNode( intermediateCall->
getSrcNode() );
856 subspace.setTimestamp(0);
857 intermediateSubspaces.insert( make_pair(subspaceId, subspace) );
863 ++numPubSubSignalingMessages;
864 pubSubSignalingMessagesSize+= iResp->getByteLength()
866 sendRpcResponse( intermediateCall, iResp );
872 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
874 if( it == responsibleSubspaces.end() ) {
875 EV <<
"[PubSubMMOG::handleIntermediateResponse() @ " << thisNode.getIp()
876 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
877 <<
" Received Intermediate Response for unknown Subspace!\n"
886 bool newIntermediate =
true;
887 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
889 if( iit->node.isUnspecified() ){
890 iit->node = iNode.
node;
891 newIntermediate =
false;
908 ++numPubSubSignalingMessages;
909 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
916 int parentPos = intermediatePos/maxChildren -1;
917 if( parentPos >= 0 && !subspace.
intermediateNodes[parentPos].node.isUnspecified() ){
923 ++numPubSubSignalingMessages;
924 pubSubSignalingMessagesSize+= adoptCall->getByteLength()
929 if( newIntermediate ){
931 if( parentPos >= 0 ) {
935 bool fixNeeded =
false;
946 ++numPubSubSignalingMessages;
947 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
956 ++numPubSubSignalingMessages;
957 pubSubSignalingMessagesSize+= goneMsg->getByteLength()
959 sendMessageToUDP( parent.
node, goneMsg );
969 bool fixNeeded =
false;
980 ++numPubSubSignalingMessages;
981 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
993 for(
int pos = (intermediatePos+1) * maxChildren; pos < (int) subspace.
intermediateNodes.size() &&
994 pos < (intermediatePos+2) * maxChildren; ++pos ){
1001 ++numPubSubSignalingMessages;
1002 pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1004 sendUdpRpcCall( iit->node, adoptCall );
1009 std::map<NodeHandle,bool>::iterator childIt;
1011 if( childIt->second )
continue;
1014 adoptCall->
setChild( childIt->first );
1017 ++numPubSubSignalingMessages;
1018 pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1020 sendUdpRpcCall( intermediateResp->
getSrcNode(), adoptCall );
1021 childIt->second =
true;
1022 if( (
unsigned int) maxChildren == ++(iit->waitingChildren) )
break;
1028 std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
1030 if( it == intermediateSubspaces.end() ) {
1031 EV <<
"[PubSubMMOG::handleAdoptChildCall() @ " << thisNode.getIp()
1032 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1033 <<
" Received Adopt Child Call for unknown Subspace!\n"
1035 cancelAndDelete( adoptCall );
1039 it->second.addChild( adoptCall->
getChild() );
1045 ++numPubSubSignalingMessages;
1046 pubSubSignalingMessagesSize+= adoptResp->getByteLength()
1048 sendRpcResponse( adoptCall, adoptResp );
1053 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1055 if( it == responsibleSubspaces.end() ) {
1056 EV <<
"[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
1057 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1058 <<
" Received AdoptChild Response for unknown Subspace!\n"
1072 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1073 for( iit = it->second.intermediateNodes.begin(); iit != it->second.intermediateNodes.end(); ++iit ){
1074 if( !iit->node.isUnspecified() && iit->node == adoptResp->
getSrcNode() ){
1077 int intermediatePos = iit - it->second.intermediateNodes.begin();
1078 for(
int pos = (intermediatePos+1) * maxChildren; pos < (int) it->second.intermediateNodes.size() &&
1079 pos < (intermediatePos+2) * maxChildren; ++pos )
1081 if( !it->second.intermediateNodes[pos].node.isUnspecified() &&
1082 adoptResp->
getChild() == it->second.intermediateNodes[pos].node ){
1088 if( !it->second.cachedChildren.erase( adoptResp->
getChild() ) ){
1095 ++numPubSubSignalingMessages;
1096 pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1098 sendMessageToUDP( adoptResp->
getSrcNode(), goneMsg );
1103 if( !iit->children.insert( adoptResp->
getChild() ).second ){
1107 iit->waitingChildren--;
1118 if( !it->second.getBackupNode().isUnspecified() ){
1125 ++numPubSubSignalingMessages;
1126 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1128 sendMessageToUDP( it->second.getBackupNode(), backupMsg );
1134 EV <<
"[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
1135 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1136 <<
" Received AdoptChild Response for unknown child!\n"
1146 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1148 if( it == backupSubspaces.end() ) {
1149 EV <<
"[PubSubMMOG::handlePingCall() @ " << thisNode.getIp()
1150 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1151 <<
" Received PingCall for unknown Subspace!\n"
1155 it->second.resetHeartbeatFailCount();
1156 startTimer( it->second.getHeartbeatTimer() );
1164 ++numPubSubSignalingMessages;
1165 pubSubSignalingMessagesSize+= pingResp->getByteLength()
1167 sendRpcResponse( pingCall, pingResp );
1178 takeOverSubspace( subspace,
true );
1184 int intId = subspaceId.
getId();
1192 responsibleSubspaces.insert( make_pair(subspaceId, subspace) );
1200 ++numPubSubSignalingMessages;
1201 pubSubSignalingMessagesSize+= helpCall->getByteLength()
1203 sendUdpRpcCall( lobbyServer, helpCall );
1212 sendMessageToChildren( subspace, repMsg, NULL, repMsg );
1213 sendMessageToUDP( lobbyServer, repMsg );
1220 ++numPubSubSignalingMessages;
1221 pubSubSignalingMessagesSize+= failedNode->getByteLength()
1223 sendMessageToUDP( lobbyServer, failedNode );
1229 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1230 for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
1241 sendMessageToChildren( it->second, iHeartbeat, bHeartbeat, NULL);
1249 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1250 for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
1255 sendMessageToChildren( it->second, NULL, NULL, heartbeat );
1264 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1265 it = backupSubspaces.find( subspaceId );
1266 if( it == backupSubspaces.end() ) {
1272 it->second.incHeartbeatFailCount();
1273 if( it->second.getHeartbeatFailCount() > 1 ) {
1276 cancelAndDelete( timer );
1277 it->second.setHeartbeatTimer( NULL );
1280 takeOverSubspace( it->second );
1281 backupSubspaces.erase( it );
1284 startTimer( timer );
1296 ++numPubSubSignalingMessages;
1297 pubSubSignalingMessagesSize+= failedNode->getByteLength()
1299 sendMessageToUDP( lobbyServer, failedNode );
1307 ++numPubSubSignalingMessages;
1308 pubSubSignalingMessagesSize+= helpCall->getByteLength()
1310 sendUdpRpcCall( lobbyServer, helpCall );
1314 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1315 it = responsibleSubspaces.find( subspaceId );
1316 if( it == responsibleSubspaces.end() ) {
1330 ++numPubSubSignalingMessages;
1331 pubSubSignalingMessagesSize+= failedNode->getByteLength()
1334 sendMessageToUDP( lobbyServer, failedNode );
1338 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1339 it = responsibleSubspaces.find( subspaceId );
1340 if( it == responsibleSubspaces.end() ) {
1347 unsubscribeChild( oldNodeHandle, subspace );
1365 ++numPubSubSignalingMessages;
1366 pubSubSignalingMessagesSize+= helpCall->getByteLength()
1368 sendUdpRpcCall( lobbyServer, helpCall );
1374 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1376 if( !iit->node.isUnspecified() && oldNode == iit->node )
break;
1387 backupMsg->
setPos( iit - it->second.intermediateNodes.begin() );
1390 ++numPubSubSignalingMessages;
1391 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1396 bool fixNeeded =
false;
1398 set<NodeHandle>::iterator childIt;
1399 for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1400 if( !subspace.
cachedChildren.insert( make_pair(*childIt,
false)).second ){
1412 ++numPubSubSignalingMessages;
1413 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1418 iit->children.clear();
1423 if( parentPos >= 0 ){
1427 goneMsg->
setNode( oldNodeHandle );
1431 ++numPubSubSignalingMessages;
1432 pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1434 sendMessageToUDP( parent.
node, goneMsg );
1444 ++numPubSubSignalingMessages;
1445 pubSubSignalingMessagesSize+= helpCall->getByteLength()
1447 sendUdpRpcCall( lobbyServer, helpCall );
1467 ++numPubSubSignalingMessages;
1468 pubSubSignalingMessagesSize+= failedNode->getByteLength()
1470 sendMessageToUDP( lobbyServer, failedNode );
1478 ++numPubSubSignalingMessages;
1479 pubSubSignalingMessagesSize+= respCall->getByteLength()
1481 sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 );
1490 std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
1491 it = intermediateSubspaces.find( subspaceId );
1492 if( it != intermediateSubspaces.end() ) {
1497 std::list<PubSubSubspace>::iterator iit;
1498 for( iit = subscribedSubspaces.begin(); iit != subscribedSubspaces.end(); ++iit ){
1499 if( iit->getId() == subspaceId ) {
1509 intermediateSubspaces.erase( subspaceId );
1516 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1517 it = backupSubspaces.find( subspaceId );
1518 if( it == backupSubspaces.end() ) {
1522 if( backupMsg->
getPos() >= (int) it->second.intermediateNodes.size() ){
1523 it->second.intermediateNodes.resize( backupMsg->
getPos() + 1 );
1525 it->second.intermediateNodes[ backupMsg->
getPos() ].node = backupMsg->
getNode();
1534 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1535 it = backupSubspaces.find( subspaceId );
1536 if( it == backupSubspaces.end() ) {
1541 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1554 iit->children.erase( backupMsg->
getChild() );
1571 if( !iit->node.isUnspecified() && iit->node == backupMsg->
getParent() ){
1572 iit->children.insert( backupMsg->
getChild() );
1585 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1586 it = backupSubspaces.find( subspaceId );
1587 if( it == backupSubspaces.end() ) {
1592 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1593 set<NodeHandle>::iterator childIt;
1603 if( !iit->node.isUnspecified() && iit->node == backupMsg->
getIntermediate() ){
1604 for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1609 subspace.
children.insert( *childIt );
1636 ++numPubSubSignalingMessages;
1637 pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1639 sendMessageToUDP( iNode->
node, goneMsg );
1650 ++numPubSubSignalingMessages;
1651 pubSubSignalingMessagesSize+= releaseMsg->getByteLength()
1653 sendMessageToUDP( liNode.
node, releaseMsg );
1660 ++numPubSubSignalingMessages;
1661 pubSubSignalingMessagesSize+= helpRMsg->getByteLength()
1663 sendMessageToUDP( lobbyServer, helpRMsg );
1667 if( parentPos >= 0 ){
1675 ++numPubSubSignalingMessages;
1676 pubSubSignalingMessagesSize+= goneMsg->getByteLength()
1678 sendMessageToUDP( parent.
node, goneMsg );
1683 bool fixNeeded =
false;
1684 set<NodeHandle>::iterator childIt;
1685 for( childIt = liNode.
children.begin(); childIt != liNode.
children.end(); ++childIt ){
1689 if( !subspace.
children.insert( *childIt ).second ) fixNeeded =
true;
1698 if( newINode && newINode->
node != liNode.
node ){
1700 if( !subspace.
cachedChildren.insert( make_pair(*childIt,
true) ).second ) fixNeeded =
true;
1711 ++numPubSubSignalingMessages;
1712 pubSubSignalingMessagesSize+= adoptCall->getByteLength()
1714 sendUdpRpcCall( newINode->
node, adoptCall );
1718 if( !subspace.
cachedChildren.insert( make_pair(*childIt,
false) ).second ) fixNeeded =
true;
1740 ++numPubSubSignalingMessages;
1741 pubSubSignalingMessagesSize+= backupMsg->getByteLength()
1758 std::set<NodeHandle>::iterator childIt;
1762 for( childIt = subspace.
children.begin(); childIt != subspace.
children.end(); ++childIt ) {
1765 ++numPubSubSignalingMessages;
1766 pubSubSignalingMessagesSize+= playerCall->getByteLength()
1768 sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->
dup()) );
1771 ++numPubSubSignalingMessages;
1772 pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1774 sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->
dup()) );
1778 std::map<NodeHandle, bool>::iterator cacheChildIt;
1782 ++numPubSubSignalingMessages;
1783 pubSubSignalingMessagesSize+= playerCall->getByteLength()
1785 sendUdpRpcCall( cacheChildIt->first, static_cast<BaseCallMessage*>(playerCall->
dup()) );
1788 ++numPubSubSignalingMessages;
1789 pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1791 sendMessageToUDP( cacheChildIt->first, static_cast<BaseOverlayMessage*>(toPlayers->
dup()) );
1795 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1798 if( toIntermediates && !iit->node.isUnspecified() ){
1799 if( intermediateCall ){
1801 ++numPubSubSignalingMessages;
1802 pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
1804 sendUdpRpcCall( iit->node, static_cast<BaseCallMessage*>(intermediateCall->
dup()) );
1807 ++numPubSubSignalingMessages;
1808 pubSubSignalingMessagesSize+= toIntermediates->getByteLength()
1810 sendMessageToUDP( iit->node, static_cast<BaseOverlayMessage*>(toIntermediates->
dup()) );
1815 for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
1818 ++numPubSubSignalingMessages;
1819 pubSubSignalingMessagesSize+= playerCall->getByteLength()
1821 sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->
dup()) );
1824 ++numPubSubSignalingMessages;
1825 pubSubSignalingMessagesSize+= toPlayers->getByteLength()
1827 sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->
dup()) );
1836 ++numPubSubSignalingMessages;
1837 pubSubSignalingMessagesSize+= backupCall->getByteLength()
1842 ++numPubSubSignalingMessages;
1843 pubSubSignalingMessagesSize+= toBackup->getByteLength()
1854 int numRespSubspaces = responsibleSubspaces.size();
1855 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
1856 for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it ){
1867 std::deque<PubSubMoveMessage*>::iterator msgIt;
1870 moveList->
setPlayer( pos, (*msgIt)->getPlayer() );
1871 moveList->
setPosition( pos, (*msgIt)->getPosition() );
1872 moveList->
setPositionAge( pos, simTime() - (*msgIt)->getCreationTime() );
1874 cancelAndDelete( *msgIt );
1880 for( set<NodeHandle>::iterator childIt = subspace.
children.begin();
1881 childIt != subspace.
children.end(); ++childIt )
1884 ++numMoveListMessages;
1885 moveListMessagesSize+= moveList->getByteLength();
1886 respMoveListMessagesSize+= (int)((
double) moveList->getByteLength() / numRespSubspaces)
1892 if( moveList->getByteLength() < 1024 ){
1893 for( map<NodeHandle, bool>::iterator childIt = subspace.
cachedChildren.begin();
1897 ++numMoveListMessages;
1898 moveListMessagesSize+= moveList->getByteLength();
1899 respMoveListMessagesSize+= (int)((
double) moveList->getByteLength() / numRespSubspaces)
1907 deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
1911 if( intermediatePos >= maxChildren &&
1912 !subspace.
intermediateNodes[intermediatePos/maxChildren -1].node.isUnspecified() )
continue;
1913 if( !iit->node.isUnspecified() ) {
1915 ++numMoveListMessages;
1916 moveListMessagesSize+= moveList->getByteLength();
1917 respMoveListMessagesSize+= (int)((
double) moveList->getByteLength() / numRespSubspaces)
1930 if(state == READY) {
1931 getParentModule()->getParentModule()->getDisplayString().setTagArg(
"i2", 1,
"green");
1932 getDisplayString().setTagArg(
"i", 1,
"green");
1934 else if(state == JOIN) {
1935 getParentModule()->getParentModule()->getDisplayString().setTagArg(
"i2", 1,
"yellow");
1936 getDisplayString().setTagArg(
"i", 1,
"yellow");
1939 getParentModule()->getParentModule()->getDisplayString().setTagArg(
"i2", 1,
"red");
1940 getDisplayString().setTagArg(
"i", 1,
"red");
1948 EV <<
"[PubSubMMOG::startTimer() @ " << thisNode.getIp()
1949 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1950 <<
" WARNING! Trying to start NULL timer @ " << thisNode <<
"\n"
1955 if( timer->isScheduled() ) {
1956 cancelEvent( timer );
1959 simtime_t duration = 0;
1962 duration = parentTimeout/2;
1965 duration = parentTimeout*10;
1968 duration = parentTimeout;
1971 duration = 1.0/movementRate;
1974 scheduleAt(simTime() + duration, timer );
1979 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
1982 globalStatistics->addStdDev(
"PubSubMMOG: Sent Signaling Messages/s",
1983 numPubSubSignalingMessages / time);
1984 globalStatistics->addStdDev(
"PubSubMMOG: Sent Signaling bytes/s",
1985 pubSubSignalingMessagesSize / time);
1986 globalStatistics->addStdDev(
"PubSubMMOG: Sent Move Messages/s",
1987 numMoveMessages / time);
1988 globalStatistics->addStdDev(
"PubSubMMOG: Sent Move bytes/s",
1989 moveMessagesSize / time);
1990 globalStatistics->addStdDev(
"PubSubMMOG: Sent MoveList Messages/s",
1991 numMoveListMessages / time);
1992 globalStatistics->addStdDev(
"PubSubMMOG: Sent MoveList bytes/s",
1993 moveListMessagesSize / time);
1994 globalStatistics->addStdDev(
"PubSubMMOG: Received Move Events (correct timeslot)/s",
1995 numEventsCorrectTimeslot / time);
1996 globalStatistics->addStdDev(
"PubSubMMOG: Received Move Events (wrong timeslot)/s",
1997 numEventsWrongTimeslot / time);
1998 globalStatistics->addStdDev(
"PubSubMMOG: Responsible Nodes: Send MoveList Bytes/s",
1999 respMoveListMessagesSize / time);
2000 globalStatistics->addStdDev(
"PubSubMMOG: Lost or too long delayed MoveLists/s",
2001 lostMovementLists / time);
2002 globalStatistics->addStdDev(
"PubSubMMOG: Received valid MoveLists/s",
2003 receivedMovementLists / time);
2009 std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
2010 for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
2011 deque<PubSubMoveMessage*>::iterator msgIt;
2012 for( msgIt = it->second.waitingMoveMessages.begin(); msgIt != it->second.waitingMoveMessages.end(); ++msgIt ){
2013 cancelAndDelete( *msgIt );
2015 it->second.waitingMoveMessages.clear();
2018 cancelAndDelete(heartbeatTimer);