OverSim
Pastry.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2012 Institute of Telematics, Karlsruhe Institute of Technology (KIT)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <cassert>
25 
26 #include <IPAddressResolver.h>
27 #include <IPvXAddress.h>
28 #include <IInterfaceTable.h>
29 #include <IPv4InterfaceData.h>
30 #include <RpcMacros.h>
31 #include <InitStages.h>
32 #include <GlobalStatistics.h>
33 
34 #include "Pastry.h"
35 
36 
38 
40 {
41  // destroy self timer messages
42  cancelAndDelete(readyWait);
43  cancelAndDelete(joinUpdateWait);
44  cancelAndDelete(secondStageWait);
45  if (useDiscovery) cancelAndDelete(discoveryTimeout);
46  if (routingTableMaintenanceInterval > 0) cancelAndDelete(repairTaskTimeout);
47 
48  clearVectors();
49 }
50 
51 
53 {
54  // purge pending state messages
55  if (!stReceived.empty()) {
56  for (std::vector<PastryStateMsgHandle>::iterator it =
57  stReceived.begin(); it != stReceived.end(); it++) {
58  // check whether one of the pointers is a duplicate of stateCache
59  if (it->msg == stateCache.msg) stateCache.msg = NULL;
60  if (it->prox == stateCache.prox) stateCache.prox = NULL;
61  delete it->msg;
62  delete it->prox;
63  }
64  stReceived.clear();
65  stReceivedPos = stReceived.end();
66  }
67 
68  // purge notify list:
69  notifyList.clear();
70 }
71 
72 
74 {
75  clearVectors();
76 
77  // purge vector of waiting sendState messages:
78  if (! sendStateWait.empty()) {
79  for (std::vector<PastrySendState*>::iterator it =
80  sendStateWait.begin(); it != sendStateWait.end(); it++) {
81  if ( (*it)->isScheduled() ) cancelEvent(*it);
82  delete *it;
83  }
84  sendStateWait.clear();
85  }
86 
88 }
89 
90 
92 {
93  if ( stage != MIN_STAGE_OVERLAY )
94  return;
95 
96  // Pastry provides KBR services
97  kbr = true;
98 
99  baseInit();
100 
101  useDiscovery = par("useDiscovery");
102  useSecondStage = par("useSecondStage");
103  pingBeforeSecondStage = par("pingBeforeSecondStage");
104  secondStageInterval = par("secondStageWait");
105  discoveryTimeoutAmount = par("discoveryTimeoutAmount");
106  useRoutingTableMaintenance = par("useRoutingTableMaintenance");
107  routingTableMaintenanceInterval = par("routingTableMaintenanceInterval");
108  sendStateAtLeafsetRepair = par("sendStateAtLeafsetRepair");
109  partialJoinPath = par("partialJoinPath");
110  readyWaitAmount = par("readyWait");
111  minimalJoinState = par("minimalJoinState");
112 
113  overrideOldPastry = par("overrideOldPastry");
114  overrideNewPastry = par("overrideNewPastry");
115 
116  if (overrideOldPastry) {
117  useSecondStage = true;
118  useDiscovery = false;
121  minimalJoinState = false;
122  }
123 
124  if (overrideNewPastry) {
125  useSecondStage = false;
126  useDiscovery = true;
128  sendStateAtLeafsetRepair = false;
129  minimalJoinState = true;
130  }
131 
132  readyWait = new cMessage("readyWait");
133  secondStageWait = new cMessage("secondStageWait");
134  joinUpdateWait = new cMessage("joinUpdateWait");
135 
137  (useDiscovery ? new cMessage("discoveryTimeout") : NULL);
139  (useRoutingTableMaintenance ? new cMessage("repairTaskTimeout") : NULL);
140 
141  updateCounter = 0;
142 }
143 
144 
146 {
147  changeState(INIT);
148 
150  // no existing pastry network -> first node of a new one
152  } else {
153  // join existing pastry network
156  else changeState(JOIN);
157  }
158 }
159 
160 
161 void Pastry::changeState(int toState)
162 {
163  if (readyWait->isScheduled()) cancelEvent(readyWait);
164  baseChangeState(toState);
165 
166  switch (toState) {
167  case INIT:
168  cancelAllRpcs();
169  purgeVectors();
170  break;
171 
172  case DISCOVERY: {
173  state = DISCOVERY;
174  nearNodeRtt = MAXTIME;
177  NULL, "PING bootstrapNode in discovery mode",
178  NULL, PING_DISCOVERY, UDP_TRANSPORT); //TODO
179 
180  RequestLeafSetCall* call =
181  new RequestLeafSetCall("REQUEST LEAFSET Call");
183  call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
185  leafsetReqBytesSent += call->getByteLength());
187 
188  depth = -1;
189  }
190  break;
191 
192  case JOIN: {
193  joinHopCount = 0;
194 
195  PastryJoinCall* call = new PastryJoinCall("JOIN Call");
197  call->setBitLength(PASTRYJOINCALL_L(msg));
198  RECORD_STATS(joinSent++; joinBytesSent += call->getByteLength());
200  }
201  break;
202 
203  case READY:
204  // determine list of all known nodes as notifyList
205  notifyList.clear();
208  sort(notifyList.begin(), notifyList.end());
209  notifyList.erase(unique(notifyList.begin(), notifyList.end()),
210  notifyList.end());
211 
212  // schedule update
213  cancelEvent(joinUpdateWait);
214  scheduleAt(simTime() + 0.0001, joinUpdateWait);
215 
216  // schedule second stage
217  if (useSecondStage) {
218  cancelEvent(secondStageWait);
219  scheduleAt(simTime() + secondStageInterval, secondStageWait);
220  }
221 
222  // schedule routing table maintenance task
224  cancelEvent(repairTaskTimeout);
225  scheduleAt(simTime() + routingTableMaintenanceInterval, repairTaskTimeout);
226  }
227  break;
228  }
229 }
230 
231 
233  cPolymorphic* context, int rpcId,
234  simtime_t rtt)
235 {
236  if (state == DISCOVERY) {
237  EV << "[Pastry::pingResponse() @ " << thisNode.getIp()
238  << " (" << thisNode.getKey().toString(16) << ")]\n"
239  << " Pong (or Ping-context from NeighborCache) received (from "
240  << pingResponse->getSrcNode().getIp() << ") in DISCOVERY mode"
241  << endl;
242 
243  if (nearNodeRtt > rtt) {
244  nearNode = pingResponse->getSrcNode();
245  nearNodeRtt = rtt;
246  nearNodeImproved = true;
247  }
248  }
249 }
250 
251 
252 void Pastry::handleTimerEvent(cMessage* msg)
253 {
254  if (msg == readyWait) {
255  if (partialJoinPath) {
257  sort(stReceived.begin(), stReceived.end(), stateMsgIsSmaller);
258 
259  // start pinging the nodes found in the first state message:
260  stReceivedPos = stReceived.begin();
262  EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
263  << " (" << thisNode.getKey().toString(16) << ")]\n"
264  << " joining despite some missing STATE messages."
265  << endl;
266  processState();
267  } else {
268  EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
269  << " (" << thisNode.getKey().toString(16) << ")]\n"
270  << " timeout waiting for missing state messages"
271  << " in JOIN state, restarting..."
272  << endl;
273  join();
274  }
275  } else if (msg == joinUpdateWait) {
276  EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
277  << " (" << thisNode.getKey().toString(16) << ")]\n"
278  << " sending state updates to all nodes."
279  << endl;
280  doJoinUpdate();
281  } else if (msg == secondStageWait) {
282  EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
283  << " (" << thisNode.getKey().toString(16) << ")]\n"
284  << " sending STATE requests to all nodes in"
285  << " second stage of initialization."
286  << endl;
287  doSecondStage();
288  } else if (msg == discoveryTimeout) {
289  if ((depth == 0) && (nearNodeImproved)) {
290  depth++; //repeat last step if closer node was found
291  }
292  if ((depth == 0) || (discoveryModeProbedNodes < 1)) {
293  changeState(JOIN);
294  } else {
295  RequestRoutingRowCall* call =
296  new RequestRoutingRowCall("REQUEST ROUTING ROW Call");
298  call->setRow(depth);
299  call->setBitLength(PASTRYREQUESTROUTINGROWCALL_L(call));
301  routingTableRowReqBytesSent += call->getByteLength());
302  sendUdpRpcCall(nearNode, call);
303  }
304  } else if (msg == repairTaskTimeout) {
305  EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
306  << " (" << thisNode.getKey().toString(16) << ")]\n"
307  << " starting routing table maintenance"
308  << endl;
310  scheduleAt(simTime() + routingTableMaintenanceInterval,
312  } else if (dynamic_cast<PastrySendState*>(msg)) {
313  PastrySendState* sendStateMsg = static_cast<PastrySendState*>(msg);
314 
315  std::vector<PastrySendState*>::iterator pos =
316  std::find(sendStateWait.begin(), sendStateWait.end(),
317  sendStateMsg);
318  if (pos != sendStateWait.end()) sendStateWait.erase(pos);
319 
322  stateBytesSent += stateMsg->getByteLength());
323  sendMessageToUDP(sendStateMsg->getDest(), stateMsg);
324 
325  delete sendStateMsg;
326  }
327 }
328 
329 
331 {
332  PastrySendState* selfMsg = new PastrySendState("sendStateWait");
333  selfMsg->setDest(destination);
334  sendStateWait.push_back(selfMsg);
335  scheduleAt(simTime() + 0.0001, selfMsg);
336 }
337 
338 
340 {
341  PastryStateMessage* stateMsg = check_and_cast<PastryStateMessage*>(msg);
342  uint32_t type = stateMsg->getPastryStateMsgType();
343 
344  if (debugOutput) {
345  EV << "[Pastry::handleUDPMessage() @ " << thisNode.getIp()
346  << " (" << thisNode.getKey().toString(16) << ")]\n"
347  << " incoming STATE message of type "
348  << cEnum::get("PastryStateMsgType")->getStringFor(type) << endl;
349  }
350 
352  stateMsg->getByteLength());
353 
354  handleStateMessage(stateMsg);
355 }
356 
357 
359 {
360  if (BasePastry::handleRpcCall(msg)) return true;
361 
362  if (state != READY) {
363  EV << "[Pastry::handleRpcCall() @ " << thisNode.getIp()
364  << " (" << thisNode.getKey().toString(16) << ")]\n"
365  << " Received RPC call and state != READY"
366  << endl;
367  return false;
368  }
369 
370  // delegate messages
371  RPC_SWITCH_START( msg )
372  // RPC_DELEGATE( <messageName>[Call|Response], <methodToCall> )
373  RPC_DELEGATE( PastryJoin, handlePastryJoinCall );
374  RPC_DELEGATE( RequestState, handleRequestStateCall );
375  RPC_DELEGATE( RequestRepair, handleRequestRepairCall );
376  RPC_SWITCH_END( )
377 
378  return RPC_HANDLED;
379 }
380 
381 
383 {
384  EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
385  << " (" << thisNode.getKey().toString(16) << ")]"
386  << endl;
387 
389  joinBytesReceived += call->getByteLength());
390 
391  if (state != READY) {
392  if (call->getSrcNode() == thisNode) {
393  EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
394  << " (" << thisNode.getKey().toString(16) << ")]\n"
395  << " PastryJoinCall received by originator!"
396  << endl;
397  } else {
398  EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
399  << " (" << thisNode.getKey().toString(16) << ")]\n"
400  << " received join message before reaching "
401  << "READY state, dropping message!"
402  << endl;
403  }
404  } else if (call->getSrcNode() == thisNode) {
405  EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
406  << " (" << thisNode.getKey().toString(16) << ")]\n"
407  << " PastryJoinCall gets dropped because it is "
408  << "outdated and has been received by originator!"
409  << endl;
410  } else {
411  OverlayCtrlInfo* overlayCtrlInfo =
412  check_and_cast<OverlayCtrlInfo*>(call->getControlInfo());
413 
414  uint32_t joinHopCount = overlayCtrlInfo->getHopCount();
415  if ((joinHopCount > 1) &&
418  joinHopCount--;
419 
420  // remove node from state if it is rejoining
421  handleFailedNode(call->getSrcNode());
422 
423  PastryJoinResponse* response = new PastryJoinResponse("JOIN Response");
424 
425  // create new state msg and set special fields for some types:
426  response->setStatType(MAINTENANCE_STAT);
427  response->setTimestamp(simTime());
428 
429  response->setBitLength(PASTRYJOINRESPONSE_L(response));
430  response->encapsulate(createStateMessage((minimalJoinState ?
433  -1, joinHopCount, true));
434 
435  // send...
437  stateBytesSent += response->getByteLength());
438 
439  sendRpcResponse(call, response);
440  }
441 }
442 
443 
445 {
446  EV << "[Pastry::handleRequestStateCall() @ " << thisNode.getIp()
447  << " (" << thisNode.getKey().toString(16) << ")]"
448  << endl;
449 
451  stateReqBytesReceived += call->getByteLength());
452 
453  if (state != READY) {
454  EV << " received repair request before reaching"
455  << " READY state, dropping message!"
456  << endl;
457  delete call;
458  return;
459  }
460 
461  RequestStateResponse* response =
462  new RequestStateResponse("REQUEST STATE Response");
463  response->setStatType(MAINTENANCE_STAT);
464 
465  response->setBitLength(PASTRYREQUESTSTATERESPONSE_L(response));
466  response->encapsulate(createStateMessage());
468  stateBytesSent += response->getByteLength());
469 
470  sendRpcResponse(call, response);
471 }
472 
473 
475 {
476  EV << "[Pastry::handleRequestRepairCall() @ " << thisNode.getIp()
477  << " (" << thisNode.getKey().toString(16) << ")]"
478  << endl;
479 
481  repairReqBytesReceived += call->getByteLength());
482 
483  if (state != READY) {
484  EV << " received repair request before reaching"
485  << " READY state, dropping message!"
486  << endl;
487  delete call;
488  return;
489  }
490 
491  RequestRepairResponse* response =
492  new RequestRepairResponse("REQUEST REPAIR Response");
493  response->setStatType(MAINTENANCE_STAT);
494 
495  response->setBitLength(PASTRYREQUESTREPAIRRESPONSE_L(response));
496  response->encapsulate(createStateMessage(PASTRY_STATE_REPAIR));
498  stateBytesSent += response->getByteLength());
499 
500  sendRpcResponse(call, response);
501 }
502 
503 
505 {
506  EV << "[Pastry::handleRequestRepairResponse() @ " << thisNode.getIp()
507  << " (" << thisNode.getKey().toString(16) << ")]"
508  << endl;
509 
511  stateBytesReceived += response->getByteLength());
512 
513  if (state == READY) {
514  handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
515  }
516 }
517 
518 
520  cPolymorphic* context, int rpcId,
521  simtime_t rtt)
522 {
523  BasePastry::handleRpcResponse(msg, context, rpcId, rtt);
524 
525  RPC_SWITCH_START(msg)
526  RPC_ON_RESPONSE( PastryJoin ) {
527  EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
528  << " (" << thisNode.getKey().toString(16) << ")]\n"
529  << " Received a JOIN RPC Response: id=" << rpcId << "\n"
530  << " msg=" << *_PastryJoinResponse << " rtt=" << SIMTIME_DBL(rtt)
531  << endl;
532  handlePastryJoinResponse(_PastryJoinResponse);
533  break;
534  }
535  RPC_ON_RESPONSE( RequestState ) {
536  EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
537  << " (" << thisNode.getKey().toString(16) << ")]\n"
538  << " Received a RequestState RPC Response: id=" << rpcId << "\n"
539  << " msg=" << *_RequestStateResponse << " rtt=" << SIMTIME_DBL(rtt)
540  << endl;
541  handleRequestStateResponse(_RequestStateResponse);
542  break;
543  }
544  RPC_ON_RESPONSE( RequestRepair ) {
545  EV << "[BasePastry::handleRpcResponse() @ " << thisNode.getIp()
546  << " (" << thisNode.getKey().toString(16) << ")]\n"
547  << " Received a Request Repair RPC Response: id=" << rpcId << "\n"
548  << " msg=" << *_RequestRepairResponse << " rtt=" << SIMTIME_DBL(rtt)
549  << endl;
550  handleRequestRepairResponse(_RequestRepairResponse);
551  break;
552  }
553  RPC_ON_RESPONSE( RequestLeafSet ) {
554  EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
555  << " (" << thisNode.getKey().toString(16) << ")]\n"
556  << " Received a RequestLeafSet RPC Response: id=" << rpcId << "\n"
557  << " msg=" << *_RequestLeafSetResponse << " rtt=" << SIMTIME_DBL(rtt)
558  << endl;
559  handleRequestLeafSetResponse(_RequestLeafSetResponse);
560  break;
561  }
562  RPC_ON_RESPONSE( RequestRoutingRow ) {
563  EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
564  << " (" << thisNode.getKey().toString(16) << ")]\n"
565  << " Received a RequestRoutingRow RPC Response: id=" << rpcId << "\n"
566  << " msg=" << *_RequestRoutingRowResponse << " rtt=" << rtt
567  << endl;
568  handleRequestRoutingRowResponse(_RequestRoutingRowResponse);
569  break;
570  }
571  RPC_SWITCH_END( )
572 }
573 
574 
576  const TransportAddress& dest,
577  cPolymorphic* context, int rpcId,
578  const OverlayKey& key)
579 {
580  BasePastry::handleRpcTimeout(call, dest, context, rpcId, key);
581 
582  EV << "[Pastry::handleRpcTimeout() @ " << thisNode.getIp()
583  << " (" << thisNode.getKey().toString(16) << ")]\n"
584  << " Timeout of RPC Call: id=" << rpcId << "\n"
585  << " msg=" << *call << " key=" << key
586  << endl;
587 
588  if (state == DISCOVERY && dynamic_cast<RequestLeafSetCall*>(call)) {
589  join();
590  }
591 }
592 
593 
595 {
596  EV << "[Pastry::handlePastryJoinResponse() @ " << thisNode.getIp()
597  << " (" << thisNode.getKey().toString(16) << ")]"
598  << endl;
599 
601  stateBytesReceived += response->getByteLength());
602 
603  if (state == JOIN) {
604  handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
605  }
606 }
607 
609 {
610  EV << "[Pastry::handleRequestStateResponse() @ " << thisNode.getIp()
611  << " (" << thisNode.getKey().toString(16) << ")]"
612  << endl;
613 
615  stateBytesReceived += response->getByteLength());
616 
617  if (state == READY) {
618  handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
619  }
620 }
621 
622 
624 {
625  EV << "[Pastry::handleRequestLeafSetResponse() @ " << thisNode.getIp()
626  << " (" << thisNode.getKey().toString(16) << ")]"
627  << endl;
628 
629  if (state == DISCOVERY) {
630  const NodeHandle* node;
632  PastryStateMessage* leaves =
633  check_and_cast<PastryStateMessage*>(response->getEncapsulatedPacket());
634  for (uint32_t i = 0; i < leaves->getLeafSetArraySize(); ++i) {
635  node = &(leaves->getLeafSet(i));
636  // unspecified nodes not considered
637  if ( !(node->isUnspecified()) ) {
639  NULL, "PING received leaves for nearest node",
640  NULL, -1, UDP_TRANSPORT); //TODO
642  }
643  }
644 
645  EV << " received leafset, waiting for pings"
646  << endl;
647 
648  if (discoveryTimeout->isScheduled()) cancelEvent(discoveryTimeout);
649  scheduleAt(simTime() + discoveryTimeoutAmount, discoveryTimeout);
650  }
651 }
652 
653 
655 {
656  EV << "[Pastry::handleRequestRoutingRowResponse() @ " << thisNode.getIp()
657  << " (" << thisNode.getKey().toString(16) << ")]"
658  << endl;
659 
660  if (state == DISCOVERY) {
661  PastryStateMessage* rowState =
662  check_and_cast<PastryStateMessage*>(response->getEncapsulatedPacket());
663  uint32_t nodesPerRow = rowState->getRoutingTableArraySize();
664  const NodeHandle* node;
665  if (depth == -1) {
666  depth = rowState->getRow();
667  }
669  nearNodeImproved = false;
670 
671  if (depth > 0) {
672  for (uint32_t i = 0; i < nodesPerRow; i++) {
673  node = &(rowState->getRoutingTable(i));
674  // unspecified nodes not considered
675  if ( !(node->isUnspecified()) ) {
676  // we look for best connection here,
677  // so Timeout is short and there are no retries
678  pingNode(*node, discoveryTimeoutAmount, 0, NULL,
679  "PING received routing table for nearest node",
680  NULL, -1, UDP_TRANSPORT); //TODO
682  }
683  }
684  depth--;
685  }
686 
687  EV << " received routing table, waiting for pings"
688  << endl;
689 
690  if (discoveryTimeout->isScheduled()) cancelEvent(discoveryTimeout);
691  scheduleAt(simTime() + discoveryTimeoutAmount, discoveryTimeout);
692  }
693 }
694 
695 
697  BaseRouteMessage* msg)
698 {
699  if (dest == thisNode) {
700  return true;
701  }
702 
703  PastryJoinCall* call =
704  dynamic_cast<PastryJoinCall*>(msg->getEncapsulatedPacket());
705 
706  if (call && call->getSrcNode() != thisNode) {
708  joinBytesSeen += call->getByteLength());
709  // remove node from state if it is rejoining
710  handleFailedNode(call->getSrcNode());
711 
712  PastryStateMessage* stateMsg =
716  -1,
717  check_and_cast<OverlayCtrlInfo*>(msg->getControlInfo())->getHopCount(),
718  false);
720  stateBytesSent += stateMsg->getByteLength());
721  sendMessageToUDP(call->getSrcNode(), stateMsg);
722  }
723 
724  // forward now:
725  return true;
726 }
727 
728 
729 void Pastry::iterativeJoinHook(BaseOverlayMessage* msg, bool incrHopCount)
730 {
731  PastryFindNodeExtData* findNodeExt = NULL;
732  if (msg && msg->hasObject("findNodeExt")) {
733  findNodeExt =
734  check_and_cast<PastryFindNodeExtData*>(msg->
735  getObject("findNodeExt"));
736  }
737  // Send state tables on any JOIN message we see:
738  if (findNodeExt) {
739  const TransportAddress& stateRecipient =
740  findNodeExt->getSendStateTo();
741  if (!stateRecipient.isUnspecified()) {
743  PastryStateMessage* stateMsg =
747  -1,
748  findNodeExt->getJoinHopCount(),
749  false);
751  stateBytesSent += stateMsg->getByteLength());
752  sendMessageToUDP(stateRecipient, stateMsg);
753  }
754  if (incrHopCount) {
755  findNodeExt->setJoinHopCount(findNodeExt->getJoinHopCount() + 1);
756  }
757  }
758 }
759 
760 
762 {
763  // send "update" state message to all nodes who sent us their state
764  // during INIT, remove these from notifyList so they don't get our
765  // state twice
766  std::vector<TransportAddress>::iterator nListPos;
767  if (!stReceived.empty()) {
768  for (std::vector<PastryStateMsgHandle>::iterator it =
769  stReceived.begin(); it != stReceived.end(); ++it) {
770  PastryStateMessage* stateMsg =
772  it->msg->getTimestamp());
774  stateBytesSent += stateMsg->getByteLength());
775  sendMessageToUDP(it->msg->getSender(), stateMsg);
776 
777  nListPos = find(notifyList.begin(), notifyList.end(),
778  it->msg->getSender());
779  if (nListPos != notifyList.end()) {
780  notifyList.erase(nListPos);
781  }
782  delete it->msg;
783  delete it->prox;
784  }
785  stReceived.clear();
786  }
787 
788  // send a normal STATE message to all remaining known nodes
789  for (std::vector<TransportAddress>::iterator it =
790  notifyList.begin(); it != notifyList.end(); it++) {
791  if (*it != thisNode) {
792  PastryStateMessage* stateMsg =
795  stateBytesSent += stateMsg->getByteLength());
796  sendMessageToUDP(*it, stateMsg);
797  }
798  }
799  notifyList.clear();
800 
801  updateTooltip();
802 }
803 
805 {
806  getParentModule()->getParentModule()->bubble("entering SECOND STAGE");
807 
808  // probe nodes in local state
809  if (leafSet->isValid()) {
811 
812  PastryStateMsgHandle handle(stateMsg);
813 
814  if (!stateCache.msg) {
815  stateCache = handle;
816  processState();
817  } else {
818  stateCacheQueue.push(handle);
819  if (stateCacheQueue.size() > 15) {
820  delete stateCacheQueue.front().msg;
821  stateCacheQueue.pop();
822  EV << "[Pastry::doSecondStage() @ " << thisNode.getIp()
823  << " (" << thisNode.getKey().toString(16) << ")]\n"
824  << " stateCacheQueue full -> pop()" << endl;
825  }
827  prePing(stateMsg);
828  }
829  }
830  }
831 
832  // "second stage" for locality:
833  notifyList.clear();
836  sort(notifyList.begin(), notifyList.end());
837  notifyList.erase(unique(notifyList.begin(), notifyList.end()),
838  notifyList.end());
839  for (std::vector<TransportAddress>::iterator it = notifyList.begin();
840  it != notifyList.end(); it++) {
841  if (*it == thisNode) continue;
842 
843  EV << "[Pastry::doSecondStage() @ " << thisNode.getIp()
844  << " (" << thisNode.getKey().toString(16) << ")]\n"
845  << " second stage: requesting state from " << *it
846  << endl;
847 
848  RequestStateCall* call =
849  new RequestStateCall("REQUEST STATE Call");
850  call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
852  stateReqBytesSent += call->getByteLength());
853  sendUdpRpcCall(*it, call);
854  }
855  notifyList.clear();
856 }
857 
858 
860 {
861  for (int i = 0; i < routingTable->getLastRow(); i++) {
862  const TransportAddress& ask4row = routingTable->getRandomNode(i);
863 
864  assert(!dynamic_cast<const NodeHandle&>(ask4row).getKey().isUnspecified());
865 
866  if ((!ask4row.isUnspecified()) && (ask4row != thisNode)) {
867  RequestRoutingRowCall* call =
868  new RequestRoutingRowCall("REQUEST ROUTING ROW Call");
870  call->setRow(i + 1);
871  call->setBitLength(PASTRYREQUESTROUTINGROWCALL_L(call));
873  routingTableRowReqBytesSent += call->getByteLength());
874  sendUdpRpcCall(ask4row, call);
875  } else {
876  EV << "[Pastry::doRoutingTableMaintenance() @ "
877  << thisNode.getIp()
878  << " (" << thisNode.getKey().toString(16) << ")]\n"
879  << " could not send Message to Node in Row" << i
880  << endl;
881  }
882  }
883 }
884 
885 
887 {
888  if (state != READY) return false;
889 
890  bool wasValid = leafSet->isValid();
891 
892  if (failed.isUnspecified())
893  opp_error("Pastry::handleFailedNode(): failed is unspecified!");
894 
895  const TransportAddress& lsAsk = leafSet->failedNode(failed);
896  const TransportAddress& rtAsk = routingTable->failedNode(failed);
897  neighborhoodSet->failedNode(failed);
898 
899  if (!lsAsk.isUnspecified()) {
900  newLeafs();
902  RequestRepairCall* call =
903  new RequestRepairCall("REQUEST REPAIR Call");
904  call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
906  repairReqBytesSent += call->getByteLength());
907  sendUdpRpcCall(lsAsk, call);
908  } else {
909  RequestLeafSetCall* call =
910  new RequestLeafSetCall("REQUEST LEAFSET Call");
911  call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
913  leafsetReqBytesSent += call->getByteLength());
914  sendUdpRpcCall(lsAsk, call);
915  }
916  }
917  if (!rtAsk.isUnspecified() && (lsAsk.isUnspecified() || lsAsk != rtAsk)) {
918  RequestRepairCall* call =
919  new RequestRepairCall("REQUEST REPAIR Call");
920  call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
921  RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
922  sendUdpRpcCall(rtAsk, call);
923  }
924 
925  if (wasValid && lsAsk.isUnspecified() && (! leafSet->isValid())) {
926  EV << "[Pastry::handleFailedNode() @ " << thisNode.getIp()
927  << " (" << thisNode.getKey().toString(16) << ")]\n"
928  << " lost connection to the network, trying to re-join."
929  << endl;
930 
931  join();
932  return false;
933  }
934 
935  return true;
936 }
937 
938 
940 {
941  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
942  << " (" << thisNode.getKey().toString(16) << ")]"
943  << endl;
944 
945  // no cached STATE message?
946  assert(stateCache.msg || !stateCache.prox);
947  if (!stateCache.msg) {
948  return;
949  }
950 
951  // no entries in stateCache.prox?
952  if (stateCache.prox->pr_rt.empty() &&
953  stateCache.prox->pr_ls.empty() &&
954  stateCache.prox->pr_ns.empty())
955  throw cRuntimeError("ERROR in Pastry: stateCache.prox empty!");
956 
957  // some entries not yet determined?
958  if ((find(stateCache.prox->pr_rt.begin(), stateCache.prox->pr_rt.end(),
960  (find(stateCache.prox->pr_ls.begin(), stateCache.prox->pr_ls.end(),
962  (find(stateCache.prox->pr_ns.begin(), stateCache.prox->pr_ns.end(),
964 
965  return;
966  }
967 
968  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
969  << " (" << thisNode.getKey().toString(16) << ")]\n"
970  << " all proximities for current STATE message from "
971  << stateCache.msg->getSender().getIp()
972  << " collected!"
973  << endl;
974 
975  simtime_t now = simTime();
976 
977  if (state == JOIN) {
978  // save pointer to proximity vectors (it is NULL until now):
979  stReceivedPos->prox = stateCache.prox;
980 
981  // collected proximities for all STATE messages?
982  if (++stReceivedPos == stReceived.end()) {
983  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
984  << " (" << thisNode.getKey().toString(16) << ")]\n"
985  << " proximities for all STATE messages collected!"
986  << endl;
987  stateCache.msg = NULL;
988  stateCache.prox = NULL;
989  if (debugOutput) {
990  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
991  << " (" << thisNode.getKey().toString(16) << ")]\n"
992  << " [JOIN] starting to build own state from "
993  << stReceived.size() << " received state messages..."
994  << endl;
995  }
996  if (mergeState()) {
998  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
999  << " (" << thisNode.getKey().toString(16) << ")]\n"
1000  << " changeState(READY) called"
1001  << endl;
1002  } else {
1003  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
1004  << " (" << thisNode.getKey().toString(16) << ")]\n"
1005  << " Error initializing while joining! Restarting ..."
1006  << endl;
1007  joinOverlay();
1008  }
1009 
1010  } else {
1011  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
1012  << " (" << thisNode.getKey().toString(16) << ")]\n"
1013  << " NOT all proximities for all STATE messages collected!"
1014  << endl;
1015 
1016  // process next state message in vector:
1017  if (stReceivedPos->msg == NULL)
1018  throw cRuntimeError("stReceivedPos->msg = NULL");
1020  if (stateCache.msg == NULL)
1021  throw cRuntimeError("msg = NULL");
1022  processState();
1023  }
1024  } else {
1025  // state == READY
1027  // try to repair routingtable based on repair message:
1028  const TransportAddress& askRt =
1030  if (! askRt.isUnspecified()) {
1031  RequestRepairCall* call =
1032  new RequestRepairCall("REQUEST REPAIR Call");
1033  call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
1034  RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
1035  sendUdpRpcCall(askRt, call);
1036  }
1037 
1038  // while not really known, it's safe to assume that a repair
1039  // message changed our state:
1040  lastStateChange = now;
1041  } else {
1042  if (stateCache.outdatedUpdate) {
1043  // send another STATE message on outdated state update:
1044  updateCounter++;
1046  } else {
1047  // merge info in own state tables
1048  // except leafset (was already handled in handleStateMessage)
1050  lastStateChange = now;
1051  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
1052  << " (" << thisNode.getKey().toString(16) << ")]\n"
1053  << " Merging nodes into routing table"
1054  << endl;
1056  lastStateChange = now;
1057  EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
1058  << " (" << thisNode.getKey().toString(16) << ")]\n"
1059  << " Merged nodes into routing table"
1060  << endl;
1061  }
1062  }
1063  }
1064  updateTooltip();
1065 
1067  }
1068 }
1069 
1071 {
1072  // if state message was not an update, send one back:
1073  if (stateCache.msg &&
1075  (alwaysSendUpdate || lastStateChange == simTime()) &&
1077  thisNode != stateCache.msg->getSender()) {//hack
1078  PastryStateMessage* stateMsg =
1082  stateBytesSent += stateMsg->getByteLength());
1083 
1084  sendMessageToUDP(stateCache.msg->getSender(), stateMsg);
1085  }
1086 
1087  delete stateCache.msg;
1088  stateCache.msg = NULL;
1089  delete stateCache.prox;
1090  stateCache.prox = NULL;
1091 
1092  // process next queued message:
1093  if (! stateCacheQueue.empty()) {
1094  stateCache = stateCacheQueue.front();
1095  stateCacheQueue.pop();
1096  processState();
1097  }
1098 }
1099 
1100 
1102 {
1103  bool ret = true;
1104 
1105  if (state == JOIN) {
1106  // building initial state
1107  if (debugOutput) {
1108  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1109  << " (" << thisNode.getKey().toString(16) << ")]\n"
1110  << " [JOIN] starting to build own state from "
1111  << stReceived.size() << " received state messages..."
1112  << endl;
1113  }
1114  if (stateCache.msg &&
1116  if (debugOutput) {
1117  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1118  << " (" << thisNode.getKey().toString(16) << ")]\n"
1119  << " [JOIN] initializing NeighborhoodSet from "
1120  << stReceived.front().msg->getRow() << ". hop"
1121  << endl;
1122  }
1123  if (!neighborhoodSet->mergeState(stReceived.front().msg,
1124  stReceived.front().prox )) {
1125  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1126  << " (" << thisNode.getKey().toString(16) << ")]\n"
1127  << " Error initializing own neighborhoodSet"
1128  << " while joining! Restarting ..."
1129  << endl;
1130  ret = false;
1131  }
1132  }
1133  if (debugOutput) {
1134  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1135  << " (" << thisNode.getKey().toString(16) << ")]\n"
1136  << " [JOIN] initializing LeafSet from "
1137  << stReceived.back().msg->getRow() << ". hop"
1138  << endl;
1139  }
1140 
1141  if (!leafSet->mergeState(stReceived.back().msg,
1142  stReceived.back().prox )) {
1143  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1144  << " (" << thisNode.getKey().toString(16) << ")]\n"
1145  << " Error initializing own leafSet while joining!"
1146  << " Restarting ..."
1147  << endl;
1148 
1149  ret = false;
1150  } else {
1151  newLeafs();
1152  }
1153  if (debugOutput) {
1154  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1155  << " (" << thisNode.getKey().toString(16) << ")]\n"
1156  << " [JOIN] initializing RoutingTable from all hops"
1157  << endl;
1158  }
1159 
1160  assert(!stateCache.msg ||
1162 
1164  EV << "[Pastry::mergeState() @ " << thisNode.getIp()
1165  << " (" << thisNode.getKey().toString(16) << ")]\n"
1166  << " Error initializing own routingTable while joining!"
1167  << " Restarting ..."
1168  << endl;
1169 
1170  ret = false;
1171  }
1172  } else if (state == READY) {
1173  // merging single state (stateCache.msg)
1176  ret = false;
1177  }
1178  if (!leafSet->mergeState(stateCache.msg, NULL)) {
1179  ret = false;
1180  } else {
1181  newLeafs();
1182  }
1183  if (!routingTable->mergeState(stateCache.msg, NULL)) {
1184  ret = false;
1185  }
1186  }
1187 
1188  if (ret) lastStateChange = simTime();
1189  return ret;
1190 }
1191 
1192 
1194 {
1195  if (debugOutput) {
1196  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1197  << " (" << thisNode.getKey().toString(16) << ")]\n"
1198  << " new STATE message to process "
1199  << static_cast<void*>(msg) << " in state " <<
1200  ((state == READY)?"READY":((state == JOIN)?"JOIN":"INIT"))
1201  << endl;
1202  if (state == JOIN) {
1203  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1204  << " (" << thisNode.getKey().toString(16) << ")]\n"
1205  << " *** own joinHopCount: " << joinHopCount << endl
1206  << " *** already received: " << stReceived.size() << endl
1207  << " *** last-hop flag: "
1208  << (msg->getLastHop() ? "true" : "false") << endl
1209  << " *** msg joinHopCount: "
1210  << msg->getRow() << endl;
1211  }
1212  }
1213  if (state == INIT || state == DISCOVERY) {
1214  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1215  << " (" << thisNode.getKey().toString(16) << ")]\n"
1216  << " can't handle state messages until at least reaching JOIN state."
1217  << endl;
1218  delete msg;
1219  return;
1220  }
1221 
1222  PastryStateMsgHandle handle(msg);
1223 
1224  // in JOIN state, store all received state Messages, need them later:
1225  if (state == JOIN) {
1226  if (!(msg->getPastryStateMsgType() &
1228  delete msg;
1229  return;
1230  }
1231 
1232  if (joinHopCount && stReceived.size() == joinHopCount) {
1233  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1234  << " (" << thisNode.getKey().toString(16) << ")]\n"
1235  << " Warning: dropping state message received after "
1236  << "all needed state messages were collected in JOIN state."
1237  << endl;
1238  delete msg;
1239  return;
1240  }
1241 
1242  stReceived.push_back(handle);
1244 
1245  if (msg->getLastHop()) {
1246  if (joinHopCount) {
1247  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1248  << " (" << thisNode.getKey().toString(16) << ")]\n"
1249  << " Error: received a second `last' state message! Restarting ..."
1250  << endl;
1251 
1252  joinOverlay();
1253  return;
1254  }
1255 
1256  joinHopCount = msg->getRow();
1257 
1258  if (stReceived.size() < joinHopCount) {
1259  // some states still missing:
1260  cancelEvent(readyWait);
1261  scheduleAt(simTime() + readyWaitAmount, readyWait);
1262  }
1263  }
1264 
1265  if (joinHopCount) {
1266  if (stReceived.size() > joinHopCount) {
1267  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1268  << " (" << thisNode.getKey().toString(16) << ")]\n"
1269  << " Error: too many state messages received in JOIN state! ("
1270  << stReceived.size() << " > " << joinHopCount << ") Restarting ..."
1271  << endl;
1272 
1273  joinOverlay();
1274  return;
1275  }
1276  if (stReceived.size() == joinHopCount) {
1277  // all state messages are here, sort by hopcount:
1278  sort(stReceived.begin(), stReceived.end(),
1280 
1281  // start pinging the nodes found in the first state message:
1282  stReceivedPos = stReceived.begin();
1284  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1285  << " (" << thisNode.getKey().toString(16) << ")]\n"
1286  << " have all STATE messages, now pinging nodes."
1287  << endl;
1289  pingNodes();
1290  } else {
1291  mergeState();
1292  stateCache.msg = NULL;
1293  changeState(READY);
1294 
1295  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1296  << " (" << thisNode.getKey().toString(16) << ")]\n"
1297  << " changeState(READY) called"
1298  << endl;
1299  }
1300 
1301  // cancel timeout:
1302  if (readyWait->isScheduled()) cancelEvent(readyWait);
1303  } else {
1304  //TODO occasionally, here we got a wrong hop count in
1305  // iterative mode due to more than one it. lookup during join
1306  // procedure
1307  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1308  << " (" << thisNode.getKey().toString(16) << ")]\n"
1309  << " Still need some STATE messages."
1310  << endl;
1311  }
1312 
1313  }
1314  return;
1315  }
1316 
1317  if (debugOutput) {
1318  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1319  << " (" << thisNode.getKey().toString(16) << ")]\n"
1320  << " handling STATE message"
1321  << endl;
1322  EV << " type: " << ((msg->getPastryStateMsgType()
1323  == PASTRY_STATE_UPDATE) ? "update"
1324  :"standard")
1325  << endl;
1327  EV << " msg timestamp: " <<
1328  msg->getTimestamp() << endl;
1329  EV << " last state change: " <<
1330  lastStateChange << endl;
1331  }
1332  }
1333 
1334  if (((msg->getPastryStateMsgType() == PASTRY_STATE_UPDATE))
1335  && (msg->getTimestamp() <= lastStateChange)) {
1336  // if we received an update based on our outdated state,
1337  // mark handle for retrying later:
1338  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1339  << " (" << thisNode.getKey().toString(16) << ")]\n"
1340  << " outdated state from " << msg->getSender()
1341  << endl;
1342  handle.outdatedUpdate = true;
1343  }
1344 
1345  // determine aliveTable to prevent leafSet from merging nodes that are
1346  // known to be dead:
1347  determineAliveTable(msg);
1348 
1350  // try to repair leafset based on repair message right now
1351  const TransportAddress& askLs = leafSet->repair(msg, &aliveTable);
1352  if (! askLs.isUnspecified()) {
1353  //sendRequest(askLs, PASTRY_REQ_REPAIR);
1354  RequestRepairCall* call =
1355  new RequestRepairCall("REQUEST REPAIR Call");
1356  call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
1357  RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
1358  sendUdpRpcCall(askLs, call);
1359  }
1360 
1361  // while not really known, it's safe to assume that a repair
1362  // message changed our state:
1363  lastStateChange = simTime();
1364  newLeafs();
1365  } else if (leafSet->mergeState(msg, &aliveTable)) {
1366  // merged state into leafset right now
1367  lastStateChange = simTime();
1368  newLeafs();
1369  updateTooltip();
1370  }
1371  // in READY state, only ping nodes to get proximity metric:
1372  if (!stateCache.msg) {
1373  // no state message is processed right now, start immediately:
1374  assert(stateCache.prox == NULL);
1375  stateCache = handle;
1376  processState();
1377  } else {
1380  // enqueue message for later processing:
1381  stateCacheQueue.push(handle);
1382  if (stateCacheQueue.size() > 15) {
1383  delete stateCacheQueue.front().msg;
1384  stateCacheQueue.pop();
1385  EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
1386  << " (" << thisNode.getKey().toString(16) << ")]\n"
1387  << " stateCacheQueue full -> pop()" << endl;
1388  }
1389  prePing(msg);
1390  } else {
1391  bool temp = true;
1392  if (!neighborhoodSet->mergeState(msg, NULL)) {
1393  temp = false;
1394  }
1395  if (!leafSet->mergeState(msg, NULL)) {
1396  temp = false;
1397  } else {
1398  newLeafs();
1399  }
1400  if (!routingTable->mergeState(msg, NULL)) {
1401  temp = false;
1402  }
1403  if (temp) lastStateChange = simTime();
1404  delete msg;
1405  }
1406  }
1407 }
1408 
1409 
1411 {
1412  EV << "[Pastry::processState() @ " << thisNode.getIp()
1413  << " (" << thisNode.getKey().toString(16) << ")]\n"
1414  << " new \""
1415  << std::string(cEnum::find("PastryStateMsgType")
1416  ->getStringFor(stateCache.msg->getPastryStateMsgType())).erase(0, 13)
1417  << "\" STATE message " << static_cast<void*>(stateCache.msg)
1418  << " from " << stateCache.msg->getSender().getIp() << " to process "
1419  << endl;
1420 
1423  pingNodes();
1424  } else {
1425  mergeState();
1427  }
1428 }