26 #include <IPAddressResolver.h>
27 #include <IPvXAddress.h>
28 #include <IInterfaceTable.h>
29 #include <IPv4InterfaceData.h>
56 for (std::vector<PastryStateMsgHandle>::iterator it =
79 for (std::vector<PastrySendState*>::iterator it =
81 if ( (*it)->isScheduled() ) cancelEvent(*it);
137 (
useDiscovery ?
new cMessage(
"discoveryTimeout") : NULL);
177 NULL,
"PING bootstrapNode in discovery mode",
233 cPolymorphic* context,
int rpcId,
239 <<
" Pong (or Ping-context from NeighborCache) received (from "
264 <<
" joining despite some missing STATE messages."
270 <<
" timeout waiting for missing state messages"
271 <<
" in JOIN state, restarting..."
278 <<
" sending state updates to all nodes."
284 <<
" sending STATE requests to all nodes in"
285 <<
" second stage of initialization."
307 <<
" starting routing table maintenance"
312 }
else if (dynamic_cast<PastrySendState*>(msg)) {
315 std::vector<PastrySendState*>::iterator pos =
335 scheduleAt(simTime() + 0.0001, selfMsg);
347 <<
" incoming STATE message of type "
348 << cEnum::get(
"PastryStateMsgType")->getStringFor(type) << endl;
352 stateMsg->getByteLength());
365 <<
" Received RPC call and state != READY"
384 EV <<
"[Pastry::handlePastryJoinCall() @ " <<
thisNode.
getIp()
393 EV <<
"[Pastry::handlePastryJoinCall() @ " <<
thisNode.
getIp()
395 <<
" PastryJoinCall received by originator!"
398 EV <<
"[Pastry::handlePastryJoinCall() @ " <<
thisNode.
getIp()
400 <<
" received join message before reaching "
401 <<
"READY state, dropping message!"
405 EV <<
"[Pastry::handlePastryJoinCall() @ " <<
thisNode.
getIp()
407 <<
" PastryJoinCall gets dropped because it is "
408 <<
"outdated and has been received by originator!"
415 if ((joinHopCount > 1) &&
427 response->setTimestamp(simTime());
433 -1, joinHopCount,
true));
446 EV <<
"[Pastry::handleRequestStateCall() @ " <<
thisNode.
getIp()
454 EV <<
" received repair request before reaching"
455 <<
" READY state, dropping message!"
476 EV <<
"[Pastry::handleRequestRepairCall() @ " <<
thisNode.
getIp()
484 EV <<
" received repair request before reaching"
485 <<
" READY state, dropping message!"
506 EV <<
"[Pastry::handleRequestRepairResponse() @ " <<
thisNode.
getIp()
520 cPolymorphic* context,
int rpcId,
529 <<
" Received a JOIN RPC Response: id=" << rpcId <<
"\n"
530 <<
" msg=" << *_PastryJoinResponse <<
" rtt=" << SIMTIME_DBL(rtt)
538 <<
" Received a RequestState RPC Response: id=" << rpcId <<
"\n"
539 <<
" msg=" << *_RequestStateResponse <<
" rtt=" << SIMTIME_DBL(rtt)
545 EV <<
"[BasePastry::handleRpcResponse() @ " <<
thisNode.
getIp()
547 <<
" Received a Request Repair RPC Response: id=" << rpcId <<
"\n"
548 <<
" msg=" << *_RequestRepairResponse <<
" rtt=" << SIMTIME_DBL(rtt)
556 <<
" Received a RequestLeafSet RPC Response: id=" << rpcId <<
"\n"
557 <<
" msg=" << *_RequestLeafSetResponse <<
" rtt=" << SIMTIME_DBL(rtt)
565 <<
" Received a RequestRoutingRow RPC Response: id=" << rpcId <<
"\n"
566 <<
" msg=" << *_RequestRoutingRowResponse <<
" rtt=" << rtt
577 cPolymorphic* context,
int rpcId,
584 <<
" Timeout of RPC Call: id=" << rpcId <<
"\n"
585 <<
" msg=" << *call <<
" key=" << key
588 if (
state ==
DISCOVERY && dynamic_cast<RequestLeafSetCall*>(call)) {
596 EV <<
"[Pastry::handlePastryJoinResponse() @ " <<
thisNode.
getIp()
610 EV <<
"[Pastry::handleRequestStateResponse() @ " <<
thisNode.
getIp()
625 EV <<
"[Pastry::handleRequestLeafSetResponse() @ " <<
thisNode.
getIp()
639 NULL,
"PING received leaves for nearest node",
645 EV <<
" received leafset, waiting for pings"
656 EV <<
"[Pastry::handleRequestRoutingRowResponse() @ " <<
thisNode.
getIp()
672 for (uint32_t i = 0; i < nodesPerRow; i++) {
679 "PING received routing table for nearest node",
687 EV <<
" received routing table, waiting for pings"
717 check_and_cast<OverlayCtrlInfo*>(msg->getControlInfo())->getHopCount(),
732 if (msg && msg->hasObject(
"findNodeExt")) {
735 getObject(
"findNodeExt"));
766 std::vector<TransportAddress>::iterator nListPos;
768 for (std::vector<PastryStateMsgHandle>::iterator it =
772 it->msg->getTimestamp());
778 it->msg->getSender());
789 for (std::vector<TransportAddress>::iterator it =
806 getParentModule()->getParentModule()->bubble(
"entering SECOND STAGE");
824 <<
" stateCacheQueue full -> pop()" << endl;
839 for (std::vector<TransportAddress>::iterator it =
notifyList.begin();
845 <<
" second stage: requesting state from " << *it
864 assert(!dynamic_cast<const NodeHandle&>(ask4row).getKey().isUnspecified());
876 EV <<
"[Pastry::doRoutingTableMaintenance() @ "
879 <<
" could not send Message to Node in Row" << i
893 opp_error(
"Pastry::handleFailedNode(): failed is unspecified!");
928 <<
" lost connection to the network, trying to re-join."
955 throw cRuntimeError(
"ERROR in Pastry: stateCache.prox empty!");
970 <<
" all proximities for current STATE message from "
975 simtime_t now = simTime();
985 <<
" proximities for all STATE messages collected!"
992 <<
" [JOIN] starting to build own state from "
993 <<
stReceived.size() <<
" received state messages..."
1000 <<
" changeState(READY) called"
1005 <<
" Error initializing while joining! Restarting ..."
1013 <<
" NOT all proximities for all STATE messages collected!"
1018 throw cRuntimeError(
"stReceivedPos->msg = NULL");
1021 throw cRuntimeError(
"msg = NULL");
1053 <<
" Merging nodes into routing table"
1059 <<
" Merged nodes into routing table"
1110 <<
" [JOIN] starting to build own state from "
1111 <<
stReceived.size() <<
" received state messages..."
1119 <<
" [JOIN] initializing NeighborhoodSet from "
1120 <<
stReceived.front().msg->getRow() <<
". hop"
1127 <<
" Error initializing own neighborhoodSet"
1128 <<
" while joining! Restarting ..."
1136 <<
" [JOIN] initializing LeafSet from "
1137 <<
stReceived.back().msg->getRow() <<
". hop"
1145 <<
" Error initializing own leafSet while joining!"
1146 <<
" Restarting ..."
1156 <<
" [JOIN] initializing RoutingTable from all hops"
1166 <<
" Error initializing own routingTable while joining!"
1167 <<
" Restarting ..."
1196 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1198 <<
" new STATE message to process "
1199 <<
static_cast<void*
>(msg) <<
" in state " <<
1203 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1206 <<
" *** already received: " <<
stReceived.size() << endl
1207 <<
" *** last-hop flag: "
1208 << (msg->
getLastHop() ?
"true" :
"false") << endl
1209 <<
" *** msg joinHopCount: "
1210 << msg->
getRow() << endl;
1214 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1216 <<
" can't handle state messages until at least reaching JOIN state."
1233 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1235 <<
" Warning: dropping state message received after "
1236 <<
"all needed state messages were collected in JOIN state."
1247 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1249 <<
" Error: received a second `last' state message! Restarting ..."
1267 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1269 <<
" Error: too many state messages received in JOIN state! ("
1284 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1286 <<
" have all STATE messages, now pinging nodes."
1295 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1297 <<
" changeState(READY) called"
1307 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1309 <<
" Still need some STATE messages."
1318 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1320 <<
" handling STATE message"
1327 EV <<
" msg timestamp: " <<
1329 EV <<
" last state change: " <<
1338 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1340 <<
" outdated state from " << msg->
getSender()
1385 EV <<
"[Pastry::handleStateMessage() @ " <<
thisNode.
getIp()
1387 <<
" stateCacheQueue full -> pop()" << endl;
1415 << std::string(cEnum::find(
"PastryStateMsgType")
1417 <<
"\" STATE message " <<
static_cast<void*
>(
stateCache.
msg)