27 #include <IPAddressResolver.h>
42 this->overlay = overlay;
62 cancelAndDelete(join_timer);
63 cancelAndDelete(bucket_timer);
77 bucketSize = par(
"bucketSize");
78 rBucketSize = par(
"rBucketSize");
79 joinDelay = par(
"joinDelay");
80 shiftingBits = par(
"brooseShiftingBits");
81 userDist = par(
"userDist");
82 refreshTime = par(
"refreshTime");
83 numberRetries = par(
"numberRetries");
93 receivedJoinResponse = 0;
94 receivedBBucketLookup = 0;
95 numberBBucketLookup = 0;
96 receivedLBucketLookup = 0;
97 numberLBucketLookup = 0;
98 powShiftingBits = 1 << shiftingBits;
100 numFailedPackets = 0;
104 WATCH(receivedJoinResponse);
105 WATCH(receivedBBucketLookup);
106 WATCH(numberBBucketLookup);
107 WATCH(receivedLBucketLookup);
108 WATCH(numberLBucketLookup);
114 for (
int i = 0; i < powShiftingBits; i++) {
116 (getParentModule()->getSubmodule(
"rBucket",i));
117 bucketVector.push_back(rBucket[i]);
121 (getParentModule()->getSubmodule(
"lBucket"));
122 bucketVector.push_back(lBucket);
125 (getParentModule()->getSubmodule(
"bBucket"));
126 bucketVector.push_back(bBucket);
129 join_timer =
new cMessage(
"join_timer");
130 bucket_timer =
new cMessage(
"bucket_timer");
139 if (bootstrapNode.isUnspecified()) {
152 bootstrapNode = bootstrapList->getBootstrapNode();
154 cancelEvent(join_timer);
155 scheduleAt(simTime(), join_timer);
158 for (
int i = 0; i < powShiftingBits; i++) {
159 rBucket[i]->initializeBucket(shiftingBits, i, rBucketSize,
this);
162 lBucket->initializeBucket(-shiftingBits, 0, powShiftingBits*rBucketSize,
164 bBucket->initializeBucket(0, 0, 7*bucketSize,
this,
true);
167 receivedBBucketLookup = 0;
168 receivedLBucketLookup = 0;
169 receivedJoinResponse = 0;
171 getParentModule()->getParentModule()->bubble(
"Enter INIT state.");
182 for (
int i = 0; i < powShiftingBits; i++) {
183 int size = rBucket[i]->getSize();
185 for (
int j = 0; j < size; j++) {
186 tmpBucket->
add(rBucket[i]->
get(j));
191 for (uint32_t i = 0; i < tmpBucket->
getSize(); i++) {
197 sendUdpRpcCall(tmpBucket->
get(i), bCall[i], NULL,
202 numberBBucketLookup = ceil((
double)tmpBucket->
getSize() / 2);
206 getParentModule()->getParentModule()->bubble(
"Enter RSET state.");
214 numberLBucketLookup = ceil((
double)bBucket->getSize() / 2);
217 int size2 = bBucket->
getSize();
219 for (
int i = 0; i < size2; i++) {
225 sendUdpRpcCall(bBucket->get(i), bCall2[i], NULL,
229 getParentModule()->getParentModule()->bubble(
"Enter BSET state.");
237 for (
size_t i = 0; i < bucketVector.size(); i++) {
238 bucketVector[i]->add(thisNode);
243 if (refreshTime != 0) {
244 cancelEvent(bucket_timer);
245 scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
248 getParentModule()->getParentModule()->bubble(
"Enter READY state.");
255 setOverlayReady(state == READY);
260 if (msg == join_timer)
261 handleJoinTimerExpired(msg);
262 else if (msg == bucket_timer)
263 handleBucketTimerExpired(msg);
265 error(
"Broose::handleTimerEvent - no other timer currently in use!");
273 if (!bootstrapNode.isUnspecified()) {
280 sendRouteRpcCall(
OVERLAY_COMP, bootstrapNode, thisNode.getKey(),
285 lCall->setProState(FAILED);
288 thisNode.getKey() << shiftingBits, lCall);
291 OverlayKey newKey = thisNode.getKey() >> shiftingBits;
293 for (
int i = 0; i < powShiftingBits; i++) {
295 add = add << (keyLength - shiftingBits);
298 bCallArray[i] =
new BucketCall(
"BBucketCall");
302 bCallArray[i]->setBitLength(
BUCKETCALL_L(bCallArray[i]));
322 (2*powShiftingBits*rBucketSize + 7*bucketSize),
325 for (
size_t i = 0; i < bucketVector.size(); i++) {
326 for(uint32_t j = 0; j < bucketVector[i]->getSize(); j++) {
327 if ((simTime() - bucketVector[i]->getLastSeen(
328 bucketVector[i]->
get(j))) > refreshTime
329 || bucketVector[i]->getRTT(bucketVector[i]->
get(j)) == -1) {
336 for (uint32_t i = 0; i < tmpBucket->
getSize(); i++) {
337 pingNode(tmpBucket->
get(i));
342 scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
359 for (uint i = 0; i < (uint)abs(dist); i++) {
368 if (((chooseLookup++) % 2) == 0) {
379 int numRedundantNodes,
385 bool isSibling = isSiblingFor(thisNode, key, numSiblings, &err);
388 if (numSiblings < 0) {
390 resultSize = numRedundantNodes;
392 resultSize = isSibling ? (numSiblings ? numSiblings : 1)
395 assert(numSiblings || numRedundantNodes);
405 bBucket->fillVector(result);
406 result->
add(thisNode);
411 std::cout <<
"key: " << key.
toString(2).substr(0, 8)
412 <<
" ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
413 if (result->size() > 0) {
414 std::cout <<
" next hop (final): " << (*result)[0].getKey().toString(2).substr(0, 8);
416 std::cout <<
" no next hop! (final)";
418 std::cout << std::endl << std::endl;
425 int dist = max(rBucket[0]->longestPrefix(),
426 rBucket[1]->longestPrefix()) + 1 + userDist;
428 if ((dist % shiftingBits) != 0)
429 dist += (shiftingBits - (dist % shiftingBits));
431 if (dist > keyLength) {
432 if ((keyLength % shiftingBits) == 0) {
435 dist = (keyLength - keyLength % shiftingBits);
440 if (!msg->hasObject(
"findNodeExt")) {
449 msg->addObject( findNodeExt );
464 int step = getRoutingDistance(key, thisNode.getKey(),
473 if ((step % shiftingBits) != 0)
474 step += (shiftingBits - (step % shiftingBits));
476 if (step > keyLength) {
477 if ((keyLength % shiftingBits) == 0) {
480 step = (keyLength - keyLength % shiftingBits);
496 bBucket->fillVector(result);
497 result->
add(thisNode);
500 std::cout <<
"key: " << key.
toString(2).substr(0, 8)
501 <<
" dist: " << step <<
" (max: " << findNodeExt->
getMaxDistance() <<
")"
502 <<
" rtkey: " << thisNode.getKey().toString(2).substr(0, 8)
503 <<
" ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
504 if (result->size() > 0) {
505 std::cout <<
" next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
507 std::cout <<
" no next hop!";
509 std::cout << std::endl << std::endl;
514 }
else if (step < 0) {
520 for (
int i = 0; i < (-step - 1); i++) {
522 thisNode.getKey().getBit(
530 lBucket->fillVector(result);
531 result->
add(thisNode);
534 std::cout <<
"key: " << key.
toString(2).substr(0, 8)
535 <<
" dist: " << step <<
" (max: " << findNodeExt->
getMaxDistance() <<
")"
536 <<
" rtkey: " << routingKey.
toString(2).substr(0, 8)
537 <<
" ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
538 if (result->size() > 0) {
539 std::cout <<
" next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
541 std::cout <<
" no next hop!";
543 std::cout << std::endl << std::endl;
553 shiftingBits)]->fillVector(result);
554 result->
add(thisNode);
557 std::cout <<
"key: " << key.
toString(2).substr(0, 8)
558 <<
" dist: " << step <<
" (max: " << findNodeExt->
getMaxDistance() <<
")"
559 <<
" rtkey: " << (key >> step).toString(2).substr(0, 8)
560 <<
" ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
561 if (result->size() > 0) {
562 std::cout <<
" next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
564 std::cout <<
" no next hop!";
566 std::cout << std::endl << std::endl;
575 int numRedundantNodes,
579 if ((state == INIT) || (state == RSET) || (state == FAILED))
584 bool isSibling = isSiblingFor(thisNode, key, numSiblings, &err);
587 if (numSiblings < 0) {
589 resultSize = numRedundantNodes;
591 resultSize = isSibling ? (numSiblings ? numSiblings : 1)
594 assert(numSiblings || numRedundantNodes);
604 bBucket->fillVector(result);
605 result->
add(thisNode);
624 if (!msg->hasObject(
"findNodeExt")) {
629 int dist = max(rBucket[0]->longestPrefix(),
630 rBucket[1]->longestPrefix()) + 1 + userDist;
632 if ((dist % shiftingBits) != 0)
633 dist += (shiftingBits - (dist % shiftingBits));
635 if (dist > keyLength) {
636 if ((keyLength % shiftingBits) == 0) {
639 dist = (keyLength - keyLength % shiftingBits);
643 if ((chooseLookup++) % 2 == 0) {
648 for (
int i = 0; i < dist; i++) {
649 prefix += thisNode.getKey().getBit(thisNode.getKey().getLength() - i - 1) << (dist - i - 1);
653 routeKey = key >> dist;
654 routeKey += (pre << key.
getLength() - dist);
668 msg->addObject( findNodeExt );
683 if (findNodeExt->
getStep() == 0) {
689 bBucket->fillVector(result);
690 result->
add(thisNode);
712 lBucket->fillVector(result);
713 result->
add(thisNode);
734 int dist = findNodeExt->
getStep();
736 for (
int i = 0; i < shiftingBits; i++)
739 routeKey += (pre << (routeKey.
getLength()-shiftingBits));
742 findNodeExt->
setStep(dist - shiftingBits);
748 rBucket[prefix]->fillVector(result);
749 result->
add(thisNode);
765 if ((*result)[0] == thisNode) {
767 return (findNode(key, numRedundantNodes, numSiblings, msg));
775 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
778 globalStatistics->addStdDev(
"Broose: Number of non-routable packets/s", numFailedPackets / time);
779 globalStatistics->addStdDev(
"Broose: Sent BUCKET Messages/s", bucketCount / time);
780 globalStatistics->addStdDev(
"Broose: Sent BUCKET Byte/s", bucketBytesSent / time);
781 globalStatistics->addStdDev(
"Broose: Bucket retries at join", bucketRetries);
789 innerMsg->getEncapsulatedPacket() != NULL) {
796 if ((dynamic_cast<BucketCall*>(innerMsg) != NULL) ||
799 msg->getByteLength());
807 EV <<
"[Broose::displayBucketState() @ " << thisNode.getIp()
808 <<
" (" << thisNode.getKey().toString(16) <<
")]" << endl;
810 for (
int i = 0; i < powShiftingBits; i++) {
811 EV <<
" Content of rBucket[" << i <<
"]: ";
812 rBucket[i]->output();
815 EV <<
" Content of lBucket: ";
817 EV <<
" Content of bBucket: ";
830 error(
"Broose::isSiblingFor(): key is unspecified!");
832 if (node != thisNode)
833 error(
"Broose::isSiblingsFor(): "
834 "node != thisNode is not implemented!");
836 if (numSiblings > getMaxNumSiblings()) {
837 opp_error(
"Broose::isSiblingFor(): numSiblings too big!");
840 if (numSiblings == -1) numSiblings = getMaxNumSiblings();
842 if (numSiblings == 0) {
844 return (node.
getKey() == key);
847 if (state != READY) {
853 return bBucket->keyInRange(key);
859 std::stringstream ttString;
862 ttString << thisNode.getIp() <<
" " << thisNode.getKey();
864 getParentModule()->getParentModule()->getDisplayString().
865 setTagArg(
"tt", 0, ttString.str().c_str());
866 getParentModule()->getDisplayString().
867 setTagArg(
"tt", 0, ttString.str().c_str());
868 getDisplayString().setTagArg(
"tt", 0, ttString.str().c_str());
875 if (state == BSET || state == READY) {
920 handleBucketResponseRpc(_BucketResponse, rpcState);
921 EV <<
"[Broose::handleRpcResponse() @ " << thisNode.getIp()
922 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
923 <<
" Bucket RPC Response received: id=" << rpcState.
getId() <<
"\n"
924 <<
" msg=" << *_BucketResponse <<
" rtt=" << rtt
931 for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
932 routingAdd(_FindNodeResponse->getClosestNodes(i),
false);
943 EV <<
"[Broose::handleRpcTimeout() @ " << thisNode.getIp()
944 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
945 <<
" Find Node RPC Call timed out: id=" << rpcState.
getId() <<
"\n"
946 <<
" msg=" << *_FindNodeCall
951 handleBucketTimeout(_BucketCall);
952 EV <<
"[Broose::handleRpcTimeout() @ " << thisNode.getIp()
953 <<
" (" << thisNode.getKey().toString(16) <<
")]\n"
954 <<
" Bucket RPC Call timed out: id=" << rpcState.
getId() <<
"\n"
955 <<
" msg=" << *_BucketCall
967 if (stab1 && (state == BSET)) {
974 int size = lBucket->getSize();
978 for (
int i = 0; i < size; i++) {
979 bResponse->
setNodes(i, lBucket->get(i));
992 sendRpcResponse(msg, bResponse);
995 int size = bBucket->getSize();
999 for (
int i = 0; i < size; i++) {
1000 bResponse->
setNodes(i, bBucket->get(i));
1004 sendRpcResponse(msg, bResponse);
1006 error(
"Broose::handleBucketRequestRpc() - Wrong Bucket Type!");
1015 routingAdd(msg->
getNodes(i),
false);
1022 receivedBBucketLookup++;
1024 if (receivedBBucketLookup == numberBBucketLookup)
1030 receivedLBucketLookup++;
1032 if (receivedLBucketLookup == numberLBucketLookup)
1043 receivedJoinResponse++;
1044 if (receivedJoinResponse == powShiftingBits)
1051 error(
"Broose::handleBucketRequestRpc() - unknown error.");
1066 int rpcId, simtime_t rtt) {
1068 routingAdd(pingResponse->
getSrcNode(),
true, rtt);
1073 for (
size_t i = 0; i < bucketVector.size(); i++) {
1074 if (bucketVector[i]->getFailedResponses(handle) == numberRetries)
1075 bucketVector[i]->
remove(handle);
1077 bucketVector[i]->increaseFailedResponses(handle);
1087 routingTimeout(dynamic_cast<const NodeHandle&>(dest));
1092 cPolymorphic* context,
int rpcId)
1094 routingTimeout(dynamic_cast<const NodeHandle&>(dest));
1102 for (
size_t i = 0; i < bucketVector.size(); i++) {
1103 added |= bucketVector[i]->add(node, isAlive, rtt);
1111 for (
size_t i = 0; i < bucketVector.size(); i++) {
1112 bucketVector[i]->setLastSeen(node, simTime());
1119 for (
size_t i = 0; i < bucketVector.size(); i++) {
1120 bucketVector[i]->add(node);
1126 for (
size_t i = 0; i < bucketVector.size(); i++) {
1127 bucketVector[i]->resetFailedResponses(node);
1133 for (
size_t i = 0; i < bucketVector.size(); i++) {
1134 bucketVector[i]->setRTT(node, rtt);