45 numSubspaces = par(
"numSubspaces");
46 subspaceSize = (int) ( (
unsigned int) par(
"areaDimension") / numSubspaces);
49 subspaces.resize( numSubspaces );
50 for (
int i = 0; i < numSubspaces; ++i ) {
51 for(
int ii = 0; ii < numSubspaces; ++ii ) {
55 WATCH_VECTOR( subspaces[i] );
60 numPubSubSignalingMessages = 0;
61 pubSubSignalingMessagesSize = 0;
62 WATCH( numPubSubSignalingMessages );
63 WATCH( pubSubSignalingMessagesSize );
64 WATCH_MAP( playerMap );
69 if(
PubSubTimer* timer = dynamic_cast<PubSubTimer*>(msg) ) {
75 subspaces[subspaceId.getX()][subspaceId.getY()].waitingForRespNode =
false;
84 failedNode( failMsg->getFailedNode() );
88 replaceResponsibleNode( repMsg->getSubspaceId(), repMsg->getNewResponsibleNode() );
91 handleHelpReleaseMessage( helpRMsg );
109 cPolymorphic* context,
110 int rpcId, simtime_t rtt)
114 handleTakeOverResponse( _PubSubTakeOverSubspaceResponse );
122 cPolymorphic* context,
int rpcId,
127 handleTakeOverTimeout( _PubSubTakeOverSubspaceCall, dest );
128 EV <<
"[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
129 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
130 <<
" TakeOverSubspace RPC Call timed out: id=" << rpcId <<
"\n"
131 <<
" msg=" << *_PubSubTakeOverSubspaceCall
145 pair<PlayerMap::iterator, bool> inserter;
146 inserter = playerMap.insert( make_pair( e.
handle, e ));
147 ChildEntry* childEntry = &(inserter.first->second);
150 PlayerRessourceMap::iterator rInserter;
151 rInserter = playerRessourceMap.insert( make_pair( e.
ressources, childEntry ));
152 bool insertedAtBegin = rInserter == playerRessourceMap.begin();
156 unsigned int x = (
unsigned int) (joinMsg->
getPosition().
x / subspaceSize);
157 unsigned int y = (
unsigned int) (joinMsg->
getPosition().
y / subspaceSize);
163 ++numPubSubSignalingMessages;
164 pubSubSignalingMessagesSize += joinResp->getByteLength()
166 sendRpcResponse( joinMsg, joinResp );
174 ChildEntry* child = playerRessourceMap.begin()->second;
177 ++numPubSubSignalingMessages;
178 pubSubSignalingMessagesSize += toCall->getByteLength()
180 sendUdpRpcCall( child->
handle, toCall );
182 playerRessourceMap.erase( playerRessourceMap.begin() );
185 if( insertedAtBegin ){
186 rInserter = playerRessourceMap.insert( make_pair(child->
ressources, child) );
188 playerRessourceMap.insert( make_pair(child->
ressources, child) );
195 if( waitingForHelp.size() > 0 ) {
196 std::list<PubSubHelpCall*>::iterator it = waitingForHelp.begin();
197 while( it != waitingForHelp.end() ) {
199 if( childEntry->
dutySet.insert( (*it)->getSubspaceId() ).second ){
207 helpResp->
setType( (*it)->getType() );
211 ++numPubSubSignalingMessages;
212 pubSubSignalingMessagesSize += helpResp->getByteLength()
214 sendRpcResponse( *it, helpResp );
216 waitingForHelp.erase( it++ );
221 playerRessourceMap.erase( rInserter );
222 playerRessourceMap.insert( make_pair(childEntry->
ressources, childEntry) );
231 PlayerRessourceMap::iterator it;
232 for( it = playerRessourceMap.begin(); it != playerRessourceMap.end(); ++it ){
233 if( it->second->handle != src &&
234 it->second->dutySet.find( subspaceId ) == it->second->dutySet.end() &&
235 it->second->ressources > 1 ){
241 if( it == playerRessourceMap.end() ){
242 waitingForHelp.push_back( helpMsg );
249 child->
dutySet.insert( subspaceId );
250 playerRessourceMap.erase( it );
251 playerRessourceMap.insert( make_pair(child->
ressources, child) );
260 ++numPubSubSignalingMessages;
261 pubSubSignalingMessagesSize += helpResp->getByteLength()
263 sendRpcResponse( helpMsg, helpResp );
270 NodeHandle respNode = subspaces[x][y].getResponsibleNode();
279 ++numPubSubSignalingMessages;
280 pubSubSignalingMessagesSize += msg->getByteLength()
282 sendRpcResponse( respCall, msg );
294 ChildEntry* child = playerRessourceMap.begin()->second;
297 ++numPubSubSignalingMessages;
298 pubSubSignalingMessagesSize += msg->getByteLength()
300 sendUdpRpcCall( child->
handle, msg );
302 playerRessourceMap.erase( playerRessourceMap.begin() );
307 playerRessourceMap.insert( make_pair(child->
ressources, child) );
321 replaceResponsibleNode( region, takeOverResp->
getSrcNode() );
327 subspaces[(int) pos.
x][(
int) pos.
y].waitingForRespNode =
false;
328 failedNode( oldNode );
333 PlayerMap::iterator playerIt = playerMap.find( helpRMsg->
getNode() );
334 if( playerIt == playerMap.end() ){
344 pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
345 PlayerRessourceMap::iterator resIt;
346 resRange = playerRessourceMap.equal_range( nodeEntry->
ressources );
347 for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
348 if( resIt->second == nodeEntry ){
349 playerRessourceMap.erase( resIt );
354 playerRessourceMap.insert( make_pair(nodeEntry->
ressources, nodeEntry) );
359 replaceResponsibleNode(
PubSubSubspaceId( subspaceId, numSubspaces), respNode );
373 pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
374 PlayerRessourceMap::iterator resIt;
375 PlayerMap::iterator plIt = playerMap.find( respNode );
377 if( plIt == playerMap.end() ){
381 opp_error(
"PlayerMap inconsistent: Allegedly failed node wants to become Responsible node");
398 std::list<PubSubResponsibleNodeCall*>::iterator it;
405 ++numPubSubSignalingMessages;
406 pubSubSignalingMessagesSize += msg->getByteLength()
408 sendRpcResponse( *it, msg );
419 PlayerMap::iterator playerIt = playerMap.find( node );
420 if( playerIt == playerMap.end() ){
424 ChildEntry* respNodeEntry = &(playerIt->second);
428 opp_error(
"Trying to delete node that's still there...");
432 set<int>::iterator dutyIt;
433 for( dutyIt = respNodeEntry->
dutySet.begin(); dutyIt != respNodeEntry->
dutySet.end(); ++dutyIt ){
445 scheduleAt( simTime() + 5, graceTimer );
450 pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
451 PlayerRessourceMap::iterator resIt;
453 resRange = playerRessourceMap.equal_range( respNodeEntry->
ressources );
454 for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
455 if( resIt->second == respNodeEntry ){
456 playerRessourceMap.erase( resIt );
460 playerMap.erase( playerIt );
465 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
468 globalStatistics->addStdDev(
"PubSubLobby: Sent Signaling Messages/s",
469 numPubSubSignalingMessages / time);
470 globalStatistics->addStdDev(
"PubSubLobby: Sent Signaling bytes/s",
471 pubSubSignalingMessagesSize / time);