OverSim
PubSubLobby.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include "PubSubLobby.h"
25 
26 #include <GlobalStatistics.h>
27 #include <GlobalNodeListAccess.h>
28 
29 using namespace std;
30 
31 std::ostream& operator<< (std::ostream& o, const PubSubLobby::ChildEntry& entry)
32 {
33  o << "Node: " << entry.handle << " ressources: " << entry.ressources;
34  return o;
35 }
36 
38 
40 {
41  // because of IPAddressResolver, we need to wait until interfaces are registered,
42  // address auto-assignment takes place etc.
43  if(stage != MIN_STAGE_OVERLAY) return;
44 
45  numSubspaces = par("numSubspaces");
46  subspaceSize = (int) ( (unsigned int) par("areaDimension") / numSubspaces);
47 
48  // FIXME: Inefficient, make subspace a single dimensioned array
49  subspaces.resize( numSubspaces );
50  for ( int i = 0; i < numSubspaces; ++i ) {
51  for( int ii = 0; ii < numSubspaces; ++ii ) {
52  PubSubSubspaceId region( i, ii, numSubspaces );
53  subspaces[i].push_back( PubSubSubspaceLobby( region ) );
54  }
55  WATCH_VECTOR( subspaces[i] );
56  }
57  thisNode.setKey( OverlayKey::random() );
58  GlobalNodeListAccess().get()->registerPeer( thisNode );
59 
60  numPubSubSignalingMessages = 0;
61  pubSubSignalingMessagesSize = 0;
62  WATCH( numPubSubSignalingMessages );
63  WATCH( pubSubSignalingMessagesSize );
64  WATCH_MAP( playerMap );
65 }
66 
67 void PubSubLobby::handleTimerEvent(cMessage* msg)
68 {
69  if( PubSubTimer* timer = dynamic_cast<PubSubTimer*>(msg) ) {
70  if( timer->getType() == PUBSUB_TAKEOVER_GRACE_TIME ){
71  // Grace period for subspace takeover timed out.
72  // If noone claimed the subspace yet, the next respNode query will
73  // trigger the selection of a new responsible node
74  PubSubSubspaceId subspaceId(timer->getSubspaceId(), numSubspaces );
75  subspaces[subspaceId.getX()][subspaceId.getY()].waitingForRespNode = false;
76  delete timer;
77  }
78  }
79 }
80 
82 {
83  if( PubSubFailedNodeMessage* failMsg = dynamic_cast<PubSubFailedNodeMessage*>(msg) ){
84  failedNode( failMsg->getFailedNode() );
85  delete msg;
86 
87  } else if( PubSubReplacementMessage* repMsg = dynamic_cast<PubSubReplacementMessage*>(msg) ){
88  replaceResponsibleNode( repMsg->getSubspaceId(), repMsg->getNewResponsibleNode() );
89  delete msg;
90  } else if( PubSubHelpReleaseMessage* helpRMsg = dynamic_cast<PubSubHelpReleaseMessage*>(msg) ){
91  handleHelpReleaseMessage( helpRMsg );
92  delete msg;
93  }
94 }
95 
97 {
98  // delegate messages
99  RPC_SWITCH_START( msg )
100  RPC_DELEGATE( PubSubJoin, handleJoin );
101  RPC_DELEGATE( PubSubHelp, handleHelpCall );
102  RPC_DELEGATE( PubSubResponsibleNode, handleRespCall );
103  RPC_SWITCH_END( )
104 
105  return RPC_HANDLED;
106 }
107 
109  cPolymorphic* context,
110  int rpcId, simtime_t rtt)
111 {
112  RPC_SWITCH_START(msg);
113  RPC_ON_RESPONSE( PubSubTakeOverSubspace ) {
114  handleTakeOverResponse( _PubSubTakeOverSubspaceResponse );
115  break;
116  }
117  RPC_SWITCH_END( );
118 }
119 
121  const TransportAddress &dest,
122  cPolymorphic* context, int rpcId,
123  const OverlayKey &destKey)
124 {
125  RPC_SWITCH_START(msg)
126  RPC_ON_CALL( PubSubTakeOverSubspace ) {
127  handleTakeOverTimeout( _PubSubTakeOverSubspaceCall, dest );
128  EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
129  << " (" << thisNode.getKey().toString(16) << ")]\n"
130  << " TakeOverSubspace RPC Call timed out: id=" << rpcId << "\n"
131  << " msg=" << *_PubSubTakeOverSubspaceCall
132  << endl;
133  break;
134  }
135  RPC_SWITCH_END( )
136 }
137 
139 {
140  // Insert node in the queue of possible backup nodes
141  ChildEntry e;
142  e.handle = joinMsg->getSrcNode();
143  e.ressources = joinMsg->getRessources();
144 
145  pair<PlayerMap::iterator, bool> inserter;
146  inserter = playerMap.insert( make_pair( e.handle, e ));
147  ChildEntry* childEntry = &(inserter.first->second);
148  //pair<PlayerRessourceMap::iterator, bool> rInserter;
149  //rInserter = playerRessourceMap.insert( make_pair( e.ressources, childEntry ));
150  PlayerRessourceMap::iterator rInserter;
151  rInserter = playerRessourceMap.insert( make_pair( e.ressources, childEntry ));
152  bool insertedAtBegin = rInserter == playerRessourceMap.begin();
153 
154  // send answer with responsible node
155  PubSubJoinResponse* joinResp = new PubSubJoinResponse( "Join Response");
156  unsigned int x = (unsigned int) (joinMsg->getPosition().x / subspaceSize);
157  unsigned int y = (unsigned int) (joinMsg->getPosition().y / subspaceSize);
158  PubSubSubspaceLobby& subspace = subspaces[x][y];
159  NodeHandle respNode = subspace.getResponsibleNode();
160  joinResp->setResponsibleNode( respNode );
161  joinResp->setBitLength( PUBSUB_JOINRESPONSE_L( joinResp ));
162  RECORD_STATS(
163  ++numPubSubSignalingMessages;
164  pubSubSignalingMessagesSize += joinResp->getByteLength()
165  );
166  sendRpcResponse( joinMsg, joinResp );
167 
168  if( respNode.isUnspecified() && !subspace.waitingForRespNode) {
169  // respNode is unknown, create new...
170  // TODO: refactor: make a funktion out of this...
171  PubSubTakeOverSubspaceCall* toCall = new PubSubTakeOverSubspaceCall( "Take over subspace");
172  toCall->setSubspacePos( Vector2D(x, y) );
173 
174  ChildEntry* child = playerRessourceMap.begin()->second;
175  toCall->setBitLength( PUBSUB_TAKEOVERSUBSPACECALL_L( toCall ));
176  RECORD_STATS(
177  ++numPubSubSignalingMessages;
178  pubSubSignalingMessagesSize += toCall->getByteLength()
179  );
180  sendUdpRpcCall( child->handle, toCall );
181 
182  playerRessourceMap.erase( playerRessourceMap.begin() );
183  child->dutySet.insert( subspace.getId().getId() );
184  child->ressources-=2; // XXX FIXME: make it a parameter...
185  if( insertedAtBegin ){
186  rInserter = playerRessourceMap.insert( make_pair(child->ressources, child) );
187  } else {
188  playerRessourceMap.insert( make_pair(child->ressources, child) );
189  }
190 
191  subspace.waitingForRespNode = true;
192  }
193 
194  // New node is out of luck: he gets to help all waiting nodes as long as he has ressources left
195  if( waitingForHelp.size() > 0 ) {
196  std::list<PubSubHelpCall*>::iterator it = waitingForHelp.begin();
197  while( it != waitingForHelp.end() ) {
198  // Insert subspace into node's dutySet
199  if( childEntry->dutySet.insert( (*it)->getSubspaceId() ).second ){
200  // If it was not already there (due to duplicate HelpCalls because of retransmissions),
201  // decrease ressources
202  childEntry->ressources -= ( (*it)->getHelpType() == PUBSUB_BACKUP ) ? 2 : 1; // FIXME: make it a parameter
203  }
204 
205  PubSubHelpResponse* helpResp = new PubSubHelpResponse("Ask him to help you");
206  helpResp->setSubspaceId( (*it)->getSubspaceId() );
207  helpResp->setType( (*it)->getType() );
208  helpResp->setNode( e.handle );
209  helpResp->setBitLength( PUBSUB_HELPRESPONSE_L( helpResp ));
210  RECORD_STATS(
211  ++numPubSubSignalingMessages;
212  pubSubSignalingMessagesSize += helpResp->getByteLength()
213  );
214  sendRpcResponse( *it, helpResp );
215 
216  waitingForHelp.erase( it++ );
217 
218  if( childEntry->ressources <= 0 ) break; // FIXME: clean up duplicate calls!
219  }
220  // Fix ressource map entry
221  playerRessourceMap.erase( rInserter );
222  playerRessourceMap.insert( make_pair(childEntry->ressources, childEntry) );
223  }
224 }
225 
227 {
228  // A node needs help! Give him the handle of the node with the most ressources...
229  const NodeHandle& src = helpMsg->getSrcNode();
230  int subspaceId = helpMsg->getSubspaceId();
231  PlayerRessourceMap::iterator it;
232  for( it = playerRessourceMap.begin(); it != playerRessourceMap.end(); ++it ){
233  if( it->second->handle != src &&
234  it->second->dutySet.find( subspaceId ) == it->second->dutySet.end() &&
235  it->second->ressources > 1 ){
236  break;
237  }
238  }
239 
240  // No suitable node found!
241  if( it == playerRessourceMap.end() ){
242  waitingForHelp.push_back( helpMsg );
243  return;
244  }
245 
246  // decrease ressources
247  ChildEntry* child = it->second;
248  child->ressources -= ( helpMsg->getHelpType() == PUBSUB_BACKUP ) ? 2 : 1; // FIXME: make it a parameter
249  child->dutySet.insert( subspaceId );
250  playerRessourceMap.erase( it );
251  playerRessourceMap.insert( make_pair(child->ressources, child) );
252 
253  // Send handle to requesting node
254  PubSubHelpResponse* helpResp = new PubSubHelpResponse("Ask him to help you");
255  helpResp->setSubspaceId( subspaceId );
256  helpResp->setHelpType( helpMsg->getHelpType() );
257  helpResp->setNode( child->handle );
258  helpResp->setBitLength( PUBSUB_HELPRESPONSE_L( helpResp ));
259  RECORD_STATS(
260  ++numPubSubSignalingMessages;
261  pubSubSignalingMessagesSize += helpResp->getByteLength()
262  );
263  sendRpcResponse( helpMsg, helpResp );
264 }
265 
267 {
268  unsigned int x = (unsigned int) respCall->getSubspacePos().x;
269  unsigned int y = (unsigned int) respCall->getSubspacePos().y;
270  NodeHandle respNode = subspaces[x][y].getResponsibleNode();
271  if( !respNode.isUnspecified() ) {
272  PubSubSubspaceId region( x, y, numSubspaces);
273 
274  PubSubResponsibleNodeResponse* msg = new PubSubResponsibleNodeResponse( "ResponsibleNode Response");
275  msg->setResponsibleNode( respNode );
276  msg->setSubspaceId( region.getId() );
277  msg->setBitLength( PUBSUB_RESPONSIBLENODERESPONSE_L( msg ));
278  RECORD_STATS(
279  ++numPubSubSignalingMessages;
280  pubSubSignalingMessagesSize += msg->getByteLength()
281  );
282  sendRpcResponse( respCall, msg );
283  } else {
284  // no responsible node for subspace known.
285  // push call to list of waiting nodes ...
286  PubSubSubspaceLobby& subspace = subspaces[x][y];
287  subspace.waitingNodes.push_back( respCall );
288 
289  if (!subspace.waitingForRespNode) {
290  // ... and ask a node to take over the subspace
291  PubSubTakeOverSubspaceCall* msg = new PubSubTakeOverSubspaceCall( "Take over subspace");
292  msg->setSubspacePos( Vector2D( x, y) );
293 
294  ChildEntry* child = playerRessourceMap.begin()->second;
295  msg->setBitLength( PUBSUB_TAKEOVERSUBSPACECALL_L( msg ));
296  RECORD_STATS(
297  ++numPubSubSignalingMessages;
298  pubSubSignalingMessagesSize += msg->getByteLength()
299  );
300  sendUdpRpcCall( child->handle, msg );
301 
302  playerRessourceMap.erase( playerRessourceMap.begin() );
303  child->dutySet.insert( subspace.getId().getId() );
304  // Decrease ressources. Note: the ressources are decreased by the cost of an "backup" node
305  // The rest will be decreased when the new responsible answeres the takeover call
306  child->ressources-=1; // FIXME: make it a parameter...
307  playerRessourceMap.insert( make_pair(child->ressources, child) );
308 
309  subspace.waitingForRespNode = true;
310  }
311  }
312 }
313 
315 {
316  NodeHandle respNode = takeOverResp->getSrcNode();
317  unsigned int x = (unsigned int) takeOverResp->getSubspacePos().x;
318  unsigned int y = (unsigned int) takeOverResp->getSubspacePos().y;
319  PubSubSubspaceId region( x, y, numSubspaces);
320 
321  replaceResponsibleNode( region, takeOverResp->getSrcNode() );
322 }
323 
325 {
326  Vector2D pos = takeOverCall->getSubspacePos();
327  subspaces[(int) pos.x][(int) pos.y].waitingForRespNode = false;
328  failedNode( oldNode );
329 }
330 
332 {
333  PlayerMap::iterator playerIt = playerMap.find( helpRMsg->getNode() );
334  if( playerIt == playerMap.end() ){
335  // Player was already deleted
336  return;
337  }
338  ChildEntry* nodeEntry = &(playerIt->second);
339 
340  // remove subspace from node's duty set
341  nodeEntry->dutySet.erase( helpRMsg->getSubspaceId() );
342 
343  // Increase node's ressources
344  pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
345  PlayerRessourceMap::iterator resIt;
346  resRange = playerRessourceMap.equal_range( nodeEntry->ressources );
347  for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
348  if( resIt->second == nodeEntry ){
349  playerRessourceMap.erase( resIt );
350  break;
351  }
352  }
353  nodeEntry->ressources += 1; // FIXME: make it a parameter
354  playerRessourceMap.insert( make_pair(nodeEntry->ressources, nodeEntry) );
355 }
356 
357 void PubSubLobby::replaceResponsibleNode( int subspaceId, NodeHandle respNode )
358 {
359  replaceResponsibleNode( PubSubSubspaceId( subspaceId, numSubspaces), respNode );
360 }
361 
363 {
364  // a new responsible node was found for a subspace
365  PubSubSubspaceLobby& subspace = subspaces[subspaceId.getX()][subspaceId.getY()];
366 // NodeHandle oldNode = subspace.getResponsibleNode();
367 
368  // set new responsible node
369  subspace.setResponsibleNode( respNode );
370  subspace.waitingForRespNode = false;
371 
372  // decrease responsible node's ressources
373  pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
374  PlayerRessourceMap::iterator resIt;
375  PlayerMap::iterator plIt = playerMap.find( respNode );
376 
377  if( plIt == playerMap.end() ){
378  // FIXME: How to react?
379  // Best would be: reinsert node. But most probable we have two nodes that want to be
380  // responsible, so how to avoid the resulting inconsostency?
381  opp_error("PlayerMap inconsistent: Allegedly failed node wants to become Responsible node");
382  }
383 // ChildEntry* respNodeEntry = &(plIt->second);
384 // resRange = playerRessourceMap.equal_range( respNodeEntry->ressources );
385 // for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
386 // if( resIt->second == respNodeEntry ){
387 // playerRessourceMap.erase( resIt );
388 // break;
389 // }
390 // }
391 // respNodeEntry->ressources -= 2; // FIXME: make it a parameter
392 // playerRessourceMap.insert( make_pair(respNodeEntry->ressources, respNodeEntry) );
393 
394  // remove old node from backupList->he failed...
395 // failedNode( oldNode );
396 
397  // inform all waiting nodes...
398  std::list<PubSubResponsibleNodeCall*>::iterator it;
399  for( it = subspace.waitingNodes.begin(); it != subspace.waitingNodes.end(); ++it ) {
400  PubSubResponsibleNodeResponse* msg = new PubSubResponsibleNodeResponse( "ResponsibleNode Response");
401  msg->setResponsibleNode( respNode );
402  msg->setSubspaceId( subspaceId.getId() );
403  msg->setBitLength( PUBSUB_RESPONSIBLENODERESPONSE_L( msg ));
404  RECORD_STATS(
405  ++numPubSubSignalingMessages;
406  pubSubSignalingMessagesSize += msg->getByteLength()
407  );
408  sendRpcResponse( *it, msg );
409  }
410  subspace.waitingNodes.clear();
411 }
412 
413 // void PubSubLobby::failedNode(const NodeHandle& node)
415 {
416  if( node.isUnspecified() ) return;
417 
418  // Find node in playerMap
419  PlayerMap::iterator playerIt = playerMap.find( node );
420  if( playerIt == playerMap.end() ){
421  // Player was already deleted
422  return;
423  }
424  ChildEntry* respNodeEntry = &(playerIt->second);
425 
426 // FIXME: only for debugging
427 if( GlobalNodeListAccess().get()->getPeerInfo( node ) ){
428  opp_error("Trying to delete node that's still there...");
429 }
430 
431  // check if node was responsible for a subspace
432  set<int>::iterator dutyIt;
433  for( dutyIt = respNodeEntry->dutySet.begin(); dutyIt != respNodeEntry->dutySet.end(); ++dutyIt ){
434  PubSubSubspaceId subspaceId( *dutyIt, numSubspaces );
435  PubSubSubspaceLobby& subspace = subspaces[subspaceId.getX()][subspaceId.getY()];
436  if( !subspace.getResponsibleNode().isUnspecified() && node == subspace.getResponsibleNode() ){
437  // remove old responsible node
438  subspace.setResponsibleNode(NodeHandle());
439 
440  // wait for the backup node to claim subspace; if timer expires, waiting-flag will be reset
441  subspace.waitingForRespNode = true;
442  PubSubTimer* graceTimer = new PubSubTimer("Grace timer for claiming subspace");
443  graceTimer->setType( PUBSUB_TAKEOVER_GRACE_TIME );
444  graceTimer->setSubspaceId( subspace.getId().getId() );
445  scheduleAt( simTime() + 5, graceTimer ); //FIXME: make it a parameter
446  }
447  }
448 
449  // delete node from backupList
450  pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
451  PlayerRessourceMap::iterator resIt;
452 
453  resRange = playerRessourceMap.equal_range( respNodeEntry->ressources );
454  for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
455  if( resIt->second == respNodeEntry ){
456  playerRessourceMap.erase( resIt );
457  break;
458  }
459  }
460  playerMap.erase( playerIt );
461 }
462 
464 {
465  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
466  if (time < GlobalStatistics::MIN_MEASURED) return;
467 
468  globalStatistics->addStdDev("PubSubLobby: Sent Signaling Messages/s",
469  numPubSubSignalingMessages / time);
470  globalStatistics->addStdDev("PubSubLobby: Sent Signaling bytes/s",
471  pubSubSignalingMessagesSize / time);
472 }
473 
475 {
476 }