OverSim
Broose.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include "Broose.h"
25 #include <RpcMacros.h>
26 #include <GlobalStatistics.h>
27 #include <IPAddressResolver.h>
28 #include <BootstrapList.h>
29 #include <LookupListener.h>
30 
31 using namespace std;
32 
34 
// Body of a lookup listener class (presumably BrooseLookupListener, which is
// the name used in a commented-out call further down). The class header, the
// member declaration and the constructor signature are not present in this
// listing.
36 {
37 private:
39 public:
41  {
// remember the overlay instance handed to the constructor
42  this->overlay = overlay;
43  }
44 
// Callback for a finished lookup: the listener destroys itself, so it must
// not be accessed after this call returns.
45  virtual void lookupFinished(AbstractLookup *lookup)
46  {
47  delete this;
48  }
49 };
50 
// Presumably the constructor body (signature line not present in this
// listing): resets both timer pointers and all bucket pointers to NULL.
52 {
53  join_timer = NULL;
54  bucket_timer = NULL;
55  rBucket = NULL;
56  lBucket = NULL;
57  bBucket = NULL;
58 }
// Presumably the destructor body (signature line not present in this
// listing): cancels and frees the two self-message timers.
// NOTE(review): the rBucket pointer array allocated with new[] in the
// initialization code is not released here -- confirm whether that is intended.
60 {
61  // delete timers
62  cancelAndDelete(join_timer);
63  cancelAndDelete(bucket_timer);
64 }
65 
// Stage-dependent initialization (presumably initializeOverlay(int stage);
// the signature line is not present in this listing). Only acts in stage
// MIN_STAGE_OVERLAY: reads the module parameters, resets counters, registers
// watches, resolves the bucket submodules and creates both timers.
67 {
68  // because of IPAddressResolver, we need to wait until interfaces
69  // are registered, address auto-assignment takes place etc.
70  if (stage != MIN_STAGE_OVERLAY)
71  return;
72 
73  // Broose provides KBR services
74  kbr = true;
75 
76  // fetch some parameters
77  bucketSize = par("bucketSize"); // = k
78  rBucketSize = par("rBucketSize"); // = k'
79  joinDelay = par("joinDelay");
80  shiftingBits = par("brooseShiftingBits");
81  userDist = par("userDist");
82  refreshTime = par("refreshTime");
83  numberRetries = par("numberRetries");
84  stab1 = par("stab1");
85  stab2 = par("stab2");
86 
87  //statistics
88  bucketCount = 0;
89  bucketBytesSent = 0;
90 
91  //init local parameters
92  chooseLookup = 0;
93  receivedJoinResponse = 0;
94  receivedBBucketLookup = 0;
95  numberBBucketLookup = 0;
96  receivedLBucketLookup = 0;
97  numberLBucketLookup = 0;
// number of R buckets: 2^shiftingBits
98  powShiftingBits = 1 << shiftingBits;
99  keyLength = OverlayKey::getLength();
100  numFailedPackets = 0;
101  bucketRetries = 0;
102 
103  // add some watches
104  WATCH(receivedJoinResponse);
105  WATCH(receivedBBucketLookup);
106  WATCH(numberBBucketLookup);
107  WATCH(receivedLBucketLookup);
108  WATCH(numberLBucketLookup);
109  WATCH(state);
110 
111  // get module pointers for all buckets
112  rBucket = new BrooseBucket*[powShiftingBits];
113 
// every bucket is also appended to bucketVector so that later code can
// update all buckets in one loop
114  for (int i = 0; i < powShiftingBits; i++) {
115  rBucket[i] = check_and_cast<BrooseBucket*>
116  (getParentModule()->getSubmodule("rBucket",i));
117  bucketVector.push_back(rBucket[i]);
118  }
119 
120  lBucket = check_and_cast<BrooseBucket*>
121  (getParentModule()->getSubmodule("lBucket"));
122  bucketVector.push_back(lBucket);
123 
124  bBucket = check_and_cast<BrooseBucket*>
125  (getParentModule()->getSubmodule("bBucket"));
126  bucketVector.push_back(bBucket);
127 
128  // create join and bucket timer
129  join_timer = new cMessage("join_timer");
130  bucket_timer = new cMessage("bucket_timer");
131 }
132 
// Starts the join procedure (presumably joinOverlay(); the signature line is
// not present in this listing): enters INIT, which picks a bootstrap node,
// and jumps straight to READY when no bootstrap node is available.
134 {
135  changeState(INIT);
136 
137  // if the bootstrap node is unspecified we are the only node in the network
138  // so we can skip the "normal" join protocol
139  if (bootstrapNode.isUnspecified()) {
140  changeState(READY);
141  }
142 }
143 
144 
145 void Broose::changeState(int toState)
146 {
147  switch (toState) {
148  case INIT: {
149  state = INIT;
150 
151  // find a new bootstrap node and enroll to the bootstrap list
152  bootstrapNode = bootstrapList->getBootstrapNode();
153 
154  cancelEvent(join_timer);
155  scheduleAt(simTime(), join_timer);
156 
157  // initialize respectively clear the buckets
158  for (int i = 0; i < powShiftingBits; i++) {
159  rBucket[i]->initializeBucket(shiftingBits, i, rBucketSize, this);
160  }
161 
162  lBucket->initializeBucket(-shiftingBits, 0, powShiftingBits*rBucketSize,
163  this);
164  bBucket->initializeBucket(0, 0, 7*bucketSize, this, true);
165 
166  // if we have restarted the join protocol reset parameters
167  receivedBBucketLookup = 0;
168  receivedLBucketLookup = 0;
169  receivedJoinResponse = 0;
170 
171  getParentModule()->getParentModule()->bubble("Enter INIT state.");
172  updateTooltip();
173  break;
174  }
175 
176  case RSET: {
177  state = RSET;
178 
179  BrooseBucket* tmpBucket = new BrooseBucket();
180  tmpBucket->initializeBucket(0, 0, powShiftingBits*rBucketSize, this);
181 
182  for (int i = 0; i < powShiftingBits; i++) {
183  int size = rBucket[i]->getSize();
184 
185  for (int j = 0; j < size; j++) {
186  tmpBucket->add(rBucket[i]->get(j));
187  }
188  }
189 
190  BucketCall** bCall = new BucketCall*[tmpBucket->getSize()];
191  for (uint32_t i = 0; i < tmpBucket->getSize(); i++) {
192  bCall[i] = new BucketCall("LBucketCall");
193  bCall[i]->setBucketType(LEFT);
194  bCall[i]->setProState(PRSET);
195  bCall[i]->setBitLength(BUCKETCALL_L(bcall[i]));
196 
197  sendUdpRpcCall(tmpBucket->get(i), bCall[i], NULL,
198  10);
199  }
200 
201  // half of the calls must return for a state change
202  numberBBucketLookup = ceil((double)tmpBucket->getSize() / 2);
203 
204  delete tmpBucket;
205 
206  getParentModule()->getParentModule()->bubble("Enter RSET state.");
207  break;
208  }
209 
210  case BSET: {
211  state = BSET;
212 
213  // half of the calls must return for a state change
214  numberLBucketLookup = ceil((double)bBucket->getSize() / 2);
215 
216  // send messages to all entries of the B Bucket
217  int size2 = bBucket->getSize();
218  BucketCall** bCall2 = new BucketCall*[size2];
219  for (int i = 0; i < size2; i++) {
220  bCall2[i] = new BucketCall("LBucketCall");
221  bCall2[i]->setBucketType(LEFT);
222  bCall2[i]->setProState(PBSET);
223  bCall2[i]->setBitLength(BUCKETCALL_L(bcall2[i]));
224 
225  sendUdpRpcCall(bBucket->get(i), bCall2[i], NULL,
226  10);
227  }
228 
229  getParentModule()->getParentModule()->bubble("Enter BSET state.");
230  break;
231  }
232 
233  case READY: {
234  state = READY;
235 
236  // fill the bucket also with this node
237  for (size_t i = 0; i < bucketVector.size(); i++) {
238  bucketVector[i]->add(thisNode);
239  }
240 
241  // to disable the ping protocol a pingDelay or
242  // refreshTime of zero was given
243  if (refreshTime != 0) {
244  cancelEvent(bucket_timer);
245  scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
246  }
247 
248  getParentModule()->getParentModule()->bubble("Enter READY state.");
249 
250  updateTooltip();
251  break;
252  }
253 
254  }
255  setOverlayReady(state == READY);
256 }
257 
258 void Broose::handleTimerEvent(cMessage* msg)
259 {
260  if (msg == join_timer)
261  handleJoinTimerExpired(msg);
262  else if (msg == bucket_timer)
263  handleBucketTimerExpired(msg);
264  else
265  error("Broose::handleTimerEvent - no other timer currently in use!");
266 }
267 
// Join timer handler (presumably handleJoinTimerExpired(cMessage*); the
// signature line is not present in this listing). Does nothing once the node
// is READY. With a known bootstrap node it routes one B-bucket request per
// R-bucket prefix via the bootstrap node; without one it goes straight to
// READY.
269 {
270  if (state == READY)
271  return;
272 
273  if (!bootstrapNode.isUnspecified()) {
274  // create new lookup message
275 #if 0
276  BucketCall* bCall = new BucketCall();
277  bCall->setBucketType(BROTHER);
278  bCall->setProState(FAILED);
279  bCall->setBitLength(BUCKETCALL_L(call));
280  sendRouteRpcCall(OVERLAY_COMP, bootstrapNode, thisNode.getKey(),
281  bCall);
282 
283  BucketCall* lCall = new BucketCall();
284  lCall->setBucketType(BROTHER);
285  lCall->setProState(FAILED);
286  lCall->setBitLength(BUCKETCALL_L(call));
287  sendRouteRpcCall(OVERLAY_COMP, bootstrapNode,
288  thisNode.getKey() << shiftingBits, lCall);
289 #endif
290  // do lookups for key >> shiftingBits for each prefix
291  OverlayKey newKey = thisNode.getKey() >> shiftingBits;
// NOTE(review): powShiftingBits is computed at run time, so this is a
// variable-length array, which is a compiler extension rather than
// standard C++.
292  BucketCall* bCallArray[powShiftingBits];
293  for (int i = 0; i < powShiftingBits; i++) {
// build the lookup key: prefix i in the top shiftingBits bits, followed
// by this node's key shifted right by shiftingBits
294  OverlayKey add(i);
295  add = add << (keyLength - shiftingBits);
296  add += newKey;
297 
298  bCallArray[i] = new BucketCall("BBucketCall");
299  bCallArray[i]->setBucketType(BROTHER);
300  bCallArray[i]->setBucketIndex(i);
301  bCallArray[i]->setProState(PINIT);
302  bCallArray[i]->setBitLength(BUCKETCALL_L(bCallArray[i]));
303 
304  // restart join protocol if one call times out
305  // otherwise the node might be isolated
306  sendRouteRpcCall(OVERLAY_COMP, bootstrapNode, add,
307  bCallArray[i]);
308  }
309  //createLookup()->lookup(getThisNode().getKey() + 1, 0, 0, 0,
310  // new BrooseLookupListener(this));
311  } else {
312  // if the bootstrap node is unspecified we are the only node in the network
313  // so we can skip the "normal" join protocol
314  changeState(READY);
315  }
316 }
317 
// Bucket refresh timer handler (presumably handleBucketTimerExpired(cMessage*);
// the signature line is not present in this listing). Pings every bucket
// entry that has not been seen for longer than refreshTime or whose RTT is
// -1, then re-arms the timer for refreshTime / 2.
319 {
// temporary bucket sized to hold the entries of all buckets
// (presumably it also filters duplicates -- confirm against BrooseBucket::add)
320  BrooseBucket* tmpBucket = new BrooseBucket();
321  tmpBucket->initializeBucket(0, 0,
322  (2*powShiftingBits*rBucketSize + 7*bucketSize),
323  this);
324 
325  for (size_t i = 0; i < bucketVector.size(); i++) {
326  for(uint32_t j = 0; j < bucketVector[i]->getSize(); j++) {
327  if ((simTime() - bucketVector[i]->getLastSeen(
328  bucketVector[i]->get(j))) > refreshTime
329  || bucketVector[i]->getRTT(bucketVector[i]->get(j)) == -1) {
330 
331  tmpBucket->add(BrooseHandle(bucketVector[i]->get(j)));
332  }
333  }
334  }
335 
// ping every collected candidate
336  for (uint32_t i = 0; i < tmpBucket->getSize(); i++) {
337  pingNode(tmpBucket->get(i));
338  }
339 
340  delete tmpBucket;
341 
342  scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
343 }
344 
345 
// Getter body (presumably getMaxNumSiblings(); the signature line is not
// present in this listing): returns the configured bucketSize.
347 {
348  return bucketSize;
349 }
350 
// Getter body (presumably getMaxNumRedundantNodes(); the signature line is
// not present in this listing): also returns the configured bucketSize.
352 {
353  return bucketSize;
354 }
355 
// Routing distance helper (presumably getRoutingDistance(key, node, dist);
// the first signature line is not present in this listing). Searches for the
// smallest shift i < |dist| at which the shifted key and node share a prefix
// of at least |dist| - i bits. Returns i for a right-shifting route and -i
// for a left-shifting one. If no shift matches, it alternates between
// -dist and dist on successive calls.
357  int dist)
358 {
359  for (uint i = 0; i < (uint)abs(dist); i++) {
360  if (node.sharedPrefixLength(key << i) >= (abs(dist) - i)) {
361  return i; // right shifting
362  }
363  if (key.sharedPrefixLength(node << i) >= (abs(dist) - i)) {
364  return -i; // left shifting
365  }
366  }
367 
// no overlap found: alternate the direction using the shared counter
368  if (((chooseLookup++) % 2) == 0) {
369  return -dist;
370  } else {
371  return dist;
372  }
373 }
374 
375 #if 0
376 // TODO: work in progress: new findNode() code which tries to calculate
377 // the distance approximation and new routing key in each routing step
379  int numRedundantNodes,
380  int numSiblings,
381  BaseOverlayMessage* msg)
382 {
383  BrooseFindNodeExtMessage *findNodeExt = NULL;
384  bool err;
385  bool isSibling = isSiblingFor(thisNode, key, numSiblings, &err);
386  int resultSize;
387 
388  if (numSiblings < 0) {
389  // exhaustive iterative doesn't care about siblings
390  resultSize = numRedundantNodes;
391  } else {
392  resultSize = isSibling ? (numSiblings ? numSiblings : 1)
393  : numRedundantNodes;
394  }
395  assert(numSiblings || numRedundantNodes);
396  NodeVector* result = new NodeVector(resultSize);
397 
398  if (isSibling) {
399  //return the closest nodes
400  // sort with XOR distance to key
403  result->setComparator(comp);
404 
405  bBucket->fillVector(result);
406  result->add(thisNode);
407 
408  delete comp;
409 
410 
411  std::cout << "key: " << key.toString(2).substr(0, 8)
412  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
413  if (result->size() > 0) {
414  std::cout << " next hop (final): " << (*result)[0].getKey().toString(2).substr(0, 8);
415  } else {
416  std::cout << " no next hop! (final)";
417  }
418  std::cout << std::endl << std::endl;
419 
420 
421  return result;
422  }
423 
424  // estimate distance
425  int dist = max(rBucket[0]->longestPrefix(),
426  rBucket[1]->longestPrefix()) + 1 + userDist;
427 
428  if ((dist % shiftingBits) != 0)
429  dist += (shiftingBits - (dist % shiftingBits));
430 
431  if (dist > keyLength) {
432  if ((keyLength % shiftingBits) == 0) {
433  dist = keyLength;
434  } else {
435  dist = (keyLength - keyLength % shiftingBits);
436  }
437  }
438 
439  if (msg != NULL) {
440  if (!msg->hasObject("findNodeExt")) {
441  findNodeExt = new BrooseFindNodeExtMessage("findNodeExt");
442 
443  findNodeExt->setMaxDistance(dist);
444 
445  //add contact for next Hop
446  findNodeExt->setLastNode(thisNode);
447  findNodeExt->setBitLength(BROOSEFINDNODEEXTMESSAGE_L);
448 
449  msg->addObject( findNodeExt );
450  }
451 
452  findNodeExt = (BrooseFindNodeExtMessage*) msg->getObject("findNodeExt");
453  }
454 
455  // update buckets with last hop
456  routingAdd(findNodeExt->getLastNode(), true);
457 
458  // replace last hop contact information with
459  // this hop contact information
460  findNodeExt->setLastNode(thisNode);
461 
462  //findNodeExt->setMaxDistance(max(findNodeExt->getMaxDistance(), dist));
463 
464  int step = getRoutingDistance(key, thisNode.getKey(),
465  findNodeExt->getMaxDistance());
466 
467  bool leftShifting;
468  if (step < 0) {
469  leftShifting = true;
470  step *= -1;
471  }
472 
473  if ((step % shiftingBits) != 0)
474  step += (shiftingBits - (step % shiftingBits));
475 
476  if (step > keyLength) {
477  if ((keyLength % shiftingBits) == 0) {
478  step = keyLength;
479  } else {
480  step = (keyLength - keyLength % shiftingBits);
481  }
482  }
483 
484  if (leftShifting) {
485  step *= -1;
486  }
487 
488  // check for messages which couldn't be routed
489  if (step == 0) {
490  //return the closest nodes
491  // sort with XOR distance to key
494  result->setComparator(comp);
495 
496  bBucket->fillVector(result);
497  result->add(thisNode);
498 
499 
500  std::cout << "key: " << key.toString(2).substr(0, 8)
501  << " dist: " << step << " (max: " << findNodeExt->getMaxDistance() << ")"
502  << " rtkey: " << thisNode.getKey().toString(2).substr(0, 8)
503  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
504  if (result->size() > 0) {
505  std::cout << " next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
506  } else {
507  std::cout << " no next hop!";
508  }
509  std::cout << std::endl << std::endl;
510 
511 
512  delete comp;
513  return result;
514  } else if (step < 0) {
515  if (state == BSET) {
516  return result;
517  }
518  // Left Shifting Lookup
519  OverlayKey routingKey = key >> (-step - 1);
520  for (int i = 0; i < (-step - 1); i++) {
521  routingKey.setBit(OverlayKey::getLength() - i - 1,
522  thisNode.getKey().getBit(
523  OverlayKey::getLength() - i - 2));
524  }
525 
527  new KeyDistanceComparator<KeyXorMetric>(routingKey);
528 
529  result->setComparator(comp);
530  lBucket->fillVector(result);
531  result->add(thisNode);
532  delete comp;
533 
534  std::cout << "key: " << key.toString(2).substr(0, 8)
535  << " dist: " << step << " (max: " << findNodeExt->getMaxDistance() << ")"
536  << " rtkey: " << routingKey.toString(2).substr(0, 8)
537  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
538  if (result->size() > 0) {
539  std::cout << " next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
540  } else {
541  std::cout << " no next hop!";
542  }
543  std::cout << std::endl << std::endl;
544 
545 
546  } else {
547  // Right Shifting Lookup
549  comp = new KeyDistanceComparator<KeyXorMetric>(key << (step - shiftingBits));
550 
551  result->setComparator(comp);
552  rBucket[key.getBitRange(key.getLength() - step - 1,
553  shiftingBits)]->fillVector(result);
554  result->add(thisNode);
555  delete comp;
556 
557  std::cout << "key: " << key.toString(2).substr(0, 8)
558  << " dist: " << step << " (max: " << findNodeExt->getMaxDistance() << ")"
559  << " rtkey: " << (key >> step).toString(2).substr(0, 8)
560  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
561  if (result->size() > 0) {
562  std::cout << " next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
563  } else {
564  std::cout << " no next hop!";
565  }
566  std::cout << std::endl << std::endl;
567 
568  }
569 
570  return result;
571 }
572 #endif
573 
// Next-hop selection for a key (presumably Broose::findNode(key, ...); the
// first signature line and the declarations of `comp` are not present in
// this listing).
//
// Returns a newly allocated NodeVector:
//  - empty while the node is in INIT, RSET or FAILED;
//  - the closest B-bucket entries (plus this node) when this node is a
//    sibling for the key or the remaining step count is zero;
//  - otherwise candidates from the L bucket (left shifting) or from one
//    R bucket (right shifting), ordered by the comparator built from the
//    updated routing key.
// Per-lookup routing state (route key, step, direction, last hop) travels in
// a "findNodeExt" object attached to msg.
575  int numRedundantNodes,
576  int numSiblings,
577  BaseOverlayMessage* msg)
578 {
579  if ((state == INIT) || (state == RSET) || (state == FAILED))
580  return new NodeVector();
581 
582  BrooseFindNodeExtMessage *findNodeExt = NULL;
583  bool err;
584  bool isSibling = isSiblingFor(thisNode, key, numSiblings, &err);
585  int resultSize;
586 
587  if (numSiblings < 0) {
588  // exhaustive iterative doesn't care about siblings
589  resultSize = numRedundantNodes;
590  } else {
591  resultSize = isSibling ? (numSiblings ? numSiblings : 1)
592  : numRedundantNodes;
593  }
594  assert(numSiblings || numRedundantNodes);
595  NodeVector* result = new NodeVector(resultSize);
596 
597  if (isSibling) {
598  //return the closest nodes
599  // sort with XOR distance to key
// (the lines creating `comp` are not present in this listing)
602  result->setComparator(comp);
603 
604  bBucket->fillVector(result);
605  result->add(thisNode);
606 
607  delete comp;
608 
609  /*
610  std::cout << "key: " << key.toString(2).substr(0, 8)
611  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
612  if (result->size() > 0) {
613  std::cout << " next hop (final): " << (*result)[0].getKey().toString(2).substr(0, 8);
614  } else {
615  std::cout << " no next hop! (final)";
616  }
617  std::cout << std::endl << std::endl;
618  */
619 
620  return result;
621  }
622 
623  if (msg != NULL) {
// first hop of this lookup: create and attach the routing extension
624  if (!msg->hasObject("findNodeExt")) {
625  findNodeExt = new BrooseFindNodeExtMessage("findNodeExt");
626 
627  OverlayKey routeKey = thisNode.getKey();
628  // estimate distance
629  int dist = max(rBucket[0]->longestPrefix(),
630  rBucket[1]->longestPrefix()) + 1 + userDist;
631 
// round dist up to a multiple of shiftingBits
632  if ((dist % shiftingBits) != 0)
633  dist += (shiftingBits - (dist % shiftingBits));
634 
// cap dist at the largest multiple of shiftingBits that fits the key
635  if (dist > keyLength) {
636  if ((keyLength % shiftingBits) == 0) {
637  dist = keyLength;
638  } else {
639  dist = (keyLength - keyLength % shiftingBits);
640  }
641  }
642 
// alternate between left and right shifting lookups
643  if ((chooseLookup++) % 2 == 0) {
644  // init left shifting lookup
645  findNodeExt->setRightShifting(false);
646 
// collect the top `dist` bits of this node's key into an int
// NOTE(review): looks like this can overflow int when dist exceeds
// the width of int -- confirm the expected range of dist
647  int prefix = 0;
648  for (int i = 0; i < dist; i++) {
649  prefix += thisNode.getKey().getBit(thisNode.getKey().getLength() - i - 1) << (dist - i - 1);
650  }
651 
652  OverlayKey pre(prefix);
653  routeKey = key >> dist;
654  routeKey += (pre << key.getLength() - dist);
655 
// a negative step marks a left shifting lookup
656  dist = -dist;
657  } else {
658  // init right shifting lookup
659  findNodeExt->setRightShifting(true);
660  }
661 
662  //add contact for next Hop
663  findNodeExt->setLastNode(thisNode);
664  findNodeExt->setRouteKey(routeKey);
665  findNodeExt->setStep(dist);
666  findNodeExt->setBitLength(BROOSEFINDNODEEXTMESSAGE_L);
667 
668  msg->addObject( findNodeExt );
669  }
670 
671  findNodeExt = (BrooseFindNodeExtMessage*) msg->getObject("findNodeExt");
672  }
673 
// NOTE(review): when msg is NULL, findNodeExt is still NULL at this point
// and is dereferenced below -- confirm callers always pass a message.
674  // update buckets with last hop
675  addNode(findNodeExt->getLastNode());
676  setLastSeen(findNodeExt->getLastNode());
677 
678  // replace last hop contact information with
679  // this hop contact information
680  findNodeExt->setLastNode(thisNode);
681 
682  // brother lookup
683  if (findNodeExt->getStep() == 0) {
684  // return the closest nodes sorted by XOR distance to key
// (the lines creating `comp` are not present in this listing)
687  result->setComparator(comp);
688 
689  bBucket->fillVector(result);
690  result->add(thisNode);
691 
692  delete comp;
693  return result;
694  }
695 
696  if (findNodeExt->getRightShifting() == false) {
697  // Left Shifting Lookup
698 
699  // can't handle left shifting lookup in BSET-State
700  if (state == BSET)
701  return result;
702 
703  // calculate routing key
704  findNodeExt->setRouteKey((findNodeExt->getRouteKey()) << shiftingBits);
705  findNodeExt->setStep(findNodeExt->getStep() + shiftingBits);
706 
// (the first lines of the `comp` construction are not present in this listing)
709  findNodeExt->getRouteKey());
710 
711  result->setComparator(comp);
712  lBucket->fillVector(result);
713  result->add(thisNode);
714  delete comp;
715  /*
716  std::cout << "key: " << key.toString(2).substr(0, 8)
717  << " dist: " << findNodeExt->getStep()
718  << " rtkey: " << findNodeExt->getRouteKey().toString(2).substr(0, 8)
719  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
720  if (result->size() > 0) {
721  std::cout << " next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
722  } else {
723  std::cout << " no next hop!";
724  }
725  std::cout << std::endl << std::endl;
726  */
727 
728  } else {
729  // Right Shifting Lookup
730 
731 
732  // calculate routing key
733  int prefix = 0;
734  int dist = findNodeExt->getStep();
735  OverlayKey routeKey = findNodeExt->getRouteKey() >> shiftingBits;
// take the next shiftingBits bits of the key as the new prefix; the
// prefix value also selects which R bucket is consulted below
736  for (int i = 0; i < shiftingBits; i++)
737  prefix += ((int)key.getBit(key.getLength() - dist + i) << i);
738  OverlayKey pre(prefix);
739  routeKey += (pre << (routeKey.getLength()-shiftingBits));
740 
741  findNodeExt->setRouteKey(routeKey);
742  findNodeExt->setStep(dist - shiftingBits);
743 
745  comp = new KeyDistanceComparator<KeyXorMetric>(routeKey);
746 
747  result->setComparator(comp);
748  rBucket[prefix]->fillVector(result);
749  result->add(thisNode);
750  delete comp;
751  /*
752  std::cout << "key: " << key.toString(2).substr(0, 8)
753  << " dist: " << findNodeExt->getStep()
754  << " rtkey: " << findNodeExt->getRouteKey().toString(2).substr(0, 8)
755  << " ThisNode: " << thisNode.getKey().toString(2).substr(0, 8);
756  if (result->size() > 0) {
757  std::cout << " next hop: " << (*result)[0].getKey().toString(2).substr(0, 8);
758  } else {
759  std::cout << " no next hop!";
760  }
761  std::cout << std::endl << std::endl;
762  */
763  }
764 
// if this node itself is the best candidate, perform the next routing step
// locally by recursing with the already updated extension in msg
765  if ((*result)[0] == thisNode) {
766  delete result;
767  return (findNode(key, numRedundantNodes, numSiblings, msg));
768  } else
769  return result;
770 }
771 
// End-of-run statistics (presumably finishOverlay(); the signature line is
// not present in this listing). Records nothing when the measured lifetime
// is below GlobalStatistics::MIN_MEASURED; otherwise reports the counters,
// the first three normalized by the measured lifetime.
773 {
774  // store statistics
775  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
776  if (time < GlobalStatistics::MIN_MEASURED) return;
777 
778  globalStatistics->addStdDev("Broose: Number of non-routable packets/s", numFailedPackets / time);
779  globalStatistics->addStdDev("Broose: Sent BUCKET Messages/s", bucketCount / time);
780  globalStatistics->addStdDev("Broose: Sent BUCKET Byte/s", bucketBytesSent / time);
781  globalStatistics->addStdDev("Broose: Bucket retries at join", bucketRetries);
782 
783 }
784 
// Per-message send statistics (presumably recordOverlaySentStats(msg); the
// signature line is not present in this listing). Unwraps msg down to the
// innermost overlay message and, if that is a bucket call or response,
// counts one bucket message plus the byte length of the outer msg.
786 {
787  BaseOverlayMessage* innerMsg = msg;
// descend through encapsulated packets until an APPDATA message or the
// innermost packet is reached
788  while (innerMsg->getType() != APPDATA &&
789  innerMsg->getEncapsulatedPacket() != NULL) {
790  innerMsg =
791  static_cast<BaseOverlayMessage*>(innerMsg->getEncapsulatedPacket());
792  }
793 
794  switch (innerMsg->getType()) {
795  case RPC:
796  if ((dynamic_cast<BucketCall*>(innerMsg) != NULL) ||
797  (dynamic_cast<BucketResponse*>(innerMsg) != NULL)) {
798  RECORD_STATS(bucketCount++; bucketBytesSent +=
799  msg->getByteLength());
800  }
801  break;
802  }
803 }
804 
// Debug helper (displayBucketState(), as named in the log prefix below; the
// signature line is not present in this listing): writes this node's address
// and key, followed by the content of every R bucket, the L bucket and the
// B bucket, to the EV log.
806 {
807  EV << "[Broose::displayBucketState() @ " << thisNode.getIp()
808  << " (" << thisNode.getKey().toString(16) << ")]" << endl;
809 
810  for (int i = 0; i < powShiftingBits; i++) {
811  EV << " Content of rBucket[" << i << "]: ";
812  rBucket[i]->output();
813  }
814 
815  EV << " Content of lBucket: ";
816  lBucket->output();
817  EV << " Content of bBucket: ";
818  bBucket->output();
819  EV << endl;
820 }
821 
822 
// Sibling test (Broose::isSiblingFor(), as named in the error texts below;
// the first signature line is not present in this listing).
//
// key         key to test; must not be unspecified
// numSiblings number of siblings to consider; -1 selects the maximum,
//             0 means "exact key match only"
// err         set to false on the exact-match path and to true when the
//             node is not READY
// Returns whether this node is responsible for key. Only node == thisNode
// is supported.
824  const OverlayKey& key,
825  int numSiblings,
826  bool* err)
827 {
828 // TODO: node != thisNode doesn't work yet
829  if (key.isUnspecified())
830  error("Broose::isSiblingFor(): key is unspecified!");
831 
832  if (node != thisNode)
833  error("Broose::isSiblingsFor(): "
834  "node != thisNode is not implemented!");
835 
836  if (numSiblings > getMaxNumSiblings()) {
837  opp_error("Broose::isSiblingFor(): numSiblings too big!");
838  }
839  // set default number of siblings to consider
840  if (numSiblings == -1) numSiblings = getMaxNumSiblings();
841 
842  if (numSiblings == 0) {
843  *err = false;
844  return (node.getKey() == key);
845  }
846 
847  if (state != READY) {
848  *err = true;
849  return false;
850  }
851 
// NOTE(review): *err is not written on this path, so the caller's flag
// keeps whatever value it had before the call.
852  // TODO: handle numSibling parameter
853  return bBucket->keyInRange(key);
854 }
855 
// GUI helper (presumably updateTooltip(); the signature line is not present
// in this listing): when running under a GUI, sets the "tt" display-string
// tag of this module, its parent and its grandparent to "<ip> <key>".
857 {
858  if (ev.isGUI()) {
859  std::stringstream ttString;
860 
861  // show our ip and key in tooltip
862  ttString << thisNode.getIp() << " " << thisNode.getKey();
863 
864  getParentModule()->getParentModule()->getDisplayString().
865  setTagArg("tt", 0, ttString.str().c_str());
866  getParentModule()->getDisplayString().
867  setTagArg("tt", 0, ttString.str().c_str());
868  getDisplayString().setTagArg("tt", 0, ttString.str().c_str());
869 
870  }
871 }
872 
// Incoming RPC call dispatcher (presumably handleRpcCall(msg); the signature
// line and the lines closing each RPC switch are not present in this
// listing).
// In BSET or READY: bucket calls are delegated to handleBucketRequestRpc();
// for Ping and FindNode calls the sender is added to the buckets and false
// is returned (presumably so the caller still processes the call -- confirm).
// In any other state: Ping and FindNode calls are deleted unanswered and
// true is returned.
874 {
875  if (state == BSET || state == READY) {
876  // delegate messages
877  RPC_SWITCH_START(msg)
878  RPC_DELEGATE(Bucket, handleBucketRequestRpc);
879  RPC_ON_CALL(Ping) {
880  // add pinging node to all buckets and update lastSeen of node
881  routingAdd(msg->getSrcNode(), true);
882  return false;
883  break;
884  }
885  RPC_ON_CALL(FindNode) {
886  // add pinging node to all buckets and update lastSeen of node
887  routingAdd(msg->getSrcNode(), true);
888  return false;
889  break;
890  }
892  return RPC_HANDLED;
893  } else {
894  RPC_SWITCH_START(msg)
895  // don't answer PING and FIND_NODE calls, if the node can't route yet
896  RPC_ON_CALL(Ping) {
897  delete msg;
898  return true;
899  break;
900  }
901  RPC_ON_CALL(FindNode) {
902  delete msg;
903  return true;
904  break;
905  }
907  return RPC_HANDLED;
908  }
909 }
910 
// RPC response handler (Broose::handleRpcResponse(), as named in the log
// prefix below; the first signature line is not present in this listing).
// The responder is always added to the buckets as alive with the measured
// rtt. Bucket responses are then passed to handleBucketResponseRpc(); nodes
// listed in a FindNode response are added as not-yet-alive entries.
912  const RpcState& rpcState,
913  simtime_t rtt)
914 {
915  // add sender to all buckets and update lastSeen of node
916  routingAdd(msg->getSrcNode(), true, rtt);
917 
918  RPC_SWITCH_START(msg)
919  RPC_ON_RESPONSE( Bucket ) {
920  handleBucketResponseRpc(_BucketResponse, rpcState);
921  EV << "[Broose::handleRpcResponse() @ " << thisNode.getIp()
922  << " (" << thisNode.getKey().toString(16) << ")]\n"
923  << " Bucket RPC Response received: id=" << rpcState.getId() << "\n"
924  << " msg=" << *_BucketResponse << " rtt=" << rtt
925  << endl;
926  break;
927  }
928  RPC_ON_RESPONSE(FindNode)
929  {
930  // add inactive nodes
931  for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
932  routingAdd(_FindNodeResponse->getClosestNodes(i), false);
933  break;
934  }
935  RPC_SWITCH_END( )
936 }
937 
// Dispatches a timed-out RPC according to the type of the original call:
// FindNode calls go to handleFindNodeTimeout() with the call's destination
// and destination key, Bucket calls go to handleBucketTimeout(). Each case
// also logs the timeout to EV.
// (The line closing the RPC switch is not present in this listing.)
938 void Broose::handleRpcTimeout(const RpcState& rpcState)
939 {
940  RPC_SWITCH_START(rpcState.getCallMsg())
941  RPC_ON_CALL(FindNode) {
942  handleFindNodeTimeout(_FindNodeCall, rpcState.getDest(), rpcState.getDestKey());
943  EV << "[Broose::handleRpcTimeout() @ " << thisNode.getIp()
944  << " (" << thisNode.getKey().toString(16) << ")]\n"
945  << " Find Node RPC Call timed out: id=" << rpcState.getId() << "\n"
946  << " msg=" << *_FindNodeCall
947  << endl;
948  break;
949  }
950  RPC_ON_CALL(Bucket) {
951  handleBucketTimeout(_BucketCall);
952  EV << "[Broose::handleRpcTimeout() @ " << thisNode.getIp()
953  << " (" << thisNode.getKey().toString(16) << ")]\n"
954  << " Bucket RPC Call timed out: id=" << rpcState.getId() << "\n"
955  << " msg=" << *_BucketCall
956  << endl;
957  break;
958  }
960 }
961 
// Bucket request handler (Broose::handleBucketRequestRpc(), as named in the
// error text below; the signature line is not present in this listing).
// Answers a LEFT request with the content of the L bucket and a BROTHER
// request with the content of the B bucket; any other bucket type is an
// error. A LEFT request is dropped unanswered when stab1 is set and this
// node is still in BSET.
963 {
964  if (msg->getBucketType() == LEFT) {
965  // TODO: dependent on the churn scenarios this may give better
966  // or worse results
967  if (stab1 && (state == BSET)) {
968  // can't handle LBucketRequest in BSET-State
969  delete msg;
970  return;
971  }
972 
973  // return L-Bucket
974  int size = lBucket->getSize();
975  BucketResponse* bResponse = new BucketResponse("LBucketResponse");
976  bResponse->setNodesArraySize(size);
977 
978  for (int i = 0; i < size; i++) {
979  bResponse->setNodes(i, lBucket->get(i));
980  }
981 
982  bResponse->setBitLength(BUCKETRESPONSE_L(bResponse));
983 
984  // only add, if the originator is already in the BSET state
985  // in which the node already is able to do right shifting lookups
986  // TODO: this leads to lower lookup success rates in some scenarios
987  // but helps to prevent deadlock situations with high churn rates
988  if (stab2 || (msg->getProState() == PBSET)) {
989  routingAdd(msg->getSrcNode(), true);
990  }
991 
992  sendRpcResponse(msg, bResponse);
993  } else if (msg->getBucketType() == BROTHER) {
994  // return B-Bucket
995  int size = bBucket->getSize();
996  BucketResponse* bResponse = new BucketResponse("BBucketResponse");
997  bResponse->setNodesArraySize(size);
998 
999  for (int i = 0; i < size; i++) {
1000  bResponse->setNodes(i, bBucket->get(i));
1001  }
1002  bResponse->setBitLength(BUCKETRESPONSE_L(bResponse));
1003 
1004  sendRpcResponse(msg, bResponse);
1005  } else
1006  error("Broose::handleBucketRequestRpc() - Wrong Bucket Type!");
1007 }
1008 
// Bucket response handler (presumably handleBucketResponseRpc(msg, rpcState),
// which is the name used where bucket responses are dispatched; the first
// signature line is not present in this listing).
// All nodes contained in the response are added to the buckets as
// not-yet-alive entries. The response is then counted against the join
// state machine, but only when the protocol state stored in the original
// call matches the current state:
//   INIT: after powShiftingBits BROTHER responses  -> RSET
//   RSET: after numberBBucketLookup LEFT responses -> BSET
//   BSET: after numberLBucketLookup LEFT responses -> READY
1010  const RpcState& rpcState)
1011 {
1012  BucketCall* call = check_and_cast<BucketCall*>(rpcState.getCallMsg());
1013 
1014  for (uint i = 0; i < msg->getNodesArraySize(); i++) {
1015  routingAdd(msg->getNodes(i), false);
1016  }
1017 
1018  if (call->getBucketType() == LEFT) {
1019  switch (state) {
1020  case RSET:
1021  if (call->getProState() == PRSET) {
1022  receivedBBucketLookup++;
1023 
1024  if (receivedBBucketLookup == numberBBucketLookup)
1025  changeState(BSET);
1026  }
1027  break;
1028  case BSET:
1029  if (call->getProState() == PBSET) {
1030  receivedLBucketLookup++;
1031 
1032  if (receivedLBucketLookup == numberLBucketLookup)
1033  changeState(READY);
1034  }
1035  break;
1036  default:
1037  break;
1038  }
1039  } else if (call->getBucketType() == BROTHER) {
1040  switch(state) {
1041  case INIT:
1042  if (call->getProState() == PINIT) {
1043  receivedJoinResponse++;
1044  if (receivedJoinResponse == powShiftingBits)
1045  changeState(RSET);
1046  }
// NOTE(review): no break here; control falls through into default, which
// only breaks, so the behavior is unaffected.
1047  default:
1048  break;
1049  }
1050  } else
// NOTE(review): the message below names the request handler.
1051  error("Broose::handleBucketRequestRpc() - unknown error.");
1052 }
1053 
1054 
// Bucket call timeout handler (presumably handleBucketTimeout(BucketCall*);
// the signature line is not present in this listing). Ignored once the node
// is READY; otherwise the retry counter is increased and the join protocol
// is restarted from INIT.
1056 {
1057  if (state == READY)
1058  return;
1059  else {
1060  bucketRetries++;
1061  changeState(INIT);
1062  }
1063 }
1064 
1065 void Broose::pingResponse(PingResponse* pingResponse, cPolymorphic* context,
1066  int rpcId, simtime_t rtt) {
1067  // if node respond reset failedResponses and add lastSeen to node
1068  routingAdd(pingResponse->getSrcNode(), true, rtt);
1069 }
1070 
// Failure bookkeeping for a node that did not answer (presumably
// routingTimeout(handle); the signature line is not present in this
// listing). In every bucket the node is removed once its failed-response
// count equals numberRetries; otherwise the count is increased.
1072 {
1073  for (size_t i = 0; i < bucketVector.size(); i++) {
1074  if (bucketVector[i]->getFailedResponses(handle) == numberRetries)
1075  bucketVector[i]->remove(handle);
1076  else
1077  bucketVector[i]->increaseFailedResponses(handle);
1078  }
1079  // TODO: if we loose the last node (despite ourself) from the
1080  // B bucket, we should call join() to rejoin the network
1081 }
1082 
// FindNode timeout handler (presumably handleFindNodeTimeout(call, dest,
// destKey); the first signature line is not present in this listing).
// Records a failed response for the destination in all buckets.
// NOTE(review): dynamic_cast to a reference type throws std::bad_cast when
// dest is not actually a NodeHandle -- confirm callers guarantee that.
1084  const TransportAddress& dest,
1085  const OverlayKey& destKey)
1086 {
1087  routingTimeout(dynamic_cast<const NodeHandle&>(dest));
1088 }
1089 
// Ping timeout handler (presumably pingTimeout(call, dest, context, rpcId);
// the first signature line is not present in this listing). Records a failed
// response for the destination in all buckets.
// NOTE(review): dynamic_cast to a reference type throws std::bad_cast when
// dest is not actually a NodeHandle -- confirm callers guarantee that.
1091  const TransportAddress& dest,
1092  cPolymorphic* context, int rpcId)
1093 {
1094  routingTimeout(dynamic_cast<const NodeHandle&>(dest));
1095 }
1096 
1097 bool Broose::routingAdd(const NodeHandle& node, bool isAlive,
1098  simtime_t rtt)
1099 {
1100  bool added = false;
1101 
1102  for (size_t i = 0; i < bucketVector.size(); i++) {
1103  added |= bucketVector[i]->add(node, isAlive, rtt);
1104  }
1105 
1106  return added;
1107 }
1108 
// Presumably setLastSeen(node) (signature line not present in this listing):
// stamps the node with the current simulation time in every bucket.
1110 {
1111  for (size_t i = 0; i < bucketVector.size(); i++) {
1112  bucketVector[i]->setLastSeen(node, simTime());
1113  }
1114 }
1115 
1116 void Broose::addNode(const NodeHandle& node)
1117 {
1118  // add node to all buckets
1119  for (size_t i = 0; i < bucketVector.size(); i++) {
1120  bucketVector[i]->add(node);
1121  }
1122 }
1123 
// Presumably resetFailedResponses(node) (signature line not present in this
// listing): clears the node's failed-response counter in every bucket.
1125 {
1126  for (size_t i = 0; i < bucketVector.size(); i++) {
1127  bucketVector[i]->resetFailedResponses(node);
1128  }
1129 }
1130 
1131 void Broose::setRTT(const NodeHandle& node, simtime_t rtt)
1132 {
1133  for (size_t i = 0; i < bucketVector.size(); i++) {
1134  bucketVector[i]->setRTT(node, rtt);
1135  }
1136 }
1137 
1138