OverSim
Gia.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <assert.h>
25 
26 #include <UDPControlInfo_m.h>
27 #include <IPAddressResolver.h>
28 #include <GlobalStatistics.h>
29 #include <CommonMessages_m.h>
30 #include <ExtAPIMessages_m.h>
31 #include <InitStages.h>
32 #include <BootstrapList.h>
33 
34 #include "Gia.h"
35 
36 
38 
39 void Gia::initializeOverlay(int stage)
40 {
41  // wait until IPAddressResolver initialized all interfaces and assigns addresses
42  if(stage != MIN_STAGE_OVERLAY)
43  return;
44 
45  // Get parameters from omnetpp.ini
46  maxNeighbors = par("maxNeighbors");
47  minNeighbors = par("minNeighbors");
48  maxTopAdaptionInterval = par("maxTopAdaptionInterval");
49  topAdaptionAggressiveness = par("topAdaptionAggressiveness");
50  maxLevelOfSatisfaction = par("maxLevelOfSatisfaction");
51  updateDelay = par("updateDelay");
52  maxHopCount = par("maxHopCount"); //obsolete
53  messageTimeout = par("messageTimeout");
54  neighborTimeout = par("neighborTimeout");
55  sendTokenTimeout = par("sendTokenTimeout");
56  tokenWaitTime = par("tokenWaitTime");
57  keyListDelay = par("keyListDelay");
58  outputNodeDetails = par("outputNodeDetails");
59  optimizeReversePath = par("optimizeReversePath");
60 
61  // get references on necessary modules
62  keyListModule = check_and_cast<GiaKeyListModule*>
63  (getParentModule()->getSubmodule("keyListModule"));
64  neighbors = check_and_cast<GiaNeighbors*>
65  (getParentModule()->getSubmodule("neighbors"));
66  tokenFactory = check_and_cast<GiaTokenFactory*>
67  (getParentModule()->getSubmodule("tokenFactory"));
68 
70 
71  // clear neighbor candidate list
72  neighCand.clear();
73  knownNodes.clear();
74 
75  WATCH(thisGiaNode);
76  WATCH(bootstrapNode);
77  WATCH(levelOfSatisfaction);
78 
79  // self-messages
80  satisfaction_timer = new cMessage("satisfaction_timer");
81  update_timer = new cMessage("update_timer");
82  timedoutMessages_timer = new cMessage("timedoutMessages_timer");
83  timedoutNeighbors_timer = new cMessage("timedoutNeighbors_timer");
84  sendKeyList_timer = new cMessage("sendKeyList_timer");
85  sendToken_timer = new cMessage("sendToken_timer");
86 
87  // statistics
88  stat_joinCount = 0;
90  stat_joinREQ = 0;
92  stat_joinRSP = 0;
94  stat_joinACK = 0;
96  stat_joinDNY = 0;
102  stat_tokenMessages = 0;
106  stat_routeMessages = 0;
108  stat_maxNeighbors = 0;
114 }
115 
117 {
118  changeState(INIT);
119 
122 }
123 
124 void Gia::changeState(int toState)
125 {
126  switch (toState) {
127  case INIT: {
128  state = INIT;
129 
130  setOverlayReady(false);
132 
133  // set up local nodehandle
134  //thisGiaNode.setNodeHandle(thisNode);
136 
137  callUpdate(thisNode, true);
138 
139  // get possible bandwidth from ppp-Module
140  double capacity = 0;
141 
142  cModule* nodeModule = getParentModule()->getParentModule();
143 
144  if(!nodeModule->hasGate("pppg$i"))
145  capacity += uniform(1,800000);
146  else {
147  // this relies on IPv4Underlay
148  int gateSize = nodeModule->gateSize("pppg$i");
149  for (int i=0; i<gateSize; i++) {
150  cGate* currentGate = nodeModule->gate("pppg$i",i);
151  if (currentGate->isConnected())
152  capacity += check_and_cast<cDatarateChannel *>
153  (currentGate->getPreviousGate()->getChannel())->getDatarate()
154  - uniform(0,800000);
155  }
156  }
157 
158  thisGiaNode.setCapacity(capacity);
159  //thisGiaNode.setConnectionDegree(0);
160  //thisGiaNode.setReceivedTokens(0);
161  //thisGiaNode.setSentTokens(0);
162 
163  connectionDegree = 0;
164  receivedTokens = 0;
165  sentTokens = 0;
166 
167  if (outputNodeDetails)
168  EV << "(Gia) Node details: " << thisGiaNode << endl;
169 
170  // get our entry point to GIA-Overlay
174  //else {
175  //assert(!(thisGiaNode.getNodeHandle().isUnspecified()));
176  //globalNodeList->registerPeer(thisGiaNode.getNodeHandle());
177  //}
178 
179  // register node at TokenFactory
180  //tokenFactory->setNode(&thisGiaNode);
183 
184  if (debugOutput)
185  EV << "(Gia) Node " << thisGiaNode.getKey()
186  << " (" << thisGiaNode.getIp() << ":"
187  << thisGiaNode.getPort() << ") with capacity: "
188  << thisGiaNode.getCapacity() << " entered INIT state." << endl;
189 
190  getParentModule()->getParentModule()->bubble("Enter INIT state.");
191 
192  // schedule satisfaction_timer
193  cancelEvent(satisfaction_timer);
194  scheduleAt(simTime(), satisfaction_timer);
195 
196  // schedule timedoutMessages_timer
197  cancelEvent(timedoutMessages_timer);
198  scheduleAt(simTime() + messageTimeout,
200 
201  cancelEvent(timedoutNeighbors_timer);
202  scheduleAt(simTime() + neighborTimeout,
204 
205  cancelEvent(sendToken_timer);
206  scheduleAt(simTime() + sendTokenTimeout,
208 
209  break;
210  }
211  case READY:
212  state = READY;
213  setOverlayReady(true);
214  break;
215  }
216  updateTooltip();
217 }
218 
219 // void Gia::setBootstrapedIcon()
220 // {
221 // if (ev.isGUI()) {
222 // if (state == READY) {
223 // getParentModule()->getParentModule()->getDisplayString().
224 // setTagArg("i2", 1, "");
225 // getDisplayString().setTagArg("i", 1, "");
226 // } else {
227 // getParentModule()->getParentModule()->getDisplayString().
228 // setTagArg("i2", 1, "red");
229 // getDisplayString().setTagArg("i", 1, "red");
230 // }
231 // }
232 // }
233 
235 {
236  if (ev.isGUI()) {
237 // if (state == READY) {
238 // getParentModule()->getParentModule()->getDisplayString().
239 // setTagArg("i2", 1, "");
240 // getDisplayString().setTagArg("i", 1, "");
241 // } else {
242 // getParentModule()->getParentModule()->getDisplayString().
243 // setTagArg("i2", 1, "red");
244 // getDisplayString().setTagArg("i", 1, "red");
245 // }
246 
247  std::stringstream ttString;
248 
249  // show our predecessor and successor in tooltip
250  ttString << thisNode << "\n# Neighbors: "
251  << neighbors->getSize();
252 
253  getParentModule()->getParentModule()->getDisplayString().
254  setTagArg("tt", 0, ttString.str().c_str());
255  getParentModule()->getDisplayString().
256  setTagArg("tt", 0, ttString.str().c_str());
257  getDisplayString().setTagArg("tt", 0, ttString.str().c_str());
258  }
259 }
260 
261 void Gia::handleTimerEvent(cMessage* msg)
262 {
263  if (msg == sendToken_timer) {
265  } else if (msg->isName("satisfaction_timer")) {
266  // calculate level of satisfaction and select new neighbor candidate
267 
273 
274  // start again satisfaction_timer
275  scheduleAt(simTime() + (maxTopAdaptionInterval *
277  -(1 - levelOfSatisfaction))),
279 
280  // Only search a new neighbor if level of satisfaction is
281  // under level of maxLevelOfSatisfaction
283  if(knownNodes.getSize() == 0) {
284  if(neighbors->getSize() == 0 && neighCand.getSize() == 0)
285  knownNodes.add(globalNodeList->getBootstrapNode()/*bootstrapList->getBootstrapNode()*/);
286  else
287  return;
288  }
289 
290  NodeHandle possibleNeighbor = knownNodes.getRandomCandidate();
291 
292  if(!(possibleNeighbor.isUnspecified()) &&
293  thisGiaNode != possibleNeighbor &&
294  !neighbors->contains(possibleNeighbor) &&
295  !neighCand.contains(possibleNeighbor)) {
296  // try to add new neighbor
297  neighCand.add(possibleNeighbor);
298  sendMessage_JOIN_REQ(possibleNeighbor);
299  }
300  }
301  } else if (msg->isName("update_timer")) {
302  // send current capacity and degree to all neighbors
303  for (uint32_t i=0; i<neighbors->getSize(); i++) {
305  }
306  } else if (msg->isName("timedoutMessages_timer")) {
307  // remove timedout messages
309  scheduleAt(simTime() + messageTimeout,
311  } else if (msg->isName("timedoutNeighbors_timer")) {
312  // remove timedout neighbors
314  if (neighbors->getSize() == 0) {
315  changeState(INIT);
316  }
317  cancelEvent(timedoutNeighbors_timer);
318  scheduleAt(simTime() + neighborTimeout,
320  } else if (msg->isName("sendKeyList_timer")) {
321  if (keyList.getSize() > 0) {
322  // send keyList to all of our neighbors
323  for (uint32_t i=0; i<neighbors->getSize(); i++)
325  }
326  } else {
327  // other self-messages are notoken-self-messages
328  // with an encapsulated message
329  const std::string id = msg->getName();
330  if (id.substr(0, 16) == std::string("wait-for-token: ")) {
331  cPacket* packet = check_and_cast<cPacket*>(msg);
332  cMessage* decapsulatedMessage = packet->decapsulate();
333  if (dynamic_cast<GiaIDMessage*>(decapsulatedMessage) != NULL) {
334  GiaIDMessage* message = check_and_cast<GiaIDMessage*>
335  (decapsulatedMessage);
336  forwardMessage(message, false);
337  }
338  } else if (id.substr(0, 24) == std::string("wait-for-token-fromapp: ")) {
339  cPacket* packet = check_and_cast<cPacket*>(msg);
340  cMessage* decapsulatedMessage = packet->decapsulate();
341  if (dynamic_cast<GiaIDMessage*>(decapsulatedMessage) != NULL) {
342  GiaIDMessage* message = check_and_cast<GiaIDMessage*>
343  (decapsulatedMessage);
344  forwardMessage(message, true);
345  }
346  }
347  delete msg;
348  }
349 }
350 
352 {
353  if(debugOutput)
354  EV << "(Gia) " << thisGiaNode << " received udp message" << endl;
355 
356  cPolymorphic* ctrlInfo = msg->removeControlInfo();
357  if(ctrlInfo != NULL)
358  delete ctrlInfo;
359 
360  // Parse TokenMessages
361  if (dynamic_cast<TokenMessage*>(msg) != NULL) {
362  TokenMessage* giaMsg = check_and_cast<TokenMessage*>(msg);
363 
364  // Process TOKEN-Message
365  if ((giaMsg->getCommand() == TOKEN)) {
366  if(debugOutput)
367  EV << "(Gia) Received Tokenmessage from "
368  << giaMsg->getSrcNode() << endl;
369 
370  //neighbors->setReceivedTokens(giaMsg->getSrcNode(), giaMsg->getDstTokenNr());
372  updateNeighborList(giaMsg);
373  }
374  delete msg;
375  }
376 
377  // Process Route messages
378  else if (dynamic_cast<GiaRouteMessage*>(msg) != NULL) {
379  GiaRouteMessage* giaMsg = check_and_cast<GiaRouteMessage*>(msg);
380  GiaNode oppositeNode(giaMsg->getSrcNode(), giaMsg->getSrcCapacity(),
381  giaMsg->getSrcDegree());
382 
383  if((giaMsg->getCommand() == ROUTE)) {
384  if(debugOutput)
385  EV << "(Gia) Received ROUTE::IND from " << oppositeNode << endl;
386 
387  if(state == READY) {
388  //neighbors->decreaseReceivedTokens(giaMsg->getSrcNode());
389  updateNeighborList(giaMsg);
390  forwardMessage(giaMsg, false);
391  }
392  }
393  }
394 
395  // Process KeyList-Messages
396  else if (dynamic_cast<KeyListMessage*>(msg) != NULL) {
397  KeyListMessage* giaMsg = check_and_cast<KeyListMessage*>(msg);
398  GiaNode oppositeNode(giaMsg->getSrcNode(), giaMsg->getSrcCapacity(),
399  giaMsg->getSrcDegree());
400 
401  if (giaMsg->getCommand() == KEYLIST) {
402  if (debugOutput)
403  EV << "(Gia) " << thisGiaNode
404  << " received KEYLIST:IND message" << endl;
405  // update KeyList in neighborList
406  uint32_t keyListSize = giaMsg->getKeysArraySize();
407  GiaKeyList neighborKeyList;
408  for (uint32_t k = 0; k < keyListSize; k++)
409  neighborKeyList.addKeyItem(giaMsg->getKeys(k));
410  neighbors->setNeighborKeyList(giaMsg->getSrcNode(), neighborKeyList);
411  }
412  delete giaMsg;
413  }
414 
415  // Process Search-Messages
416  else if (dynamic_cast<SearchMessage*>(msg) != NULL) {
417  SearchMessage* giaMsg = check_and_cast<SearchMessage*>(msg);
418  GiaNode oppositeNode(giaMsg->getSrcNode(), giaMsg->getSrcCapacity(),
419  giaMsg->getSrcDegree());
420 
421  //neighbors->decreaseSentTokens(giaMsg->getSrcNode());
422  updateNeighborList(giaMsg);
423  processSearchMessage(giaMsg, false);
424  // } else {
425  // EV << "(Gia) Message " << msg << " dropped!" << endl;
426  // RECORD_STATS(numDropped++; bytesDropped += msg->getByteLength());
427  // delete msg;
428  // }
429  }
430 
431  // Process Search-Response-Messages
432  else if (dynamic_cast<SearchResponseMessage*>(msg) != NULL) {
433  SearchResponseMessage* responseMsg =
434  check_and_cast<SearchResponseMessage*>(msg);
435  forwardSearchResponseMessage(responseMsg);
436  }
437 
438  // Process Gia messages
439  else if (dynamic_cast<GiaMessage*>(msg) != NULL) {
440  GiaMessage* giaMsg = check_and_cast<GiaMessage*>(msg);
441 
442  //assert(giaMsg->getSrcNode().moduleId != -1);
443  GiaNode oppositeNode(giaMsg->getSrcNode(), giaMsg->getSrcCapacity(),
444  giaMsg->getSrcDegree());
445 
446  if (debugOutput)
447  EV << "(Gia) " << thisGiaNode << " received GIA- message from "
448  << oppositeNode << endl;
449  updateNeighborList(giaMsg);
450 
451  // Process JOIN:REQ messages
452  if (giaMsg->getCommand() == JOIN_REQUEST) {
453  if (debugOutput)
454  EV << "(Gia) " << thisGiaNode
455  << " received JOIN:REQ message" << endl;
456  if (acceptNode(oppositeNode, giaMsg->getSrcDegree())) {
457  neighCand.add(oppositeNode);
458  sendMessage_JOIN_RSP(oppositeNode);
459  } else {
460  if (debugOutput)
461  EV << "(Gia) " << thisGiaNode << " denies node "
462  << oppositeNode << endl;
463  sendMessage_JOIN_DNY(oppositeNode);
464  }
465  }
466 
467  // Process JOIN:RSP messages
468  else if (giaMsg->getCommand() == JOIN_RESPONSE) {
469  if(debugOutput)
470  EV << "(Gia) " << thisGiaNode << " received JOIN:RSP message"
471  << endl;
472  if(neighCand.contains(oppositeNode)) {
473  neighCand.remove(oppositeNode);
474  if(acceptNode(oppositeNode, giaMsg->getSrcDegree())) {
475  addNeighbor(oppositeNode, giaMsg->getSrcDegree());
476 
477  GiaNeighborMessage* msg =
478  check_and_cast<GiaNeighborMessage*>(giaMsg);
479  for(uint32_t i = 0; i < msg->getNeighborsArraySize(); i++) {
480  GiaNode temp = msg->getNeighbors(i);
481  if(temp != thisGiaNode && temp != oppositeNode)
482  knownNodes.add(temp);
483  }
484 
485  sendMessage_JOIN_ACK(oppositeNode);
486  } else {
487  if (debugOutput)
488  EV << "(Gia) " << thisGiaNode << " denies node "
489  << oppositeNode << endl;
490  sendMessage_JOIN_DNY(oppositeNode);
491  }
492  }
493  }
494 
495  // Process JOIN:ACK messages
496  else if (giaMsg->getCommand() == JOIN_ACK) {
497  if (debugOutput)
498  EV << "(Gia) " << thisGiaNode << " received JOIN:ACK message"
499  << endl;
500  if (neighCand.contains(oppositeNode) &&
502  neighCand.remove(oppositeNode);
503  addNeighbor(oppositeNode, giaMsg->getSrcDegree());
504 
505  GiaNeighborMessage* msg =
506  check_and_cast<GiaNeighborMessage*>(giaMsg);
507 
508  for(uint32_t i = 0; i < msg->getNeighborsArraySize(); i++) {
509  GiaNode temp = msg->getNeighbors(i);
510  if(temp != thisGiaNode && temp != oppositeNode)
511  knownNodes.add(msg->getNeighbors(i));
512  }
513  } else {
514  sendMessage_DISCONNECT(oppositeNode);
515  }
516 
517  }
518 
519  // Process JOIN:DNY messages
520  else if (giaMsg->getCommand() == JOIN_DENY) {
521  if (debugOutput)
522  EV << "(Gia) " << thisGiaNode << " received JOIN:DNY message"
523  << endl;
524 
525  if (neighCand.contains(oppositeNode))
526  neighCand.remove(oppositeNode);
527  knownNodes.remove(oppositeNode);
528 
529  }
530 
531 
532  // Process DISCONNECT-Message
533  else if (giaMsg->getCommand() == DISCONNECT) {
534  if (debugOutput)
535  EV << "(Gia) " << thisGiaNode << " received DISCONNECT:IND message" << endl;
536  removeNeighbor(giaMsg->getSrcNode());
537  }
538 
539  // Process UPDATE-Message
540  else if (giaMsg->getCommand() == UPDATE) {
541  if (debugOutput)
542  EV << "(Gia) " << thisGiaNode << " received UPDATE:IND message"
543  << endl;
544 
546  giaMsg->getSrcDegree());
547  //neighbors->setCapacity(giaMsg->getSrcNode(),
548  //giaMsg->getSrcCapacity());
549  } else {
550  // Show unknown gia-messages
551  if (debugOutput) {
552  EV << "(Gia) NODE: "<< thisGiaNode << endl
553  << " Command: " << giaMsg->getCommand() << endl
554  << " HopCount: " << giaMsg->getHopCount() << endl
555  << " SrcKey: " << giaMsg->getSrcNode().getKey() << endl
556  << " SrcIP: " << giaMsg->getSrcNode().getKey() << endl
557  << " SrcPort: " << giaMsg->getSrcNode().getPort() << endl
558  << " SrcCapacity: " << giaMsg->getSrcCapacity() << endl
559  << " SrcDegree: " << giaMsg->getSrcDegree() << endl;
560 
561  RECORD_STATS(numDropped++;bytesDropped += giaMsg->getByteLength());
562  }
563  }
564  delete giaMsg;
565  } else // PROCESS other messages than GiaMessages
566  delete msg; // delete them all
567 }
568 
569 bool Gia::acceptNode(const GiaNode& nNode, unsigned int degree)
570 {
571  if (neighbors->contains(nNode))
572  return false;
573 
574  if (neighbors->getSize() < maxNeighbors) {
575  // we have room for new node: accept node
576  return true;
577  }
578  // we need to drop a neighbor
579  NodeHandle dropCandidate = neighbors->getDropCandidate(nNode.getCapacity(), degree);
580  if(!dropCandidate.isUnspecified()) {
581  if (debugOutput)
582  EV << "(Gia) " << thisGiaNode << " drops "
583  << dropCandidate << endl;
584  neighbors->remove(dropCandidate);
585  sendMessage_DISCONNECT(dropCandidate);
586  return true;
587  }
588  return false;
589 }
590 
591 
592 void Gia::addNeighbor(GiaNode& node, unsigned int degree)
593 {
594  assert(neighbors->getSize() < maxNeighbors);
595 
597  EV << "(Gia) " << thisGiaNode << " accepted new neighbor " << node << endl;
598  getParentModule()->getParentModule()->bubble("New neighbor");
601  if (stat_maxNeighbors < neighbors->getSize()) {
603  }
604 
605  cancelEvent(update_timer);
606  scheduleAt(simTime() + updateDelay, update_timer);
607 
608  //node.setSentTokens(5);
609  //node.setReceivedTokens(5);
610 
611  // send keyList to new Neighbor
612  if (keyList.getSize() > 0)
613  sendKeyListToNeighbor(node);
614  neighbors->add(node, degree);
615 
616  showOverlayNeighborArrow(node, false,
617  "m=m,50,0,50,0;ls=red,1");
618  updateTooltip();
619 }
620 
621 void Gia::removeNeighbor(const GiaNode& node)
622 {
624 
625  if (debugOutput)
626  EV << "(Gia) " << thisGiaNode << " removes " << node
627  << " from neighborlist." << endl;
628  neighbors->remove(node);
629 
631 
632  if (neighbors->getSize() == 0) {
633  changeState(INIT);
634  }
635 
637  updateTooltip();
638 
639  cancelEvent(update_timer);
640  scheduleAt(simTime() + updateDelay, update_timer);
641 }
642 
644 {
645  uint32_t neighborsCount = neighbors->getSize();
646  if(neighborsCount < minNeighbors)
647  return 0.0;
648 
649  double total = 0.0;
650  double levelOfSatisfaction = 0.0;
651 
652  for (uint32_t i=0; i < neighborsCount; i++)
653  total += neighbors->get(i).getCapacity() / neighborsCount;
654 
655  assert(thisGiaNode.getCapacity() != 0);
656  levelOfSatisfaction = total / thisGiaNode.getCapacity();
657 
658  if ((levelOfSatisfaction > 1.0) || (neighborsCount >= maxNeighbors))
659  return 1.0;
660  return levelOfSatisfaction;
661 }
662 
663 
665 {
666  // send JOIN:REQ message
667  GiaMessage* msg = new GiaMessage("JOIN_REQ");
668  msg->setCommand(JOIN_REQUEST);
669  msg->setSrcNode(thisGiaNode);
672  msg->setBitLength(GIA_L(msg));
673 
674  stat_joinCount += 1;
675  stat_joinBytesSent += msg->getByteLength();
676  stat_joinREQ += 1;
677  stat_joinREQBytesSent += msg->getByteLength();
678 
679  sendMessageToUDP(dst, msg);
680 }
681 
683 {
684  // send JOIN:RSP message
685  GiaNeighborMessage* msg = new GiaNeighborMessage("JOIN_RSP");
687  msg->setSrcNode(thisGiaNode);
690 
692  //TODO: add parameter maxSendNeighbors
693  for(uint32_t i = 0; i < neighbors->getSize(); i++)
694  msg->setNeighbors(i, neighbors->get(i));
695 
696  msg->setBitLength(GIANEIGHBOR_L(msg));
697 
698  stat_joinCount += 1;
699  stat_joinBytesSent += msg->getByteLength();
700  stat_joinRSP += 1;
701  stat_joinRSPBytesSent += msg->getByteLength();
702 
703  sendMessageToUDP(dst, msg);
704 }
705 
707 {
708  // send JOIN:ACK message
709  GiaNeighborMessage* msg = new GiaNeighborMessage("JOIN_ACK");
710  msg->setCommand(JOIN_ACK);
711  msg->setSrcNode(thisGiaNode);
714 
716  //TODO: add parameter maxSendNeighbors
717  for(uint32_t i = 0; i < neighbors->getSize(); i++)
718  msg->setNeighbors(i, neighbors->get(i));
719 
720  msg->setBitLength(GIANEIGHBOR_L(msg));
721 
722  stat_joinCount += 1;
723  stat_joinBytesSent += msg->getByteLength();
724  stat_joinACK += 1;
725  stat_joinACKBytesSent += msg->getByteLength();
726 
727  sendMessageToUDP(dst, msg);
728 }
729 
731 {
732  // send JOIN:DNY message
733  GiaMessage* msg = new GiaMessage("JOIN_DENY");
734  msg->setCommand(JOIN_DENY);
735  msg->setSrcNode(thisGiaNode);
738  msg->setBitLength(GIA_L(msg));
739 
740  stat_joinCount += 1;
741  stat_joinBytesSent += msg->getByteLength();
742  stat_joinDNY += 1;
743  stat_joinDNYBytesSent += msg->getByteLength();
744 
745  sendMessageToUDP(dst, msg);
746 }
747 
749 {
750  // send DISCONNECT:IND message
751  GiaMessage* msg = new GiaMessage("DISCONNECT");
752  msg->setCommand(DISCONNECT);
753  msg->setSrcNode(thisGiaNode);
756  msg->setBitLength(GIA_L(msg));
757 
758  stat_disconnectMessagesBytesSent += msg->getByteLength();
760 
761  sendMessageToUDP(dst, msg);
762 }
763 
765 {
766  // send UPDATE:IND message
767  GiaMessage* msg = new GiaMessage("UPDATE");
768  msg->setCommand(UPDATE);
769  msg->setSrcNode(thisGiaNode);
772  msg->setBitLength(GIA_L(msg));
773 
774  stat_updateMessagesBytesSent += msg->getByteLength();
775  stat_updateMessages += 1;
776 
777  sendMessageToUDP(dst, msg);
778 }
779 
781 {
782  // send KEYLIST:IND message
783  KeyListMessage* msg = new KeyListMessage("KEYLIST");
784  msg->setCommand(KEYLIST);
785  msg->setSrcNode(thisGiaNode);
788 
790  for (uint32_t i=0; i<keyList.getSize(); i++)
791  msg->setKeys(i, keyList.get(i));
792 
793  msg->setBitLength(KEYLIST_L(msg));
794 
795  stat_keyListMessagesBytesSent += msg->getByteLength();
797 
798  sendMessageToUDP(dst, msg);
799 }
800 
801 void Gia::sendToken(const GiaNode& dst)
802 {
803  TokenMessage* tokenMsg = new TokenMessage("TOKEN");
804  tokenMsg->setCommand(TOKEN);
805  tokenMsg->setHopCount(maxHopCount);
806  tokenMsg->setSrcNode(thisGiaNode);
808  tokenMsg->setSrcDegree(connectionDegree);
809  tokenMsg->setSrcTokenNr(0/*dst.getReceivedTokens()*/);//???
810  tokenMsg->setDstTokenNr(/*dst.getSentTokens()+*/1);
811  tokenMsg->setBitLength(TOKEN_L(tokenMsg));
812 
813  stat_tokenMessagesBytesSent += tokenMsg->getByteLength();
814  stat_tokenMessages += 1;
815 
816  sendMessageToUDP(dst, tokenMsg);
817 }
818 
820 {
821  if(neighbors->contains(msg->getSrcNode().getKey())) {
823  //neighbors->setCapacity(msg->getSrcNode(), msg->getSrcCapacity());
825  }
826 }
827 
829 {
830  if (responseMsg->getHopCount() != 0) {
831  uint32_t reversePathArraySize = responseMsg->getReversePathArraySize();
832 
833  // reached node which started search?
834  if (reversePathArraySize == 0) {
835  deliverSearchResult(responseMsg);
836  }
837  // forward message to next node in reverse path list
838  else {
839  NodeHandle targetNode = neighbors->get
840  (responseMsg->getReversePath(
841  reversePathArraySize-1));
842 
843  if(!(targetNode.isUnspecified())) {
844  responseMsg->setDestKey(targetNode.getKey());
845  responseMsg->setReversePathArraySize(reversePathArraySize-1);
846  for (uint32_t i=0; i<reversePathArraySize-1; i++)
847  responseMsg->setReversePath(i, responseMsg->getReversePath(i));
848  responseMsg->setBitLength(responseMsg->getBitLength() - KEY_L);
849 
850  stat_routeMessagesBytesSent += responseMsg->getByteLength();
851  stat_routeMessages += 1;
852 
853  if(responseMsg->getHopCount() > 0)
855  responseMsg->getByteLength());
856 
857  sendMessageToUDP(targetNode, responseMsg);
858  } else {
859  EV << "(Gia) wrong reverse path in " << *responseMsg
860  << " ... deleted!" << endl;
862  bytesDropped += responseMsg->getByteLength());
863  delete responseMsg;
864  }
865  }
866  }
867  // max hopcount reached. delete message
868  else
869  delete responseMsg;
870 }
871 
872 void Gia::forwardMessage(GiaIDMessage* msg , bool fromApplication)
873 {
874  if (msg->getHopCount() == 0) {
875  // Max-Hopcount reached. Discard message
876  if (!fromApplication)
878  RECORD_STATS(numDropped++; bytesDropped += msg->getByteLength());
879  delete msg;
880  } else {
881  // local delivery?
882  if (msg->getDestKey() == thisGiaNode.getKey()) {
883  if(!fromApplication)
885 
886  if(debugOutput)
887  EV << "(Gia) Deliver messsage " << msg
888  << " to application at " << thisGiaNode << endl;
889 
890  OverlayCtrlInfo* overlayCtrlInfo =
891  new OverlayCtrlInfo();
892 
893  overlayCtrlInfo->setHopCount(msg->getHopCount());
894  overlayCtrlInfo->setSrcNode(msg->getSrcNode());
895  overlayCtrlInfo->setTransportType(ROUTE_TRANSPORT);
896  overlayCtrlInfo->setDestComp(TIER1_COMP); // workaround
897  overlayCtrlInfo->setSrcComp(TIER1_COMP);
898 
899  msg->setControlInfo(overlayCtrlInfo);
900  callDeliver(msg, msg->getDestKey());
901  } else {
902  // forward message to next hop
903 
904  // do we know the destination-node?
905  if (neighbors->contains(msg->getDestKey())) {
906  // yes, destination-Node is our neighbor
907  NodeHandle targetNode = neighbors->get(msg->getDestKey());
908  GiaNeighborInfo* targetInfo= neighbors->get(targetNode);
909 
910  if (targetInfo->receivedTokens == 0) {
911  // wait for free token
912  if (debugOutput)
913  EV << "(Gia) No free Node, wait for free token" << endl;
914 
915  //bad code
916  std::string id;
917  if (!fromApplication)
918  id = "wait-for-token: " + msg->getID().toString();
919  else
920  id = "wait-for-token-fromapp: " +
921  msg->getID().toString();
922  cPacket* wait_timer = new cPacket(id.c_str());
923  wait_timer->encapsulate(msg);
924  scheduleAt(simTime() + tokenWaitTime,wait_timer);
925  return;
926  } else {
927  // decrease nextHop-tokencount
928  neighbors->decreaseReceivedTokens(targetNode);
929  //targetInfo->receivedTokens = targetInfo->receivedTokens - 1;
930 
931  // forward msg to nextHop
932  msg->setHopCount(msg->getHopCount()-1);
933  msg->setSrcNode(thisGiaNode);
936  if (debugOutput)
937  EV << "(Gia) Forwarding message " << msg
938  << " to neighbor " << targetNode << endl;
939  if (!fromApplication)
941 
942  stat_routeMessagesBytesSent += msg->getByteLength();
943  stat_routeMessages += 1;
944 
945  if(msg->getHopCount() > 0)
947  msg->getByteLength());
948 
949  sendMessageToUDP(targetNode, msg);
950  }
951  } else {
952  // no => pick node with at least one token and highest capacity
953  if (!msgBookkeepingList->contains(msg))
955  NodeHandle nextHop = msgBookkeepingList->getNextHop(msg);
956  // do we have a neighbor who send us a token?
957  if(nextHop.isUnspecified()) {
958  // wait for free token
959  if (debugOutput)
960  EV << "(Gia) No free Node, wait for free token" << endl;
961  std::string id;
962  if (!fromApplication)
963  id = "wait-for-token: " + msg->getID().toString();
964  else
965  id = "wait-for-token-fromapp: " +
966  msg->getID().toString();
967  cPacket* wait_timer = new cPacket(id.c_str());
968  wait_timer->encapsulate(msg);
969  scheduleAt(simTime() + tokenWaitTime,wait_timer);
970  return;
971  } else {
972  GiaNeighborInfo* nextHopInfo = neighbors->get(nextHop);
973  if(nextHopInfo == NULL) {
974  delete msg;
975  return; //???
976  }
977  // decrease nextHop-tokencount
979  //nextHopInfo->receivedTokens--;
980 
981  // forward msg to nextHop
982  msg->setHopCount(msg->getHopCount()-1);
983  msg->setSrcNode(thisGiaNode);
986  if (debugOutput)
987  EV << "(Gia) Forwarding message " << msg
988  << " to " << nextHop << endl;
989  if (!fromApplication)
991 
992  stat_routeMessagesBytesSent += msg->getByteLength();
993  stat_routeMessages += 1;
994 
995  if(msg->getHopCount() > 0)
997  msg->getByteLength());
998 
999  sendMessageToUDP(nextHop, msg);
1000  }
1001  }
1002  }
1003  }
1004 }
1005 
1006 void Gia::getRoute(const OverlayKey& key, CompType destComp,
1007  CompType srcComp, cPacket* msg,
1008  const std::vector<TransportAddress>& sourceRoute,
1009  RoutingType routingType)
1010 {
1011  if ((destComp != TIER1_COMP) || (srcComp != TIER1_COMP)) {
1012  throw cRuntimeError("Gia::getRoute(): Works currently "
1013  "only with srcComp=destComp=TIER1_COMP!");
1014  }
1015 
1016  if (state == READY) {
1017  GiaRouteMessage* routeMsg = new GiaRouteMessage("ROUTE");
1018  routeMsg->setStatType(APP_DATA_STAT);
1019  routeMsg->setCommand(ROUTE);
1020  routeMsg->setHopCount(maxHopCount);
1021  routeMsg->setDestKey(key);
1022  routeMsg->setSrcNode(thisGiaNode);
1023  routeMsg->setSrcCapacity(thisGiaNode.getCapacity());
1024  routeMsg->setSrcDegree(connectionDegree);
1025  routeMsg->setOriginatorKey(thisGiaNode.getKey());
1026  routeMsg->setOriginatorIP(thisGiaNode.getIp());
1027  routeMsg->setOriginatorPort(thisGiaNode.getPort());
1028  routeMsg->setID(OverlayKey::random());
1029  routeMsg->setBitLength(GIAROUTE_L(routeMsg));
1030  routeMsg->encapsulate(msg);
1031 
1032 
1033  forwardMessage(routeMsg, true);
1034  } else {
1035  RECORD_STATS(numDropped++; bytesDropped += msg->getByteLength());
1036  delete msg;
1037  }
1038 }
1039 
1040 void Gia::handleAppMessage(cMessage* msg)
1041 {
1042  // do we receive a keylist?
1043  if (dynamic_cast<GIAput*>(msg) != NULL) {
1044  GIAput* putMsg = check_and_cast<GIAput*>(msg);
1045  uint32_t keyListSize = putMsg->getKeysArraySize();
1046  for (uint32_t k=0; k<keyListSize; k++)
1047  keyList.addKeyItem(putMsg->getKeys(k));
1048 
1049  // actualize vector in KeyListModule
1051 
1052  scheduleAt(simTime() + keyListDelay, sendKeyList_timer);
1053 
1054  delete putMsg;
1055  } else if (dynamic_cast<GIAsearch*>(msg) != NULL) {
1056  if (state == READY) {
1057  GIAsearch* getMsg = check_and_cast<GIAsearch*>(msg);
1058  SearchMessage* searchMsg = new SearchMessage("SEARCH");
1059  searchMsg->setCommand(SEARCH);
1060  searchMsg->setStatType(APP_DATA_STAT);
1061  searchMsg->setHopCount(maxHopCount);
1062  searchMsg->setDestKey(getMsg->getSearchKey());
1063  searchMsg->setSrcNode(thisGiaNode);
1064  searchMsg->setSrcCapacity(thisGiaNode.getCapacity());
1065  searchMsg->setSrcDegree(connectionDegree);
1066  searchMsg->setSearchKey(getMsg->getSearchKey());
1067  searchMsg->setMaxResponses(getMsg->getMaxResponses());
1068  searchMsg->setReversePathArraySize(0);
1069  searchMsg->setID(OverlayKey::random());
1070  searchMsg->setBitLength(SEARCH_L(searchMsg));
1071 
1072  processSearchMessage(searchMsg, true);
1073 
1074  delete getMsg;
1075  } else
1076  delete msg;
1077  } else {
1078  delete msg;
1079  EV << "(Gia) unkown message from app deleted!" << endl;
1080  }
1081 }
1082 
1083 
1085 {
1086  // does SearchMessage->foundNode[] already contain this node
1087  uint32_t foundNodeArraySize = msg->getFoundNodeArraySize();
1088  bool containsNode = false;
1089  for (uint32_t i=0; i<foundNodeArraySize; i++)
1090  if (srcNode.getKey() == msg->getFoundNode(i))
1091  containsNode = true;
1092 
1093  if (!containsNode && msg->getMaxResponses() > 0) {
1094  // add this node to SearchMessage->foundNode[]
1095  msg->setFoundNodeArraySize(foundNodeArraySize+1);
1096  msg->setFoundNode(foundNodeArraySize, srcNode.getKey());
1097 
1098  // decrease SearchMessage->maxResponses
1099  msg->setMaxResponses(msg->getMaxResponses()-1);
1100 
1101  // get first node in reverse-path
1102  uint32_t reversePathArraySize = msg->getReversePathArraySize();
1103 
1104  if (reversePathArraySize == 0) {
1105  // we have the key
1106  // deliver response to application
1107  SearchResponseMessage* responseMsg =
1108  new SearchResponseMessage("ANSWER");
1109  responseMsg->setCommand(ANSWER);
1110  responseMsg->setStatType(APP_DATA_STAT);
1111  responseMsg->setHopCount(maxHopCount);
1112  responseMsg->setSrcNode(thisGiaNode);
1113  responseMsg->setSrcCapacity(thisGiaNode.getCapacity());
1114  responseMsg->setSrcDegree(connectionDegree);
1115  responseMsg->setSearchKey(msg->getSearchKey());
1116  responseMsg->setFoundNode(srcNode);
1117  responseMsg->setID(OverlayKey::random());
1118  responseMsg->setSearchHopCount(0);
1119 
1120  responseMsg->setBitLength(SEARCHRESPONSE_L(responseMsg));
1121 
1122  deliverSearchResult(responseMsg);
1123  } else {
1124  uint32_t reversePathArraySize(msg->getReversePathArraySize());
1125  SearchResponseMessage* responseMsg =
1126  new SearchResponseMessage("ANSWER");
1127  responseMsg->setCommand(ANSWER);
1128  responseMsg->setHopCount(maxHopCount);
1129  responseMsg->setSrcNode(srcNode);
1130  responseMsg->setSrcCapacity(srcNode.getCapacity());
1131  responseMsg->setSrcDegree(connectionDegree);
1132  responseMsg->setSearchKey(msg->getSearchKey());
1133  responseMsg->setFoundNode(srcNode);
1134  responseMsg->setReversePathArraySize(reversePathArraySize);
1135  for (uint32_t i=0; i<reversePathArraySize; i++)
1136  responseMsg->setReversePath(i, msg->getReversePath(i));
1137  responseMsg->setID(OverlayKey::random());
1138  responseMsg->setSearchHopCount(reversePathArraySize);
1139  responseMsg->setBitLength(SEARCHRESPONSE_L(responseMsg));
1140 
1141  forwardSearchResponseMessage(responseMsg);
1142  }
1143  }
1144 }
1145 
1146 
1147 void Gia::processSearchMessage(SearchMessage* msg, bool fromApplication)
1148 {
1149  OverlayKey searchKey = msg->getSearchKey();
1150 
1151  if (keyList.contains(searchKey)) {
1152  // this node contains search key
1154  }
1155 
1156  // check if neighbors contain search key
1157  for (uint32_t i = 0; i < neighbors->getSize(); i++) {
1159  if (keyList->contains(searchKey))
1161  }
1162 
1163  // forward search-message to next hop
1164  if (msg->getMaxResponses() > 0) {
1165  // actualize reverse path
1166  uint32_t reversePathSize = msg->getReversePathArraySize();
1167 
1168  if (optimizeReversePath)
1169  for (uint32_t i=0; i<reversePathSize; i++) {
1170  if (msg->getReversePath(i) == thisGiaNode.getKey()) {
1171  // Our node already in ReversePath.
1172  // Delete successor nodes from ReversePath
1173  msg->setBitLength(msg->getBitLength() - (reversePathSize - i)*KEY_L);
1174  reversePathSize = i; // set new array size
1175  break;
1176  }
1177  }
1178 
1179  msg->setReversePathArraySize(reversePathSize+1);
1180  msg->setReversePath(reversePathSize, thisGiaNode.getKey());
1181  msg->setBitLength(msg->getBitLength() + KEY_L);
1182 
1183  forwardMessage(msg, fromApplication);
1184  } else {
1186  delete msg;
1187  }
1188 }
1189 
1191 {
1192  OverlayCtrlInfo* overlayCtrlInfo = new OverlayCtrlInfo();
1193  overlayCtrlInfo->setHopCount(msg->getSearchHopCount());
1194  overlayCtrlInfo->setSrcNode(msg->getSrcNode());
1195  overlayCtrlInfo->setTransportType(ROUTE_TRANSPORT);
1196 
1197  GIAanswer* deliverMsg = new GIAanswer("GIA-Answer");
1198  deliverMsg->setCommand(GIA_ANSWER);
1199  deliverMsg->setControlInfo(overlayCtrlInfo);
1200  deliverMsg->setSearchKey(msg->getSearchKey());
1201  deliverMsg->setNode(msg->getFoundNode());
1202  deliverMsg->setBitLength(GIAGETRSP_L(msg));
1203 
1204  send(deliverMsg, "appOut");
1205  if (debugOutput)
1206  EV << "(Gia) Deliver search response " << msg
1207  << " to application at " << thisGiaNode << endl;
1208 
1209  delete msg;
1210 }
1211 
1213 {
1214  // statistics
1215 
1216  globalStatistics->addStdDev("GIA: JOIN-Messages Count", stat_joinCount);
1217  globalStatistics->addStdDev("GIA: JOIN Bytes sent", stat_joinBytesSent);
1218  globalStatistics->addStdDev("GIA: JOIN::REQ Messages", stat_joinREQ);
1219  globalStatistics->addStdDev("GIA: JOIN::REQ Bytes sent", stat_joinREQBytesSent);
1220  globalStatistics->addStdDev("GIA: JOIN::RSP Messages", stat_joinRSP);
1221  globalStatistics->addStdDev("GIA: JOIN::RSP Bytes sent", stat_joinRSPBytesSent);
1222  globalStatistics->addStdDev("GIA: JOIN::ACK Messages", stat_joinACK);
1223  globalStatistics->addStdDev("GIA: JOIN::ACK Bytes sent", stat_joinACKBytesSent);
1224  globalStatistics->addStdDev("GIA: JOIN::DNY Messages", stat_joinDNY);
1225  globalStatistics->addStdDev("GIA: JOIN::DNY Bytes sent", stat_joinDNYBytesSent);
1226  globalStatistics->addStdDev("GIA: DISCONNECT:IND Messages", stat_disconnectMessages);
1227  globalStatistics->addStdDev("GIA: DISCONNECT:IND Bytes sent",
1229  globalStatistics->addStdDev("GIA: UPDATE:IND Messages", stat_updateMessages);
1230  globalStatistics->addStdDev("GIA: UPDATE:IND Bytes sent", stat_updateMessagesBytesSent);
1231  globalStatistics->addStdDev("GIA: TOKEN:IND Messages", stat_tokenMessages);
1232  globalStatistics->addStdDev("GIA: TOKEN:IND Bytes sent", stat_tokenMessagesBytesSent);
1233  globalStatistics->addStdDev("GIA: KEYLIST:IND Messages", stat_keyListMessages);
1234  globalStatistics->addStdDev("GIA: KEYLIST:IND Bytes sent", stat_keyListMessagesBytesSent);
1235  globalStatistics->addStdDev("GIA: ROUTE:IND Messages", stat_routeMessages);
1236  globalStatistics->addStdDev("GIA: ROUTE:IND Bytes sent", stat_routeMessagesBytesSent);
1237  globalStatistics->addStdDev("GIA: Neighbors max", stat_maxNeighbors);
1238  globalStatistics->addStdDev("GIA: Neighbors added", stat_addedNeighbors);
1239  globalStatistics->addStdDev("GIA: Neighbors removed", stat_removedNeighbors);
1240  globalStatistics->addStdDev("GIA: Level of satisfaction messages ",
1242  globalStatistics->addStdDev("GIA: Level of satisfaction max ",stat_maxLevelOfSatisfaction);
1243  globalStatistics->addStdDev("GIA: Level of satisfaction avg ",
1246 }
1247 
1249 {
1250  cancelAndDelete(satisfaction_timer);
1251  cancelAndDelete(update_timer);
1252  cancelAndDelete(timedoutMessages_timer);
1253  cancelAndDelete(timedoutNeighbors_timer);
1254  cancelAndDelete(sendKeyList_timer);
1255  cancelAndDelete(sendToken_timer);
1256  delete msgBookkeepingList;
1257 }
1258