43 stabilize_timer = fixfingers_timer = join_timer = NULL;
55 if (iterativeLookupConfig.merge ==
true) {
56 throw cRuntimeError(
"Chord::initializeOverlay(): "
57 "Chord doesn't work with iterativeLookupConfig.merge = true!");
64 useCommonAPIforward = par(
"useCommonAPIforward");
65 successorListSize = par(
"successorListSize");
66 joinRetry = par(
"joinRetry");
67 stabilizeRetry = par(
"stabilizeRetry");
68 joinDelay = par(
"joinDelay");
69 stabilizeDelay = par(
"stabilizeDelay");
70 fixfingersDelay = par(
"fixfingersDelay");
71 checkPredecessorDelay = par(
"checkPredecessorDelay");
72 aggressiveJoinMode = par(
"aggressiveJoinMode");
73 extendedFingerTable = par(
"extendedFingerTable");
74 numFingerCandidates = par(
"numFingerCandidates");
75 proximityRouting = par(
"proximityRouting");
76 memorizeFailedSuccessor = par(
"memorizeFailedSuccessor");
79 mergeOptimizationL1 = par(
"mergeOptimizationL1");
80 mergeOptimizationL2 = par(
"mergeOptimizationL2");
81 mergeOptimizationL3 = par(
"mergeOptimizationL3");
82 mergeOptimizationL4 = par(
"mergeOptimizationL4");
85 missingPredecessorStabRequests = 0;
92 newsuccessorhintCount = 0;
94 stabilizeBytesSent = 0;
96 fixfingersBytesSent = 0;
97 newsuccessorhintBytesSent = 0;
105 WATCH(predecessorNode);
107 WATCH(bootstrapNode);
109 WATCH(missingPredecessorStabRequests);
112 join_timer =
new cMessage(
"join_timer");
113 stabilize_timer =
new cMessage(
"stabilize_timer");
114 fixfingers_timer =
new cMessage(
"fixfingers_timer");
115 checkPredecessor_timer =
new cMessage(
"checkPredecessor_timer");
122 cancelAndDelete(join_timer);
123 cancelAndDelete(stabilize_timer);
124 cancelAndDelete(fixfingers_timer);
125 cancelAndDelete(checkPredecessor_timer);
139 Enter_Method_Silent();
150 call, NULL, routingType, joinDelay);
164 setOverlayReady(
false);
170 initializeFriendModules();
176 EV <<
"[Chord::changeState() @ " << thisNode.getIp()
177 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
178 <<
" Entered INIT stage"
182 getParentModule()->getParentModule()->bubble(
"Enter INIT state.");
189 cancelEvent(join_timer);
193 scheduleAt(simTime(), join_timer);
197 EV <<
"[Chord::changeState() @ " << thisNode.getIp()
198 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
199 <<
" Entered JOIN stage"
202 getParentModule()->getParentModule()->bubble(
"Enter JOIN state.");
205 bootstrapNode = bootstrapList->getBootstrapNode(overlayId);
208 if (bootstrapNode.isUnspecified()) {
210 assert(predecessorNode.isUnspecified());
211 bootstrapNode = thisNode;
220 setOverlayReady(
true);
223 cancelEvent(stabilize_timer);
224 scheduleAt(simTime() + stabilizeDelay, stabilize_timer);
227 cancelEvent(fixfingers_timer);
228 scheduleAt(simTime() + fixfingersDelay,
232 cancelEvent(checkPredecessor_timer);
233 if (checkPredecessorDelay > 0) {
234 scheduleAt(simTime() + checkPredecessorDelay,
235 checkPredecessor_timer);
240 EV <<
"[Chord::changeState() @ " << thisNode.getIp()
241 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
242 <<
" Entered READY stage"
245 getParentModule()->getParentModule()->bubble(
"Enter READY state.");
254 if (msg == join_timer) {
255 handleJoinTimerExpired(msg);
258 else if (msg == stabilize_timer) {
259 handleStabilizeTimerExpired(msg);
262 else if (msg == fixfingers_timer) {
263 handleFixFingersTimerExpired(msg);
266 else if (msg == checkPredecessor_timer) {
267 cancelEvent(checkPredecessor_timer);
268 scheduleAt(simTime() + checkPredecessorDelay,
269 checkPredecessor_timer);
270 if (!predecessorNode.isUnspecified()) pingNode(predecessorNode);
274 error(
"Chord::handleTimerEvent(): received self message of "
285 handleNewSuccessorHint(chordMsg);
288 error(
"handleUDPMessage(): Unknown message type!");
298 if (state != READY) {
299 EV <<
"[Chord::handleRpcCall() @ " << thisNode.getIp()
300 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
301 <<
" Received RPC call and state != READY"
319 cPolymorphic* context,
int rpcId,
324 handleRpcJoinResponse(_JoinResponse);
325 EV <<
"[Chord::handleRpcResponse() @ " << thisNode.getIp()
326 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
327 <<
" Received a Join RPC Response: id=" << rpcId <<
"\n"
328 <<
" msg=" << *_JoinResponse <<
" rtt=" << rtt
333 handleRpcNotifyResponse(_NotifyResponse);
334 EV <<
"[Chord::handleRpcResponse() @ " << thisNode.getIp()
335 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
336 <<
" Received a Notify RPC Response: id=" << rpcId <<
"\n"
337 <<
" msg=" << *_NotifyResponse <<
" rtt=" << rtt
342 handleRpcStabilizeResponse(_StabilizeResponse);
343 EV <<
"[Chord::handleRpcResponse() @ " << thisNode.getIp()
344 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
345 <<
" Received a Stabilize RPC Response: id=" << rpcId <<
"\n"
346 <<
" msg=" << *_StabilizeResponse <<
" rtt=" << rtt
351 handleRpcFixfingersResponse(_FixfingersResponse, SIMTIME_DBL(rtt));
352 EV <<
"[Chord::handleRpcResponse() @ " << thisNode.getIp()
353 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
354 <<
" Received a Fixfingers RPC Response: id=" << rpcId <<
"\n"
355 <<
" msg=" << *_FixfingersResponse <<
" rtt=" << rtt
364 cPolymorphic* context,
int rpcId,
369 EV <<
"[Chord::handleRpcTimeout() @ " << thisNode.getIp()
370 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
371 <<
" FindNode RPC Call timed out: id=" << rpcId <<
"\n"
372 <<
" msg=" << *_FindNodeCall
377 EV <<
"[Chord::handleRpcTimeout() @ " << thisNode.getIp()
378 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
379 <<
" Join RPC Call timed out: id=" << rpcId <<
"\n"
380 <<
" msg=" << *_JoinCall
385 EV <<
"[Chord::handleRpcTimeout() @ " << thisNode.getIp()
386 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
387 <<
" Notify RPC Call timed out: id=" << rpcId <<
"\n"
388 <<
" msg=" << *_NotifyCall
390 if (!handleFailedNode(dest)) join();
394 EV <<
"[Chord::handleRpcTimeout() @ " << thisNode.getIp()
395 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
396 <<
" Stabilize RPC Call timed out: id=" << rpcId <<
"\n"
397 <<
" msg=" << *_StabilizeCall
399 if (!handleFailedNode(dest)) join();
403 EV <<
"[Chord::handleRpcTimeout() @ " << thisNode.getIp()
404 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
405 <<
" Fixfingers RPC Call timed out: id=" << rpcId <<
"\n"
406 <<
" msg=" << *_FixfingersCall
415 return successorListSize;
420 return extendedFingerTable ? numFingerCandidates : 1;
430 error(
"Chord::isSiblingFor(): key is unspecified!");
432 if (state != READY) {
437 if (numSiblings > getMaxNumSiblings()) {
438 opp_error(
"Chord::isSiblingFor(): numSiblings too big!");
441 if (numSiblings == -1) numSiblings = getMaxNumSiblings();
444 if ((predecessorNode.isUnspecified()) && (node == thisNode)) {
445 if (successorList->isEmpty() || (node.
getKey() == key)) {
454 if ((node == thisNode)
455 && (key.
isBetweenR(predecessorNode.getKey(), thisNode.getKey()))) {
464 for (
int i = -1; i < (int)successorList->getSize();
465 i++, prevNode = curNode) {
470 curNode = successorList->getSuccessor(i);
473 if (node == curNode) {
476 if (numSiblings <= ((
int)successorList->getSize() - i)) {
486 if (numSiblings <= 1) {
506 Enter_Method_Silent();
508 if (!predecessorNode.isUnspecified() && failed == predecessorNode)
514 if (successorList->handleFailedNode(failed))
517 if (fingerTable != NULL)
518 fingerTable->handleFailedNode(failed);
523 if ((!predecessorNode.isUnspecified()) &&
524 oldSuccessor == predecessorNode) {
526 callUpdate(predecessorNode,
false);
529 if (failed == oldSuccessor) {
531 if (memorizeFailedSuccessor) {
532 failedSuccessor = oldSuccessor;
534 cancelEvent(stabilize_timer);
535 scheduleAt(simTime(), stabilize_timer);
538 if (state != READY)
return true;
540 if (successorList->isEmpty()) {
543 cancelEvent(stabilize_timer);
544 cancelEvent(fixfingers_timer);
547 return !(successorList->isEmpty());
551 int numRedundantNodes,
561 if (successorList->isEmpty() && !predecessorNode.isUnspecified()) {
562 throw new cRuntimeError(
"Chord: Node is READY, has a "
563 "predecessor but no successor!");
571 nextHop->push_back(thisNode);
575 else if (isSiblingFor(thisNode, key, 1, &err)) {
577 nextHop->push_back(thisNode);
578 for (uint32_t i = 0; i < successorList->getSize(); i++) {
579 nextHop->push_back(successorList->getSuccessor(i));
586 successorList->getSuccessor().getKey())) {
588 for (uint32_t i = 0; i < successorList->getSize(); i++) {
589 nextHop->push_back(successorList->getSuccessor(i));
596 nextHop = closestPreceedingNode(key);
609 for (
int j = successorList->getSize() - 1; j >= 0; j--) {
611 if (successorList->getSuccessor(j).getKey().isBetweenR(thisNode.getKey(), key)) {
612 tempHandle = successorList->getSuccessor(j);
618 std::stringstream temp;
619 temp <<
"Chord::closestPreceedingNode(): Successor list broken "
620 << thisNode.
getKey() <<
" " << key;
621 throw cRuntimeError(temp.str().c_str());
626 for (
int i = fingerTable->getSize() - 1; i >= 0; i--) {
628 if (fingerTable->getFinger(i).getKey().isBetweenLR(tempHandle.
getKey(), key)) {
629 if(!extendedFingerTable) {
631 nextHop->push_back(fingerTable->getFinger(i));
633 EV <<
"[Chord::closestPreceedingNode() @ " << thisNode.getIp()
634 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
635 <<
" ClosestPreceedingNode: node " << thisNode
636 <<
" for key " << key <<
"\n"
637 <<
" finger " << fingerTable->getFinger(i).getKey()
639 <<
" " << tempHandle.
getKey()
643 return fingerTable->getFinger(i, key);
649 EV <<
"[Chord::closestPreceedingNode() @ " << thisNode.getIp()
650 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
651 <<
" No finger found"
655 for (
int i = successorList->getSize() - 1; i >= 0
656 && nextHop->size() <= numFingerCandidates ; i--) {
657 if (successorList->getSuccessor(i).getKey().isBetween(thisNode.getKey(), key)) {
658 nextHop->push_back(successorList->getSuccessor(i));
662 if (nextHop->size() != 0) {
667 if ((predecessorNode.isUnspecified()) &&
668 (successorList->getSuccessor() == thisNode)) {
669 nextHop->push_back(thisNode);
674 throw cRuntimeError(
"Error in Chord::closestPreceedingNode()!");
682 innerMsg->getEncapsulatedPacket() != NULL) {
693 newsuccessorhintBytesSent += msg->getByteLength());
700 if ((dynamic_cast<StabilizeCall*>(innerMsg) != NULL) ||
703 msg->getByteLength());
704 }
else if ((dynamic_cast<NotifyCall*>(innerMsg) != NULL) ||
707 msg->getByteLength());
708 }
else if ((dynamic_cast<FixfingersCall*>(innerMsg) != NULL) ||
711 msg->getByteLength());
712 }
else if ((dynamic_cast<JoinCall*>(innerMsg) != NULL) ||
714 RECORD_STATS(joinCount++; joinBytesSent += msg->getByteLength());
723 throw cRuntimeError(
"Unknown message type!");
731 bootstrapList->removeBootstrapNode(thisNode);
733 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
736 globalStatistics->addStdDev(
"Chord: Sent JOIN Messages/s",
738 globalStatistics->addStdDev(
"Chord: Sent NEWSUCCESSORHINT Messages/s",
739 newsuccessorhintCount / time);
740 globalStatistics->addStdDev(
"Chord: Sent STABILIZE Messages/s",
741 stabilizeCount / time);
742 globalStatistics->addStdDev(
"Chord: Sent NOTIFY Messages/s",
744 globalStatistics->addStdDev(
"Chord: Sent FIX_FINGERS Messages/s",
745 fixfingersCount / time);
746 globalStatistics->addStdDev(
"Chord: Sent JOIN Bytes/s",
747 joinBytesSent / time);
748 globalStatistics->addStdDev(
"Chord: Sent NEWSUCCESSORHINT Bytes/s",
749 newsuccessorhintBytesSent / time);
750 globalStatistics->addStdDev(
"Chord: Sent STABILIZE Bytes/s",
751 stabilizeBytesSent / time);
752 globalStatistics->addStdDev(
"Chord: Sent NOTIFY Bytes/s",
753 notifyBytesSent / time);
754 globalStatistics->addStdDev(
"Chord: Sent FIX_FINGERS Bytes/s",
755 fixfingersBytesSent / time);
772 if (joinRetry == 0) {
773 joinRetry = par(
"joinRetry");
786 sendRouteRpcCall(
OVERLAY_COMP, bootstrapNode, thisNode.getKey(),
787 call, NULL, routingType, joinDelay);
790 cancelEvent(join_timer);
791 scheduleAt(simTime() + joinDelay, msg);
801 if ((checkPredecessorDelay == 0) &&
802 (missingPredecessorStabRequests >= stabilizeRetry)) {
807 missingPredecessorStabRequests = 0;
809 callUpdate(predecessorNode,
false);
812 if (!successorList->isEmpty()) {
817 sendUdpRpcCall(successorList->getSuccessor(), call);
819 missingPredecessorStabRequests++;
823 if (mergeOptimizationL4) {
825 for (uint32_t nextFinger = 0; nextFinger < thisNode.getKey().getLength();
830 if (offset > successorList->getSuccessor().getKey() - thisNode.getKey()) {
831 if ((fingerTable->getFinger(nextFinger)).isUnspecified()) {
834 pingNode(fingerTable->getFinger(nextFinger), -1, 0, NULL,
835 NULL, NULL, nextFinger);
842 cancelEvent(stabilize_timer);
843 scheduleAt(simTime() + stabilizeDelay, msg);
849 if ((state != READY) || successorList->isEmpty())
853 for (uint32_t nextFinger = 0; nextFinger < thisNode.getKey().getLength();
857 lookupKey = thisNode.getKey() + offset;
860 if (offset > successorList->getSuccessor().getKey() - thisNode.getKey()) {
870 fingerTable->removeFinger(nextFinger);
875 cancelEvent(fixfingers_timer);
876 scheduleAt(simTime() + fixfingersDelay, msg);
889 if (predecessor.getKey().isBetween(thisNode.getKey(),
890 successorList->getSuccessor().getKey())
891 || (thisNode.getKey() == successorList->getSuccessor().getKey())) {
893 successorList->addSuccessor(predecessor);
899 if (mergeOptimizationL3) {
900 if (successorList->getSuccessor() == predecessor) {
904 sendUdpRpcCall(predecessor, call);
906 if (successorList->getSuccessor() == newSuccessorHintMsg->
912 sendUdpRpcCall(predecessor, call);
927 int sucNum = successorList->getSize();
931 for (
int k = 0; k < sucNum; k++) {
932 joinResponse->
setSucNode(k, successorList->getSuccessor(k));
936 if (predecessorNode.isUnspecified() && successorList->isEmpty()) {
945 sendRpcResponse(joinCall, joinResponse);
947 if (aggressiveJoinMode) {
956 if (!predecessorNode.isUnspecified()) {
963 newSuccessorHintMsg->
966 sendMessageToUDP(predecessorNode, newSuccessorHintMsg);
969 if (predecessorNode.isUnspecified() || (predecessorNode != requestor)) {
972 predecessorNode = requestor;
976 callUpdate(oldPredecessor,
false);
978 callUpdate(predecessorNode,
true);
984 if (successorList->isEmpty())
985 successorList->addSuccessor(requestor);
993 int sucNum = successorListSize - 1;
995 if (joinResponse->
getSucNum() < successorListSize - 1) {
1000 for (
int k = 0; k < sucNum; k++) {
1002 successorList->addSuccessor(successor);
1006 successorList->addSuccessor(joinResponse->
getSrcNode());
1010 if (aggressiveJoinMode) {
1014 if (!predecessorNode.isUnspecified()) {
1018 if (mergeOptimizationL2) {
1024 newSuccessorHintMsg->
1027 sendMessageToUDP(predecessorNode, newSuccessorHintMsg);
1032 predecessorNode = joinResponse->
getPreNode();
1036 && oldPredecessor != joinResponse->
getPreNode()) {
1037 callUpdate(oldPredecessor,
false);
1039 callUpdate(predecessorNode,
true);
1048 cancelEvent(stabilize_timer);
1049 scheduleAt(simTime(), stabilize_timer);
1052 cancelEvent(fixfingers_timer);
1053 scheduleAt(simTime(), fixfingers_timer);
1060 if (!predecessorNode.isUnspecified() &&
1062 missingPredecessorStabRequests = 0;
1068 stabilizeResponse->
setPreNode(predecessorNode);
1071 sendRpcResponse(call, stabilizeResponse);
1076 if (state != READY) {
1084 if ((successorList->isEmpty() ||
1086 successorList->getSuccessor().getKey())) &&
1087 (failedSuccessor.isUnspecified() || failedSuccessor != predecessor)) {
1088 if (successorList->isEmpty() && predecessor.
isUnspecified()) {
1091 successorList->addSuccessor(stabilizeResponse->
getSrcNode());
1094 successorList->addSuccessor(predecessor);
1105 sendUdpRpcCall(successorList->getSuccessor(), notifyCall);
1111 if (!predecessorNode.isUnspecified() &&
1113 missingPredecessorStabRequests = 0;
1116 bool newPredecessorSet =
false;
1121 if (predecessorNode.isUnspecified() ||
1122 newPredecessor.
getKey().
isBetween(predecessorNode.getKey(), thisNode.getKey()) ||
1124 call->
getFailed() == predecessorNode)) {
1126 if ((predecessorNode.isUnspecified()) ||
1127 (newPredecessor != predecessorNode)) {
1131 predecessorNode = newPredecessor;
1133 if (successorList->isEmpty()) {
1134 successorList->addSuccessor(newPredecessor);
1137 newPredecessorSet =
true;
1142 callUpdate(oldPredecessor,
false);
1144 callUpdate(predecessorNode,
true);
1147 if (mergeOptimizationL1) {
1154 newSuccessorHintMsg->
setPreNode(predecessorNode);
1155 newSuccessorHintMsg->
1157 sendMessageToUDP(oldPredecessor, newSuccessorHintMsg);
1168 int sucNum = successorList->getSize();
1174 if (mergeOptimizationL3) {
1175 if (!newPredecessorSet && (predecessorNode != newPredecessor)) {
1184 for (
int k = 0; k < sucNum; k++) {
1185 notifyResponse->
setSucNode(k, successorList->getSuccessor(k));
1190 sendRpcResponse(call, notifyResponse);
1196 if (state != READY) {
1200 if (successorList->getSuccessor() != notifyResponse->
getSrcNode()) {
1201 EV <<
"[Chord::handleRpcNotifyResponse() @ " << thisNode.getIp()
1202 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1203 <<
" The srcNode of the received NotifyResponse is not our "
1204 <<
" current successor"
1211 if (mergeOptimizationL3) {
1216 successorList->addSuccessor(notifyResponse->
getPreNode());
1217 if (successorList->getSuccessor() == notifyResponse->
getPreNode())
1218 sendUdpRpcCall(notifyResponse->
getPreNode(), call);
1224 successorList->updateList(notifyResponse);
1238 if (extendedFingerTable) {
1240 < numFingerCandidates + 1)
1241 ? successorList->getSize() + 1
1242 : numFingerCandidates + 1));
1243 for (
unsigned int i = 0;
1244 i < (((successorList->getSize()) < numFingerCandidates)
1245 ? (successorList->getSize()) : numFingerCandidates); i++) {
1247 assert(!successorList->getSuccessor(i).isUnspecified());
1249 successorList->getSuccessor(i));
1255 sendRpcResponse(call, fixfingersResponse);
1270 if (!extendedFingerTable) {
1271 fingerTable->setFinger(fixfingersResponse->
getFinger(),
1279 if (fixfingersResponse->
getSucNode(i) == thisNode)
1281 successors.insert(std::make_pair(MAXTIME,
1285 if (successors.size() == 0) {
1289 fingerTable->setFinger(fixfingersResponse->
getFinger(), successors);
1292 if (proximityRouting || globalParameters->getTopologyAdaptation()) {
1294 if (proximityRouting) {
1296 for (
unsigned int i = 0;
1301 if (fixfingersResponse->
getSucNode(i) == thisNode)
1307 neighborCache->getProx(fixfingersResponse->
getSucNode(i),
1312 fingerTable->removeFinger(fixfingersResponse->
getFinger());
1316 fingerTable->updateFinger(fixfingersResponse->
getFinger(),
1326 cPolymorphic *contextPointer,
Prox prox)
1330 handleFailedNode(node);
1338 int rpcId, simtime_t rtt)
1340 EV <<
"[Chord::pingResponse() @ " << thisNode.getIp()
1341 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1342 <<
" Received a Ping RPC Response: id=" << rpcId <<
"\n"
1343 <<
" msg=" << *pingResponse <<
" rtt=" << rtt
1347 fingerTable->updateFinger(rpcId, pingResponse->
getSrcNode(), rtt);
1352 cPolymorphic* context,
int rpcId)
1354 EV <<
"[Chord::pingTimeout() @ " << thisNode.getIp()
1355 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1356 <<
" Ping RPC timeout: id=" << rpcId << endl;
1359 handleFailedNode(dest);
1365 (getParentModule()->getSubmodule(
"fingerTable"));
1368 (getParentModule()->getSubmodule(
"successorList"));
1375 fingerTable->initializeTable(thisNode.getKey().getLength(), thisNode,
this);
1378 successorList->initializeList(par(
"successorListSize"), thisNode,
this);
1385 std::stringstream ttString;
1388 ttString << predecessorNode << endl << thisNode << endl
1389 << successorList->getSuccessor();
1391 getParentModule()->getParentModule()->getDisplayString().
1392 setTagArg(
"tt", 0, ttString.str().c_str());
1393 getParentModule()->getDisplayString().
1394 setTagArg(
"tt", 0, ttString.str().c_str());
1395 getDisplayString().setTagArg(
"tt", 0, ttString.str().c_str());
1398 showOverlayNeighborArrow(successorList->getSuccessor(),
true,
1399 "m=m,50,0,50,0;ls=red,1");
1400 showOverlayNeighborArrow(predecessorNode,
false,
1401 "m=m,50,100,50,100;ls=green,1");
1408 bool useAlternative)
const