32 #include <UDPAppBase.h>
33 #include <UDPSocket.h>
34 #include <IPAddressResolver.h>
35 #include <NotificationBoard.h>
59 globalNodeList = NULL;
60 underlayConfigurator = NULL;
61 notificationBoard = NULL;
62 globalParameters = NULL;
84 registerComp(getThisCompType(),
this);
92 notificationBoard = NotificationBoardAccess().get();
94 bootstrapList = check_and_cast<
BootstrapList*>(getParentModule()->
95 getParentModule()->getSubmodule(
"bootstrapList", 0));
97 udpGate = gate(
"udpIn");
98 appGate = gate(
"appIn");
101 overlayId = par (
"overlayId");
102 debugOutput = par(
"debugOutput");
103 collectPerHopDelay = par(
"collectPerHopDelay");
104 localPort = par(
"localPort");
105 hopCountMax = par(
"hopCountMax");
106 drawOverlayTopology = par(
"drawOverlayTopology");
107 rejoinOnFailure = par(
"rejoinOnFailure");
108 sendRpcResponseToLastHop = par(
"sendRpcResponseToLastHop");
109 dropFindNodeAttack = par(
"dropFindNodeAttack");
110 isSiblingAttack = par(
"isSiblingAttack");
111 invalidNodesAttack = par(
"invalidNodesAttack");
112 dropRouteMessageAttack = par(
"dropRouteMessageAttack");
113 measureAuthBlock = par(
"measureAuthBlock");
114 restoreContext = par(
"restoreContext");
120 std::string temp = par(
"routingType").stdstringValue();
121 if (temp ==
"iterative")
123 else if (temp ==
"exhaustive-iterative")
125 else if (temp ==
"semi-recursive")
127 else if (temp ==
"full-recursive")
129 else if (temp ==
"source-routing-recursive")
131 else throw cRuntimeError((std::string(
"Wrong routing type: ")
134 useCommonAPIforward = par(
"useCommonAPIforward");
135 routeMsgAcks = par(
"routeMsgAcks");
136 recNumRedundantNodes = par(
"recNumRedundantNodes");
137 recordRoute = par(
"recordRoute");
140 iterativeLookupConfig.redundantNodes = par(
"lookupRedundantNodes");
141 iterativeLookupConfig.parallelPaths = par(
"lookupParallelPaths");
142 iterativeLookupConfig.parallelRpcs = par(
"lookupParallelRpcs");
143 iterativeLookupConfig.verifySiblings = par(
"lookupVerifySiblings");
144 iterativeLookupConfig.majoritySiblings = par(
"lookupMajoritySiblings");
145 iterativeLookupConfig.merge = par(
"lookupMerge");
146 iterativeLookupConfig.failedNodeRpcs = par(
"lookupFailedNodeRpcs");
147 iterativeLookupConfig.strictParallelRpcs =
148 par(
"lookupStrictParallelRpcs");
149 iterativeLookupConfig.useAllParallelResponses =
150 par(
"lookupUseAllParallelResponses");
151 iterativeLookupConfig.newRpcOnEveryTimeout =
152 par(
"lookupNewRpcOnEveryTimeout");
153 iterativeLookupConfig.newRpcOnEveryResponse =
154 par(
"lookupNewRpcOnEveryResponse");
155 iterativeLookupConfig.finishOnFirstUnchanged =
156 par(
"lookupFinishOnFirstUnchanged");
157 iterativeLookupConfig.visitOnlyOnce =
158 par(
"lookupVisitOnlyOnce");
159 iterativeLookupConfig.acceptLateSiblings =
160 par(
"lookupAcceptLateSiblings");
162 recursiveLookupConfig.redundantNodes = par(
"lookupRedundantNodes");
163 recursiveLookupConfig.numRetries = 0;
167 bytesAppDataSent = 0;
168 numAppLookupSent = 0;
169 bytesAppLookupSent = 0;
170 numMaintenanceSent = 0;
171 bytesMaintenanceSent = 0;
172 numAppDataReceived = 0;
173 bytesAppDataReceived = 0;
174 numAppLookupReceived = 0;
175 bytesAppLookupReceived = 0;
176 numMaintenanceReceived = 0;
177 bytesMaintenanceReceived = 0;
178 numAppDataForwarded = 0;
179 bytesAppDataForwarded = 0;
180 numAppLookupForwarded = 0;
181 bytesAppLookupForwarded = 0;
182 numMaintenanceForwarded = 0;
183 bytesMaintenanceForwarded = 0;
184 bytesAuthBlockSent = 0;
189 bytesFindNodeSent = 0;
190 numFindNodeResponseSent = 0;
191 bytesFindNodeResponseSent = 0;
192 numFailedNodeSent = 0;
193 bytesFailedNodeSent = 0;
194 numFailedNodeResponseSent = 0;
195 bytesFailedNodeResponseSent = 0;
200 bytesInternalSent = 0;
201 numInternalReceived = 0;
202 bytesInternalReceived = 0;
204 WATCH(numAppDataSent);
205 WATCH(bytesAppDataSent);
206 WATCH(numAppLookupSent);
207 WATCH(bytesAppLookupSent);
208 WATCH(numMaintenanceSent);
209 WATCH(bytesMaintenanceSent);
210 WATCH(numAppDataReceived);
211 WATCH(bytesAppDataReceived);
212 WATCH(numAppLookupReceived);
213 WATCH(bytesAppLookupReceived);
214 WATCH(numMaintenanceReceived);
215 WATCH(bytesMaintenanceReceived);
216 WATCH(numAppDataForwarded);
217 WATCH(bytesAppDataForwarded);
218 WATCH(numAppLookupForwarded);
219 WATCH(bytesAppLookupForwarded);
220 WATCH(numMaintenanceForwarded);
221 WATCH(bytesMaintenanceForwarded);
225 WATCH(numFindNodeSent);
226 WATCH(bytesFindNodeSent);
227 WATCH(numFindNodeResponseSent);
228 WATCH(bytesFindNodeResponseSent);
229 WATCH(numFailedNodeSent);
230 WATCH(bytesFailedNodeSent);
231 WATCH(numFailedNodeResponseSent);
232 WATCH(bytesFailedNodeResponseSent);
236 if (isInSimpleMultiOverlayHost()) {
237 WATCH(numInternalSent);
238 WATCH(bytesInternalSent);
239 WATCH(numInternalReceived);
240 WATCH(bytesInternalReceived);
244 thisNode.setIp(IPAddressResolver().
245 addressOf(getParentModule()->getParentModule()));
249 internalReadyState =
false;
251 getDisplayString().setTagArg(
"i", 1,
"red");
252 globalNodeList->setOverlayReadyIcon(getThisNode(),
false);
255 bindToPort(localPort);
258 notificationBoard->subscribe(
this, NF_OVERLAY_TRANSPORTADDRESS_CHANGED);
259 notificationBoard->subscribe(
this, NF_OVERLAY_NODE_LEAVE);
260 notificationBoard->subscribe(
this, NF_OVERLAY_NODE_GRACEFUL_LEAVE);
263 if (drawOverlayTopology)
264 initVis(getParentModule()->getParentModule());
271 setTcpOut(gate(
"tcpOut"));
274 creationTime = simTime();
279 initializeOverlay(stage);
287 (getParentModule()->getParentModule()
288 ->getSubmodule(
"overlay", 0)->gate(
"appIn")
289 ->getNextGate()->getOwnerModule());
291 throw cRuntimeError(
"BaseOverlay.cc: "
292 "Couldn't obtain bootstrap gate");
309 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
313 if (collectPerHopDelay) {
314 std::ostringstream singleHopName;
317 for (
size_t i = 0; i < singleHopDelays.size();) {
318 hdrl = singleHopDelays[i++];
320 for (
size_t j = 1; j <= i; ++j) {
321 if (hdr->
count == 0)
continue;
322 singleHopName.str(
"");
323 singleHopName <<
"BaseOverlay: Average Delay in Hop "
325 globalStatistics->addStdDev(singleHopName.str(),
326 SIMTIME_DBL(hdr->
val / hdr->
count));
331 singleHopDelays.clear();
334 globalStatistics->addStdDev(
"BaseOverlay: Join Retries", joinRetries);
336 globalStatistics->addStdDev(
"BaseOverlay: Sent App Data Messages/s",
337 numAppDataSent / time);
338 globalStatistics->addStdDev(
"BaseOverlay: Sent App Data Bytes/s",
339 bytesAppDataSent / time);
// Report the internal *sent* rates from the sent counters (numInternalSent /
// bytesInternalSent). The received counters are reported separately under the
// "Internal Received" labels further down, so using them here would publish
// the same values twice and never expose the sent traffic.
340 if (isInSimpleMultiOverlayHost()) {
341 globalStatistics->addStdDev(
"BaseOverlay: Internal Sent Messages/s",
342 numInternalSent / time);
343 globalStatistics->addStdDev(
"BaseOverlay: Internal Sent Bytes/s",
344 bytesInternalSent / time);
346 globalStatistics->addStdDev(
"BaseOverlay: Sent App Lookup Messages/s",
347 numAppLookupSent / time);
348 globalStatistics->addStdDev(
"BaseOverlay: Sent App Lookup Bytes/s",
349 bytesAppLookupSent / time);
350 globalStatistics->addStdDev(
"BaseOverlay: Sent Maintenance Messages/s",
351 numMaintenanceSent / time);
352 globalStatistics->addStdDev(
"BaseOverlay: Sent Maintenance Bytes/s",
353 bytesMaintenanceSent / time);
355 globalStatistics->addStdDev(
"BaseOverlay: Sent Total Messages/s",
356 (numAppDataSent + numAppLookupSent +
357 numMaintenanceSent) / time);
358 globalStatistics->addStdDev(
"BaseOverlay: Sent Total Bytes/s",
359 (bytesAppDataSent + bytesAppLookupSent +
360 bytesMaintenanceSent) / time);
361 globalStatistics->addStdDev(
"BaseOverlay: Sent FindNode Messages/s",
362 numFindNodeSent / time);
363 globalStatistics->addStdDev(
"BaseOverlay: Sent FindNode Bytes/s",
364 bytesFindNodeSent / time);
366 globalStatistics->addStdDev(
"BaseOverlay: Sent FindNodeResponse Messages/s",
367 numFindNodeResponseSent / time);
368 globalStatistics->addStdDev(
"BaseOverlay: Sent FindNodeResponse Bytes/s",
369 bytesFindNodeResponseSent / time);
370 globalStatistics->addStdDev(
"BaseOverlay: Sent FailedNode Messages/s",
371 numFailedNodeSent / time);
372 globalStatistics->addStdDev(
"BaseOverlay: Sent FailedNode Bytes/s",
373 bytesFailedNodeSent / time);
374 globalStatistics->addStdDev(
"BaseOverlay: Sent FailedNodeResponse Messages/s",
375 numFailedNodeResponseSent / time);
376 globalStatistics->addStdDev(
"BaseOverlay: Sent FailedNodeResponse Bytes/s",
377 bytesFailedNodeResponseSent / time);
378 globalStatistics->addStdDev(
"BaseOverlay: Received App Data Messages/s",
379 numAppDataReceived / time);
380 globalStatistics->addStdDev(
"BaseOverlay: Received App Data Bytes/s",
381 bytesAppDataReceived / time);
382 if (isInSimpleMultiOverlayHost()) {
383 globalStatistics->addStdDev(
"BaseOverlay: Internal Received Messages/s",
384 numInternalReceived / time);
385 globalStatistics->addStdDev(
"BaseOverlay: Internal Received Bytes/s",
386 bytesInternalReceived / time);
388 globalStatistics->addStdDev(
"BaseOverlay: Received App Lookup Messages/s",
389 numAppLookupReceived / time);
390 globalStatistics->addStdDev(
"BaseOverlay: Received App Lookup Bytes/s",
391 bytesAppLookupReceived / time);
392 globalStatistics->addStdDev(
"BaseOverlay: Received Maintenance Messages/s",
393 numMaintenanceReceived / time);
394 globalStatistics->addStdDev(
"BaseOverlay: Received Maintenance Bytes/s",
395 bytesMaintenanceReceived / time);
397 globalStatistics->addStdDev(
"BaseOverlay: Received Total Messages/s",
398 (numAppDataReceived + numAppLookupReceived +
399 numMaintenanceReceived)/time);
400 globalStatistics->addStdDev(
"BaseOverlay: Received Total Bytes/s",
401 (bytesAppDataReceived + bytesAppLookupReceived +
402 bytesMaintenanceReceived)/time);
403 globalStatistics->addStdDev(
"BaseOverlay: Forwarded App Data Messages/s",
404 numAppDataForwarded / time);
405 globalStatistics->addStdDev(
"BaseOverlay: Forwarded App Data Bytes/s",
406 bytesAppDataForwarded / time);
407 globalStatistics->addStdDev(
"BaseOverlay: Forwarded App Lookup Messages/s",
408 numAppLookupForwarded / time);
409 globalStatistics->addStdDev(
"BaseOverlay: Forwarded App Lookup Bytes/s",
410 bytesAppLookupForwarded / time);
411 globalStatistics->addStdDev(
"BaseOverlay: Forwarded Maintenance Messages/s",
412 numMaintenanceForwarded / time);
413 globalStatistics->addStdDev(
"BaseOverlay: Forwarded Maintenance Bytes/s",
414 bytesMaintenanceForwarded / time);
415 globalStatistics->addStdDev(
"BaseOverlay: Forwarded Total Messages/s",
416 (numAppDataForwarded + numAppLookupForwarded +
417 numMaintenanceForwarded) / time);
418 globalStatistics->addStdDev(
"BaseOverlay: Forwarded Total Bytes/s",
419 (bytesAppDataForwarded + bytesAppLookupForwarded +
420 bytesMaintenanceForwarded) / time);
422 globalStatistics->addStdDev(
"BaseOverlay: Dropped Messages/s",
424 globalStatistics->addStdDev(
"BaseOverlay: Dropped Bytes/s",
425 bytesDropped / time);
427 globalStatistics->addStdDev(
"BaseOverlay: Measured Session Time",
428 SIMTIME_DBL(simTime() - creationTime));
430 globalStatistics->addStdDev(
"BaseOverlay: Sent Ping Messages/s",
432 globalStatistics->addStdDev(
"BaseOverlay: Sent Ping Bytes/s",
433 bytesPingSent / time);
434 globalStatistics->addStdDev(
"BaseOverlay: Sent Ping Response Messages/s",
435 numPingResponseSent / time);
436 globalStatistics->addStdDev(
"BaseOverlay: Sent Ping Response Bytes/s",
437 bytesPingResponseSent / time);
439 if (getMeasureAuthBlock()) {
440 globalStatistics->addStdDev(
"BaseOverlay: Sent AuthBlock Bytes/s",
441 bytesAuthBlockSent / time);
455 return globalNodeList->isMalicious(getThisNode());
468 EV <<
"[BaseOverlay::bindToPort() @ " << thisNode.getIp()
469 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
470 <<
" Binding to UDP port " << port
473 thisNode.setPort(port);
477 cMessage *msg =
new cMessage(
"UDP_C_BIND", UDP_C_BIND);
478 socketId = UDPSocket::generateSocketId();
479 UDPControlInfo *ctrl =
new UDPControlInfo();
480 ctrl->setSrcPort(port);
481 ctrl->setSockId(socketId);
482 msg->setControlInfo(ctrl);
502 if (appDataMsg != NULL) {
503 overlayCtrlInfo->
setSrcComp(appDataMsg->getSrcComp());
504 overlayCtrlInfo->
setDestComp(appDataMsg->getDestComp());
507 deliverMsg->setControlInfo(overlayCtrlInfo);
509 deliverMsg->encapsulate(msg->decapsulate());
512 cGate* destGate = getCompRpcGate(static_cast<CompType>(
515 if (destGate == NULL) {
516 throw cRuntimeError(
"BaseOverlay::callDeliver(): Unknown destComp!");
519 sendDirect(deliverMsg, destGate);
531 forwardMsg->encapsulate(msg->getEncapsulatedPacket()->decapsulate());
539 overlayCtrlInfo->
setSrcComp(check_and_cast<BaseAppDataMessage*>
540 (msg->getEncapsulatedPacket())->getSrcComp());
541 overlayCtrlInfo->
setDestComp(check_and_cast<BaseAppDataMessage*>
542 (msg->getEncapsulatedPacket())->getDestComp());
544 if (msg->getControlInfo() != NULL) {
553 forwardMsg->setControlInfo(overlayCtrlInfo);
557 send(forwardMsg,
"appOut");
565 Enter_Method(
"local_lookup()");
568 throw cRuntimeError(
"BaseOverlay::local_lookup(): "
569 "safe flag is not implemented!");
572 if (num < 0) num = INT_MAX;
573 NodeVector* nodeVector = findNode(key, min(num, getMaxNumRedundantNodes()),
574 min(num,getMaxNumSiblings()));
576 if (((
int)nodeVector->size()) > num)
577 nodeVector->resize(num);
584 throw cRuntimeError(
"BaseOverlays::estimate_mean_distance(): "
585 "Function not implemented for this Overlay!");
595 while (count >= start) {
604 Enter_Method(
"join()");
608 if (((state == READY) || (state == FAILED)) && !rejoinOnFailure) {
613 if (state != READY) {
616 IPAddressResolver().addressOf(getParentModule()->getParentModule()));
619 thisNode.setKey(nodeID);
620 }
else if (thisNode.getKey().isUnspecified()) {
621 std::string nodeIdStr = par(
"nodeId").stdstringValue();
623 if (nodeIdStr.size()) {
631 cMessage *msg =
new cMessage(
"UDP_C_UNBIND", UDP_C_UNBIND);
632 UDPControlInfo *ctrl =
new UDPControlInfo();
633 ctrl->setSrcPort(thisNode.getPort());
634 ctrl->setSockId(socketId);
635 msg->setControlInfo(ctrl);
637 bindToPort((thisNode.getPort() + 10) % 0xFFFF);
638 thisNode.setKey(nodeID);
640 cObject** context = globalNodeList->getContext(getThisNode());
641 if (restoreContext && context) {
642 if (*context == NULL) {
653 throw cRuntimeError(
"BaseOverlay::joinForeignPartition(): "
654 "This overlay doesn't support merging!");
664 Enter_Method(
"neighborSet()");
666 return local_lookup(thisNode.getKey(), num,
false);
673 EV <<
"[BaseOverlay::callUpdate() @ " << thisNode.getIp()
674 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
675 <<
" (" << node <<
", " << joined <<
") joined"
678 EV <<
"[BaseOverlay::callUpdate() @ " << thisNode.
getIp()
679 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
680 <<
" (" << node <<
", " << joined <<
") left"
692 send(updateMsg,
"appOut");
696 int numSiblings,
bool* err)
698 Enter_Method(
"isSiblingFor()");
700 throw cRuntimeError(
"isSiblingFor: Not implemented!");
707 Enter_Method(
"getMaxNumSiblings()");
709 throw cRuntimeError(
"getMaxNumSiblings: Not implemented!");
716 Enter_Method(
"getMaxNumRedundantNodes()");
718 throw cRuntimeError(
"getMaxNumRedundantNodes: Not implemented!");
731 if (msg->getArrivalGate() == udpGate) {
732 UDPControlInfo* udpControlInfo =
733 check_and_cast<UDPControlInfo*>(msg->removeControlInfo());
736 udpControlInfo->getSrcAddr(),
737 udpControlInfo->getSrcPort()));
738 overlayCtrlInfo->setSrcRoute(overlayCtrlInfo->getLastHop());
741 msg->setControlInfo(overlayCtrlInfo);
742 delete udpControlInfo;
746 EV <<
"[BaseOverlay:handleMessage() @ " << thisNode.getIp()
747 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
748 <<
" Received " << *msg <<
" from "
749 << overlayCtrlInfo->getLastHop().getIp() << endl;
755 if (baseOverlayMsg == NULL) {
756 cPacket* packet = check_and_cast<cPacket*>(msg);
757 RECORD_STATS(numDropped++; bytesDropped += packet->getByteLength());
763 if (overlayCtrlInfo->getLastHop() != thisNode) {
766 RECORD_STATS(numAppDataReceived++; bytesAppDataReceived +=
767 baseOverlayMsg->getByteLength());
769 RECORD_STATS(numAppLookupReceived++;bytesAppLookupReceived +=
770 baseOverlayMsg->getByteLength());
773 bytesMaintenanceReceived +=
774 baseOverlayMsg->getByteLength());
776 if (overlayCtrlInfo->getLastHop().getIp() == thisNode.getIp()) {
778 RECORD_STATS(numInternalReceived++; bytesInternalReceived +=
779 baseOverlayMsg->getByteLength());
780 }
else overlayCtrlInfo->setHopCount(1);
783 if (!internalHandleMessage(msg)) {
784 handleBaseOverlayMessage(baseOverlayMsg);
789 else if (internalHandleMessage(msg))
return;
792 else if (dynamic_cast<CommonAPIMessage*>(msg) != NULL) {
793 if (dynamic_cast<KBRroute*>(msg) != NULL) {
796 std::vector<TransportAddress> sourceRoute;
801 static_cast<CompType>(apiMsg->
getSrcComp()), apiMsg->decapsulate(),
803 }
else if (dynamic_cast<KBRforward*>(msg) != NULL) {
807 (msg->removeControlInfo());
813 dataMsg->setName(apiMsg->getEncapsulatedPacket()->getName());
814 dataMsg->encapsulate(apiMsg->decapsulate());
815 dataMsg->setSrcComp(overlayCtrlInfo->
getSrcComp());
816 dataMsg->setDestComp(overlayCtrlInfo->
getDestComp());
822 routeMsg->encapsulate(dataMsg);
829 routeMsg->setControlInfo(overlayCtrlInfo);
832 routeMsg->setContextPointer(
this);
834 std::vector<TransportAddress> sourceRoute;
836 sendToKey(apiMsg->
getDestKey(), routeMsg, 1, sourceRoute);
843 else if (msg->getArrivalGate() == appGate) {
844 handleAppMessage(msg);
845 }
else if(msg->arrivedOn(
"tcpIn")) {
846 handleTCPMessage(msg);
847 }
else if (dynamic_cast<CompReadyMessage*>(msg)) {
849 if (((
bool)par(
"joinOnApplicationRequest") ==
false) &&
852 cObject** context = globalNodeList->getContext(getThisNode());
853 if (restoreContext && context && *context) {
855 globalNodeList->setMalicious(getThisNode(),
857 join(overlayContext->
key);
871 throw cRuntimeError(
"BaseOverlay::handleMessage(): Received msg with "
882 handleUDPMessage(msg);
890 internalHandleRpcMessage(rpcMsg);
902 callDeliver(baseAppDataMsg, destKey);
911 if (collectPerHopDelay) {
913 getHopDelayArraySize() + 1);
920 ->removeControlInfo());
925 std::vector<TransportAddress> sourceRoute;
931 ->getVisitedHopsArraySize() + 1);
933 ->getVisitedHopsArraySize() - 1,
952 if ((sourceRoute.size() == 0) &&
954 isSiblingFor(thisNode, baseRouteMsg->
getDestKey(), 1, &err)
993 tmpMsg->setControlInfo(overlayCtrlInfo);
996 if (collectPerHopDelay) {
999 for (i = singleHopDelays.size();
1007 for (
size_t j = 0; j <= i; ++j) {
1018 || recursiveRoutingHook(thisNode, baseRouteMsg)) {
1019 handleBaseOverlayMessage(tmpMsg, baseRouteMsg->
getDestKey());
1020 delete baseRouteMsg;
1025 baseRouteMsg->setControlInfo(overlayCtrlInfo);
1028 if (isMalicious() && dropRouteMessageAttack) {
1029 EV <<
"[BaseOverlay::handleBaseOverlayMessage() @ " << thisNode.getIp()
1030 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1031 <<
" BaseRouteMessage gets dropped because this node is malicious"
1035 bytesDropped += baseRouteMsg->getByteLength());
1036 delete baseRouteMsg;
1040 sendToKey(baseRouteMsg->
getDestKey(), baseRouteMsg, 1, sourceRoute);
1047 EV <<
"[BaseOverlay::handleBaseOverlayMessage() @ " << thisNode.getIp()
1048 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1049 <<
" Received unknown message from UDP of type " << msg->getName()
1057 Enter_Method_Silent();
1058 if (category == NF_OVERLAY_TRANSPORTADDRESS_CHANGED) {
1059 handleTransportAddressChangedNotification();
1060 }
else if (category == NF_OVERLAY_NODE_LEAVE) {
1061 handleNodeLeaveNotification();
1062 }
else if (category == NF_OVERLAY_NODE_GRACEFUL_LEAVE) {
1063 handleNodeGracefulLeaveNotification();
1070 thisNode.setIp(IPAddressResolver().addressOf(
1071 getParentModule()->getParentModule()));
1106 if ((ready && internalReadyState) || (!ready && !internalReadyState)) {
1110 internalReadyState = ready;
1112 getDisplayString().setTagArg(
"i", 1, ready ?
"" :
"red");
1113 if (isMalicious()) {
1114 getDisplayString().setTagArg(
"i", 1, ready ?
"green" :
"yellow");
1117 globalNodeList->setOverlayReadyIcon(getThisNode(), ready);
1120 bootstrapList->registerBootstrapNode(thisNode, overlayId);
1122 bootstrapList->removeBootstrapNode(thisNode, overlayId);
1125 if (globalParameters->getPrintStateToStdOut()) {
1126 std::cout <<
"OVERLAY STATE: " << (ready ?
"READY (" :
"OFFLINE (")
1127 << thisNode <<
")" << std::endl;
1155 bytesAppDataForwarded += msg->getByteLength());
1158 bytesAppLookupForwarded += msg->getByteLength());
1161 bytesMaintenanceForwarded += msg->getByteLength());
1167 if (msg && (dest != thisNode)) {
1171 sendMessageToUDP(dest, msg);
1175 nextHopCall->encapsulate(msg);
1180 uint8_t routeRetries = 0;
1181 sendUdpRpcCall(dest, nextHopCall, NULL, -1, routeRetries);
1187 cPacket* msg, simtime_t delay)
1190 cPolymorphic* ctrlInfo = msg->removeControlInfo();
1191 if (ctrlInfo != NULL)
1196 EV <<
"[BaseOverlay::sendMessageToUDP() @ " << thisNode.getIp()
1197 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1198 <<
" Sending " << *msg <<
" to " << dest.
getIp()
1202 msg->setKind(UDP_C_DATA);
1203 UDPControlInfo* udpControlInfo =
new UDPControlInfo();
1204 udpControlInfo->setSrcAddr(thisNode.getIp());
1205 udpControlInfo->setSrcPort(thisNode.getPort());
1206 udpControlInfo->setDestAddr(dest.
getIp());
1207 udpControlInfo->setDestPort(dest.
getPort());
1208 msg->setControlInfo(udpControlInfo);
1210 if (dest != thisNode) {
1216 bytesAppDataSent += msg->getByteLength());
1219 msg->getByteLength());
1221 RECORD_STATS(numMaintenanceSent++; bytesMaintenanceSent +=
1222 msg->getByteLength());
1224 recordOverlaySentStats(baseOverlayMsg);
1226 if (dynamic_cast<BaseResponseMessage*>(msg) && getMeasureAuthBlock()) {
1231 if (dest.
getIp() == thisNode.getIp()) {
1232 RECORD_STATS(numInternalSent++; bytesInternalSent += msg->getByteLength());
1235 sendDelayed(msg, delay,
"udpOut");
1251 while (lookups.size() > 0) {
1252 (*lookups.begin())->abortLookup();
1265 this->overlay = overlay;
1281 if (dynamic_cast<BaseRouteMessage*>(msg)) {
1285 EV <<
"[SendToKeyListener::lookupFinished()]\n"
1286 " [ERROR] SendToKeyListener: Valid result, "
1287 "but empty array." << endl;
1292 for (uint32_t i=0; i<lookup->
getResult().size(); i++) {
1293 overlay->sendRouteMessage(lookup->
getResult()[i],
1296 overlay->routeMsgAcks);
1300 EV <<
"[SendToKeyListener::lookupFinished()]\n"
1301 <<
" Lookup failed - dropping message"
1314 overlay->bytesDropped += routeMsg->getByteLength());
1316 }
else if (dynamic_cast<LookupCall*>(msg)) {
1324 for (uint32_t i=0; i<lookup->
getResult().size(); i++) {
1328 EV <<
"[SendToKeyListener::lookupFinished() @ "
1329 << overlay->thisNode.getIp()
1330 <<
" (" << overlay->thisNode.getKey().toString(16) <<
")]\n"
1333 <<
" failed! (size=0)" << endl;
1337 EV <<
"[SendToKeyListener::lookupFinished() @ "
1338 << overlay->thisNode.getIp()
1339 <<
" (" << overlay->thisNode.getKey().toString(16) <<
")]\n"
1342 <<
" failed!" << endl;
1344 overlay->sendRpcResponse(call, response);
1347 throw cRuntimeError(
"SendToKeyListener::lookupFinished(): "
1348 "Unknown message type!");
1356 const std::vector<TransportAddress>& sourceRoute,
1360 (!sourceRoute.size() || sourceRoute[0].isUnspecified()))
1361 throw cRuntimeError(
"route(): Key and hint unspecified!");
1370 baseAppDataMsg->setBitLength(
BASEAPPDATA_L(baseAppDataMsg));
1371 baseAppDataMsg->setName(msg->getName());
1374 baseAppDataMsg->encapsulate(msg);
1378 EV <<
"[BaseOverlay::route() @ " << thisNode.getIp()
1379 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1380 <<
" Received message from application"
1385 sendMessageToUDP(sourceRoute[0], baseAppDataMsg);
1387 if (internalReadyState ==
false) {
1389 EV <<
"[BaseOverlay::route() @ "
1390 << getThisNode().getIp()
1391 <<
" (" << getThisNode().getKey().toString(16) <<
")]\n"
1392 <<
" Couldn't route application message to key "
1394 <<
" because the overlay module is not ready!" << endl;
1396 bytesDropped += baseAppDataMsg->getByteLength());
1397 delete baseAppDataMsg;
1401 sendToKey(key, baseAppDataMsg, 1, sourceRoute, routingType);
1413 const std::vector<TransportAddress>& sourceRoute,
1421 EV <<
"[BaseOverlay::sendToKey() @ " << thisNode.getIp()
1422 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1423 <<
" Sending " << msg <<
" to " << key
1428 !(sourceRoute.size() && !sourceRoute[0].isUnspecified()))
1429 throw cRuntimeError(
"BaseOverlay::sendToKey(): "
1430 "unspecified destination address and key!");
1433 assert(!msg->getControlInfo());
1436 routeMsg->setRoutingType(routingType);
1437 routeMsg->setDestKey(key);
1438 routeMsg->setSrcNode(thisNode);
1441 routeMsg->setName(msg->getName());
1443 routeMsg->encapsulate(msg);
1449 routeMsg->setControlInfo(routeCtrlInfo);
1452 routeMsg->setContextPointer(NULL);
1459 if (collectPerHopDelay) {
1463 if (sourceRoute.size() && !sourceRoute[0].isUnspecified()) {
1466 (routeMsg->getControlInfo());
1470 for (uint32_t i = 0; i < sourceRoute.size(); ++i)
1472 if (recursiveRoutingHook(sourceRoute[0], routeMsg)) {
1473 sendRouteMessage(sourceRoute[0], routeMsg, routeMsgAcks);
1483 AbstractLookup* lookup = createLookup(routingType, routeMsg, NULL,
1490 recNumRedundantNodes,
1491 numSiblings, routeMsg);
1493 if (nextHops->size() == 0) {
1494 EV <<
"[BaseOverlay::sendToKey() @ " << thisNode.getIp()
1495 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1496 <<
" FindNode() returned NULL - dropping message"
1504 RECORD_STATS(numDropped++; bytesDropped += routeMsg->getByteLength());
1510 EV <<
"[BaseOverlay::sendToKey() @ " << thisNode.getIp()
1511 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1512 <<
" Discards " << routeMsg->getName() <<
" from "
1514 <<
" The hop count maximum has been exceeded ("
1516 << hopCountMax <<
")"
1529 bytesDropped += routeMsg->getByteLength());
1537 assert(overlayCtrlInfo);
1541 bool err, isSibling;
1542 isSibling = isSiblingFor(thisNode, routeMsg->
getDestKey(),
1546 std::set<TransportAddress> visitedHops;
1551 for (uint32_t index = 0; nextHop == NULL && nextHops->size() > index;
1553 nextHop = &((*nextHops)[index]);
1555 if (((overlayCtrlInfo->
getLastHop() == *nextHop) &&
1556 (*nextHop != thisNode)) ||
1557 (visitedHops.find(*nextHop) != visitedHops.end()) ||
1562 ((*nextHop == thisNode) && (!isSibling))) {
1567 if (nextHop == NULL) {
1568 if (!checkFindNode(routeMsg)) {
1569 EV <<
"[BaseOverlay::sendToKey() @ " << thisNode.
getIp()
1570 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1571 <<
" Discards " << routeMsg->getName() <<
" from "
1573 <<
" No useful nextHop found!"
1580 bytesDropped += routeMsg->getByteLength());
1590 if (useCommonAPIforward &&
1591 dynamic_cast<BaseAppDataMessage*>(
1592 routeMsg->getEncapsulatedPacket()) &&
1593 routeMsg->getContextPointer() == NULL) {
1594 callForward(routeMsg->
getDestKey(), routeMsg, *nextHop);
1599 routeMsg->setContextPointer(NULL);
1602 if (*nextHop == thisNode) {
1603 if (isSibling && !err) {
1610 assert(routeMsg->getControlInfo());
1611 handleBaseOverlayMessage(routeMsg, key);
1614 throw cRuntimeError(
"isSiblingsFor() is true with an "
1615 "error: Erroneous method "
1621 if (recursiveRoutingHook(*nextHop, routeMsg)) {
1622 sendRouteMessage(*nextHop, routeMsg, routeMsgAcks);
1631 if (dynamic_cast<FindNodeCall*>(routeMsg->getEncapsulatedPacket())) {
1635 ->setControlInfo(check_and_cast<OverlayCtrlInfo*>
1636 (routeMsg->removeControlInfo()));
1637 if (findNodeCall->
getSrcNode() != thisNode) {
1638 findNodeRpc(findNodeCall);
1648 const cPacket* findNodeExt,
1654 routingType = defaultRoutingType;
1657 switch (routingType) {
1661 iterativeLookupConfig, findNodeExt,
1668 recursiveLookupConfig,
1672 throw cRuntimeError(
"BaseOverlay::createLookup():"
1673 " Unknown routingType!");
1677 lookups.insert(newLookup);
1683 lookups.erase(lookup);
1689 bool useAlternative)
const
1691 throw cRuntimeError(
"BaseOverlay::distance(): Not implemented!");
1697 int numRedundantNodes,
1701 throw cRuntimeError(
"findNode: Not implemented!");
1738 cPolymorphic* context,
1739 int rpcId, simtime_t rtt)
1746 cPolymorphic* context,
int rpcId,
1755 assert(!tempMsg->getControlInfo());
1756 if (!tempMsg->getControlInfo()) {
1763 tempMsg->setControlInfo(overlayCtrlInfo);
1768 if (handleFailedNode(dest)) {
1773 EV <<
"[BaseOverlay::internalHandleRpcTimeout() @ "
1775 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1776 <<
" Resend msg for key " << destKey
1778 handleBaseOverlayMessage(tempMsg, destKey);
1784 EV <<
"[BaseOverlay::internalHandleRpcTimeout() @ "
1786 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1787 <<
" Resend msg to next available node in nextHops[]: "
1790 handleBaseOverlayMessage(tempMsg);
1792 EV <<
"[BaseOverlay::internalHandleRpcTimeout() @ "
1794 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1795 <<
" dropping msg for " << dest
1798 bytesDropped += tempMsg->getByteLength());
1803 bytesDropped += tempMsg->getByteLength());
1817 const std::vector<TransportAddress>&
1821 uint32_t numSiblings = 1;
1822 if ((findNodeCall = dynamic_cast<FindNodeCall*>(message)))
1825 sendToKey(destKey, message, numSiblings, sourceRoute, routingType);
1837 if (sendRpcResponseToLastHop) {
1864 }
else if ((static_cast<RoutingType> (overlayCtrlInfo->
getRoutingType())
1872 sendRpcResponse(transportType,
1873 static_cast<CompType>(overlayCtrlInfo->
getSrcComp()),
1874 *destNode, *destKey, call, response);
1881 bytesFindNodeSent += call->getByteLength());
1887 bytesFailedNodeSent += call->getByteLength());
1894 if (isMalicious() && dropFindNodeAttack) {
1895 EV <<
"[BaseOverlay::findNodeRpc() @ " << thisNode.getIp()
1896 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
1897 <<
" Node ignores findNodeCall because this node is malicious"
1912 for (uint32_t i=0; i < nextHops->size(); i++) {
1923 if (isMalicious() && invalidNodesAttack) {
1924 if (isSiblingAttack) {
1934 for (
int i = 0; i < resultSize; i++) {
1937 isSiblingAttack ? (424242+i) : intuniform(42,123123))), 42));
1940 if ((i == 0) && isSiblingAttack) {
1945 }
else if (isMalicious() && isSiblingAttack) {
1953 if (call->hasObject(
"findNodeExt")) {
1954 cPacket* findNodeExt = check_and_cast<cPacket*>(call->removeObject(
"findNodeExt"));
1955 findNodeResponse->addObject(findNodeExt);
1956 findNodeResponse->addBitLength(findNodeExt->getBitLength());
1959 RECORD_STATS(numFindNodeResponseSent++; bytesFindNodeResponseSent +=
1960 findNodeResponse->getByteLength());
1964 sendRpcResponse(call, findNodeResponse);
1975 if (call->hasObject(
"findNodeExt")) {
1976 cPacket* findNodeExt = check_and_cast<cPacket*>(
1977 call->removeObject(
"findNodeExt"));
1978 failedNodeResponse->addObject(findNodeExt);
1979 failedNodeResponse->addBitLength(findNodeExt->getBitLength());
1982 RECORD_STATS(numFailedNodeResponseSent++; bytesFailedNodeResponseSent +=
1983 failedNodeResponse->getByteLength());
1985 sendRpcResponse(call, failedNodeResponse);
1992 if (numSiblings < 0) {
1993 numSiblings = getMaxNumSiblings();
1996 if (internalReadyState ==
false) {
1998 EV <<
"[BaseOverlay::lookupRpc() @ "
1999 << getThisNode().getIp()
2000 <<
" (" << getThisNode().getKey().toString(16) <<
")]\n"
2003 <<
" failed, because overlay module is not ready!" << endl;
2009 sendRpcResponse(call, response);
2017 lookup->
lookup(call->
getKey(), numSiblings, hopCountMax,
2023 if (state != READY) {
2036 overlayCtrlInfo->setSrcNode(routeMsg->
getSrcNode());
2039 routeMsg->setControlInfo(overlayCtrlInfo);
2040 assert(routeMsg->getControlInfo());
2042 std::string temp(
"ACK: [");
2043 (temp += routeMsg->getName()) +=
"]";
2045 handleBaseOverlayMessage(routeMsg, routeMsg->
getDestKey());
2050 sendRpcResponse(call, response);
2057 if (module != NULL) {
2058 gate = module->gate(
"direct_in");
2060 throw cRuntimeError(
"BaseOverlay::registerComp(): The module "
2061 "which tried to register has "
2062 "no direct_in gate!");
2066 compModuleList[compType] = make_pair<cModule*, cGate*>(module, gate);
2071 CompModuleList::iterator it = compModuleList.find(compType);
2073 if (it != compModuleList.end())
2074 return it->second.first;
2081 CompModuleList::iterator it = compModuleList.find(compType);
2083 if (it != compModuleList.end())
2084 return it->second.second;
2091 Enter_Method_Silent();
2094 for (CompModuleList::iterator it = compModuleList.begin();
2095 it != compModuleList.end(); it++) {
2098 if (it->first != srcComp)
2099 sendDirect((cMessage*)msg->dup(), it->second.second);
2107 return isVector() || getParentModule()->isVector();