29 #include <IPAddressResolver.h>
30 #include <IPvXAddress.h>
31 #include <IInterfaceTable.h>
32 #include <IPv4InterfaceData.h>
41 #define BUCKET_CONSISTENCY(msg) \
45 if (siblingTable != NULL && siblingTable->size() > 0) {\
46 stFull = siblingTable->isFull();\
47 z = routingBucketIndex(siblingTable->back().getKey()) - 1;\
48 if (routingTable[z - 1] != NULL && routingTable[z - 1]->size())\
51 for (int y = 0; y < (z - 2); ++y) {\
52 if ((routingTable[y] != NULL && routingTable[y]->size() > k) ||\
53 (routingTable[y] != NULL && routingTable[y]->size() && !stFull))\
58 #define BUCKET_CONSISTENCY(msg)
301 return (i /
b) * ((1 <<
b) - 1) + (pow(2,
b) - 2);
313 if (bucket == NULL && ensure)
321 simtime_t rtt,
bool maintenanceLookup)
331 <<
" inserting node " << handle <<
" (rtt = ";
332 if (rtt == MAXTIME) {
335 EV << SIMTIME_DBL(rtt);
337 EV <<
", isAlive = " << (isAlive?
"true":
"false")
343 bool authenticated = (isAlive && (rtt != MAXTIME));
345 bool needsRtt = (
activePing && ((rtt == MAXTIME) ?
true :
false));
358 if (kadHandle.
getRtt() == MAXTIME) {
359 kadHandle.
setRtt(i->getRtt());
364 if (maintenanceLookup) {
368 if ((i->getIp() != kadHandle.
getIp()) ||
369 (i->getPort() != kadHandle.
getPort())) {
380 <<
" ... node is already in the sibling table." << endl;
391 if (kadHandle.
getRtt() == MAXTIME) {
392 kadHandle.
setRtt(i->getRtt());
396 if (needsRtt && (kadHandle.
getRtt() == MAXTIME)) {
414 " ... node not added, but ping sent." << endl;
422 bucket->push_back(kadHandle);
424 if (maintenanceLookup) {
428 if ((i->getIp() != kadHandle.
getIp()) ||
429 (i->getPort() != kadHandle.
getPort())) {
440 <<
" ... node is already in bucket "
448 if (!maintenanceLookup || (isAlive && (rtt == MAXTIME))) {
479 <<
" ... node not added (node timeout)." << endl;
484 bool finished =
false;
497 <<
" ... node added to sibling table, replacing "
501 kadHandle = oldHandle;
516 <<
" ... node added to sibling table." << endl;
521 assert(siblingPos > -1);
527 "m=m,50,100,50,100;ls=green,1");
540 if ((isAlive && (rtt == MAXTIME))) {
568 bucket->push_back(kadHandle);
572 <<
" ... node added to bucket "
574 <<
" which was not yet full." << endl;
575 }
else if (isAlive) {
580 kickHim = it = bucket->begin();
582 while (it != bucket->end()) {
583 if (it->getRtt() > kickHim->getRtt()) {
588 if (kickHim->getRtt() > kadHandle.
getRtt()) {
590 bucket->erase(kickHim);
591 bucket->push_back(kadHandle);
595 <<
" ... added to bucket "
597 <<
" (PNS: replacing another node)." << endl;
609 while (it != bucket->end() && (it->getPingSent() ==
true)) {
612 if (it != bucket->end()) {
614 it->setPingSent(
true);
644 i->setPingSent(
false);
674 if (bucket != NULL && (i = bucket->
findIterator(key) ) != bucket->end() ) {
677 i->setPingSent(
false);
718 for (uint32_t i = 0; i <
routingTable[index]->size(); ++i) {
727 "m=m,50,100,50,100;ls=green,1");
733 findIterator(sortedBucket.front().getKey()));
756 int numSiblings,
bool* err)
759 error(
"Kademlia::isSiblingFor(): key is unspecified!");
762 EV <<
"[Kademlia::isSiblingFor()] @ "
772 opp_error(
"Kademlia::isSiblingFor(): numSiblings too big!");
776 if (numSiblings == -1) {
780 if (numSiblings == 0) {
782 return (node.
getKey() == key);
793 EV <<
"[Kademlia::isSiblingFor()] @ "
796 <<
" Not sure if I am sibling for " << key <<
" !\n"
797 <<
" (" << key <<
" is not closer to me than "
821 if (result->contains(node.
getKey())) {
826 assert(!(numSiblings == 1 && key == node.
getKey()));
853 if (failed == *i)
break;
902 kadRoutingInfoMsg->setSrcNode(
thisNode);
903 kadRoutingInfoMsg->setDestKey(msg->
getDestKey());
904 kadRoutingInfoMsg->setNextHopsArraySize(nextHops->size());
905 kadRoutingInfoMsg->setName(
"KadRoutingInfoMsg");
906 for (uint32_t i = 0; i < nextHops->size(); i++) {
907 kadRoutingInfoMsg->setNextHops(i, (*nextHops)[i]);
908 if (
thisNode == kadRoutingInfoMsg->getNextHops(i)) {
909 kadRoutingInfoMsg->getNextHops(i).setIsAlive(
true);
920 std::vector<TransportAddress> sourceRoute;
931 dynamic_cast<KademliaRoutingInfoMessage*>(msg->
942 for (uint32_t i = 0; i < nextHops->size(); i++) {
943 temp.push_back((*nextHops)[i]);
954 for (uint32_t i = 0; i < temp.size(); ++i) {
962 msg->encapsulate(infoMsg);
973 opp_error(
"(Kademlia::findNode()) numRedundantNodes or numSiblings "
978 if (numRedundantNodes < 2) {
979 throw cRuntimeError(
"Kademlia::findNode(): For Kademlia "
980 "redundantNodes must be at least 2 "
981 "and lookupMerge should be true!");
993 if (numSiblings < 0) {
995 resultSize = numRedundantNodes;
998 (numSiblings ? numSiblings : 1) : numRedundantNodes;
1000 assert(numSiblings || numRedundantNodes);
1011 bool returnProxNodes =
false;
1014 (!dynamic_cast<FindNodeCall*>(msg->getEncapsulatedPacket()) &&
1015 !dynamic_cast<FindNodeCall*>(msg))) {
1016 returnProxNodes =
true;
1022 if (returnProxNodes) {
1024 resultProx =
new ProxNodeVector(resultSize, NULL, NULL, compProx, 0, resultSize);
1034 if (mainIndex != -1) {
1036 if (bucket != NULL && bucket->size()) {
1039 if (returnProxNodes)
1040 resultProx->
add(*i);
1048 if (startIndex >= endIndex || !result->
isFull()) {
1049 for (index = startIndex; index >= endIndex; --index) {
1051 if (index == mainIndex)
continue;
1053 if (bucket != NULL && bucket->size()) {
1056 if (returnProxNodes)
1057 resultProx->
add(*i);
1068 if (returnProxNodes)
1069 resultProx->
add(*i);
1073 if (returnProxNodes) {
1075 if (!result->size() || (*result)[0] ==
thisNode) {
1077 resultProx->
add(temp);
1080 resultProx->
add(temp);
1090 if (bucket != NULL && bucket->size()) {
1093 if (returnProxNodes)
1094 resultProx->
add(*i);
1102 if (returnProxNodes && resultProx->size() && result->size() &&
1106 for (uint32_t i = 0; i < resultProx->size(); ++i) {
1107 result->push_back((*resultProx)[i]);
1147 routingAdd(kadRoutingInfoMsg->getSrcNode(),
true);
1149 for (uint32_t i = 0; i < kadRoutingInfoMsg->getNextHopsArraySize(); i++) {
1150 routingAdd(kadRoutingInfoMsg->getNextHops(i),
1151 kadRoutingInfoMsg->getNextHops(i).getIsAlive());
1184 cPolymorphic* context,
int rpcId,
1210 routingAdd(srcRoute,
true, rtt, maintenanceLookup);
1211 for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
1212 routingAdd(_FindNodeResponse->getClosestNodes(i),
true,
1213 MAXTIME-1, maintenanceLookup);
1238 for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
1239 routingAdd(_FindNodeResponse->getClosestNodes(i),
false,
1240 MAXTIME, maintenanceLookup);
1246 routingAdd(srcRoute,
true, rtt, maintenanceLookup);
1252 cPolymorphic* context,
int rpcId,
1283 <<
" ERROR: RPC timeout without key ("
1284 << msg <<
" -> " << dest <<
")" << endl;
1291 cPolymorphic *contextPointer,
Prox prox)
1293 Enter_Method_Silent();
1370 int bucketsRefreshedPerTask = 0;
1372 for (int32_t d=0; d < ((1 <<
b) - 1); d++) {
1373 int32_t index = (i /
b) * ((1 <<
b) - 1) + d;
1374 if (index < 0)
continue;
1403 ++bucketsRefreshedPerTask;
1410 "Kademlia: Buckets Refreshed Per Task",
1411 bucketsRefreshedPerTask));
1423 bool useAlternative)
const
1425 if (!useAlternative)
return x^y;
1432 std::stringstream ttString;
1435 ttString <<
"This: " <<
thisNode << endl <<
"Siblings: "
1438 getParentModule()->getParentModule()->getDisplayString().
1439 setTagArg(
"tt", 0, ttString.str().c_str());
1440 getParentModule()->getDisplayString().
1441 setTagArg(
"tt", 0, ttString.str().c_str());
1442 getDisplayString().setTagArg(
"tt", 0, ttString.str().c_str());