OverSim
I3.cc
Go to the documentation of this file.
1 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
2 //
3 // This program is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU General Public License
5 // as published by the Free Software Foundation; either version 2
6 // of the License, or (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 
23 #include <UDPAppBase.h>
24 #include <UDPControlInfo_m.h>
25 #include "UDPSocket.h"
26 #include "UDPControlInfo_m.h"
27 
28 #include <IPAddressResolver.h>
29 #include <CommonMessages_m.h>
30 #include <GlobalNodeListAccess.h>
32 
33 #include <omnetpp.h>
34 #include <OverlayKey.h>
35 #include "SHA1.h"
36 
37 #include "TriggerTable.h"
38 #include "I3Identifier.h"
39 #include "I3IPAddress.h"
40 #include "I3SubIdentifier.h"
41 #include "I3IdentifierStack.h"
42 #include "I3Trigger.h"
43 #include "I3Message_m.h"
44 #include "I3Message.h"
45 #include "I3.h"
46 
47 using namespace std;
48 
49 std::ostream& operator<<(std::ostream& os, const I3TriggerSet &t)
50 {
51  os << endl;
52 
53  I3TriggerSet::iterator it;
54  for (it = t.begin(); it != t.end(); it++) {
55  os << *it << " in " << it->getInsertionTime() << "(" <<
56  (simTime() - it->getInsertionTime()) << ")" << endl;
57  }
58  return os;
59 }
60 
62 
64 {
65  int prevDist = 0, nextDist = 0;
66 
67 
68  if (triggerTable.size() == 0) return 0;
69 
70  /* find the closest identifier to t */
71  /* if no match exists, it gets the next identifier with a bigger key */
72  I3TriggerTable::const_iterator it = triggerTable.lower_bound(t);
73 
74  if (it == triggerTable.end()) {
75  it--; // if at the end, check last
76  }
77 
78  if (it->first == t) return &it->first; // if we found an exact match, no need to bother
79 
80  if (it != triggerTable.begin()) { // if no smaller keys, begin() is the candidate itself
81 
82  /* no exact match, check which is closer: */
83  /* either where the iterator points to (the next biggest) or the previous one. */
84  /* see I3Identifier::distanceTo for distance definition */
85 
86  nextDist = it->first.distanceTo(t);
87  it--;
88  prevDist = it->first.distanceTo(t);
89 
90  // if the next one is closest, put iterator back in place
91  if (nextDist < prevDist) {
92  it++;
93  }
94  }
95 
96  /* now check if they match in the I3 sense (first prefixLength bits) */
97  return (it->first.isMatch(t)) ? &it->first : 0;
98 }
99 
101 {
102 
103  if (t.getIdentifierStack().size() == 0) {
104  /* don't bother */
105  cout << "Warning: Got trigger " << t << " with size 0 in " << thisNode.getIp()<< endl;
106  return;
107  }
108 
109  t.setInsertionTime(simTime());
110 
111  /* insert packet in triggerTable; */
112  /* if it was already there, remove and insert updated copy */
113 
114  triggerTable[t.getIdentifier()].erase(t);
115  triggerTable[t.getIdentifier()].insert(t);
116 
117  updateTriggerTableString();
118 }
119 
121 {
122 
123  //cout << "Removing trigger at " << getId() << endl;
124  //getParentModule()->getParentModule()->bubble("Removing trigger");
125 
126  if (triggerTable.count(t.getIdentifier()) == 0) return;
127 
128  set<I3Trigger> &s = triggerTable[t.getIdentifier()];
129 
130  s.erase(t);
131 
132  if (s.size() == 0) triggerTable.erase(t.getIdentifier());
133 
134  updateTriggerTableString();
135 }
136 
138 {
139  I3IPAddress address;
140  /* re-route message to a client node */
141  address = imsg->getIdentifierStack().peek().getIPAddress();
142  imsg->getIdentifierStack().pop(); // pop ip address
143  sendMessageToUDP(address, imsg);
144 }
145 
147 {
148 
149  I3IdentifierStack &idStack = msg->getIdentifierStack();
150 
151  if (idStack.size() == 0) {
152  /* no identifiers left! drop packet */
153  /* shouldn't happen (how'd it get here anyway?) */
154  numDroppedPackets++;
155  byteDroppedPackets += msg->getBitLength();
156  delete msg;
157  return;
158  }
159 
160  I3SubIdentifier id = idStack.peek();
161 
162  if (id.getType() == I3SubIdentifier::IPAddress) {
163  /* shouldn't happen (how'd they find us anyway?) but just in case */
164  sendToNode(msg);
165  } else {
166 
167  /* if we were asked to reply, send it now */
168  if (msg->getSendReply()) {
169  sendQueryReply(id.getIdentifier(), msg->getSource());
170  }
171 
172  const I3Identifier *i3id = findClosestMatch(id.getIdentifier());
173 
174  /* eliminate top of the stack */
175  idStack.pop();
176 
177  if (!i3id) {
178  /* no matching ids found in this server, re-route to next id */
179  if (idStack.size() == 0) {
180  /* no identifiers left! drop packet */
181  numDroppedPackets++;
182  byteDroppedPackets += msg->getBitLength();
183  cout << "Dropped packet at" << thisNode.getIp()<< " to unknown id " << id.getIdentifier() << endl;
184  delete msg;
185  return;
186  } else {
187  msg->setBitLength(SEND_PACKET_L(msg)); /* stack size changed, recalculate */
188  if (idStack.peek().getType() == I3SubIdentifier::IPAddress) {
189  msg->getMatchedTrigger().clear(); // not trigger, but direct IP match
190  sendToNode(msg);
191  } else {
192  OverlayKey key = idStack.peek().getIdentifier().asOverlayKey();
193  callRoute(key, msg);
194  }
195  }
196 
197  } else {
198  /* some id found, send to all friends */
199  set<I3Trigger> &s = triggerTable[*i3id];
200  set<I3Trigger>::iterator it;
201 
202  for (it = s.begin(); it != s.end(); it++) {
203  I3SendPacketMessage *newMsg;
204  cPacket *dupMsg;
205 
206  newMsg = new I3SendPacketMessage();
207  newMsg->setIdentifierStack(idStack); /* create copy */
208  newMsg->getIdentifierStack().push(it->getIdentifierStack()); /* append our stuff to the top of the stack */
209  dupMsg = check_and_cast<cPacket*>(msg->getEncapsulatedPacket()->dup()); /* dup msg */
210  newMsg->setBitLength(SEND_PACKET_L(newMsg)); /* stack size changed, recalculate */
211  newMsg->encapsulate(dupMsg);
212 
213  I3SubIdentifier &top = newMsg->getIdentifierStack().peek();
214 
215  if (top.getType() == I3SubIdentifier::IPAddress) {
216  newMsg->setMatchedTrigger(*it);
217  sendToNode(newMsg);
218  } else {
219  OverlayKey key = top.getIdentifier().asOverlayKey();
220  callRoute(key, newMsg);
221  }
222  }
223 
224  /* copies sent, erase original */
225  delete msg;
226 
227  }
228  }
229 }
230 
231 void I3::forward(OverlayKey *key, cPacket **msg, NodeHandle *hint)
232 {
233  numForwardedPackets++;
234  numForwardedBytes += (*msg)->getByteLength();
235 
236  BaseApp::forward(key, msg, hint);
237 }
238 
239 
240 void I3::handleUDPMessage(cMessage *msg)
241 {
242  I3Message *i3msg;
243 
244  i3msg = dynamic_cast<I3Message*>(msg);
245 
246  if (!i3msg) {
247  delete msg;
248  return;
249  }
250 
251  OverlayKey key;
252 
253  msg->removeControlInfo();
254  switch (i3msg->getType()) {
255  case INSERT_TRIGGER:
257 
258  imsg = check_and_cast<I3InsertTriggerMessage*>(i3msg);
259  key = imsg->getTrigger().getIdentifier().asOverlayKey();
260  callRoute(key, imsg);
261 
262  /* if ((imsg->getSource().address.d[0] & 0xff) == 56) {
263  cout << "UDP Server " << thisNode.getIp()<< " trigger " << imsg->getTrigger() << " key " << key << endl;
264  }*/
265 
266  break;
267  case REMOVE_TRIGGER:
269 
270  rmsg = check_and_cast<I3RemoveTriggerMessage*>(i3msg);
271  key = rmsg->getTrigger().getIdentifier().asOverlayKey();
272  callRoute(key, rmsg);
273 
274  break;
275  case SEND_PACKET:
276  {
277  I3SendPacketMessage *smsg;
278 
279  smsg = check_and_cast<I3SendPacketMessage*>(i3msg);
280  if (smsg->getIdentifierStack().size() == 0) {
281  /* got an empty identifier stack - nothing to do */
282  delete msg;
283  return;
284  }
285  I3SubIdentifier &subId = smsg->getIdentifierStack().peek();
286  if (subId.getType() == I3SubIdentifier::IPAddress) {
287  /* why didn't they send it directly?! */
288  sendToNode(smsg);
289  } else {
290  key = subId.getIdentifier().asOverlayKey();
291  callRoute(key, smsg);
292  }
293  break;
294  }
295  default:
296  /* should't happen */
297  delete msg;
298  break;
299  }
300 }
301 
302 void I3::sendQueryReply(const I3Identifier &id, const I3IPAddress &add) {
303  I3QueryReplyMessage *pmsg;
304  I3IPAddress myAddress(thisNode.getIp(), par("serverPort"));
305 
306  pmsg = new I3QueryReplyMessage();
307  pmsg->setSource(myAddress);
308  pmsg->setSendingTime(simTime());
309  pmsg->setIdentifier(id);
310  pmsg->setBitLength(QUERY_REPLY_L(pmsg));
311  sendMessageToUDP(add, pmsg);
312 }
313 
314 void I3::deliver(OverlayKey& key, cMessage* msg)
315 {
316  I3Message *i3msg;
317 
318  i3msg = dynamic_cast<I3Message*>(msg);
319  if (!i3msg) {
320  cout << "Delivered non I3 Message!" << endl;
321  delete msg;
322  return;
323  }
324 
325  switch (i3msg->getType()) {
326  case INSERT_TRIGGER:
328 
329  imsg = check_and_cast<I3InsertTriggerMessage*>(i3msg);
330  insertTrigger(imsg->getTrigger());
331 
332  if (imsg->getSendReply()) {
333  sendQueryReply(imsg->getTrigger().getIdentifier(), imsg->getSource());
334  }
335 
336  delete msg;
337  break;
338  case REMOVE_TRIGGER:
340 
341  rmsg = check_and_cast<I3RemoveTriggerMessage*>(i3msg);
342  removeTrigger(rmsg->getTrigger());
343  delete msg;
344  break;
345  case SEND_PACKET:
346  I3SendPacketMessage *smsg;
347 
348  smsg = check_and_cast<I3SendPacketMessage*>(i3msg);
349  sendPacket(smsg);
350 
351  break;
352  default:
353  delete msg;
354  break;
355  }
356 }
357 
358 int I3::numInitStages() const
359 {
360  return MIN_STAGE_APP + 1;
361 }
362 
363 void I3::initializeApp(int stage)
364 {
365  if (stage != MIN_STAGE_APP)
366  return;
367 
368  numDroppedPackets = 0;
369  WATCH(numDroppedPackets);
370  byteDroppedPackets = 0;
371  WATCH(byteDroppedPackets);
372 
373  numForwardedPackets = 0;
374  numForwardedBytes = 0;
375 
376  WATCH(numForwardedPackets);
377  WATCH(numForwardedBytes);
378 
379  triggerTimeToLive = par("triggerTimeToLive");
380  WATCH(triggerTimeToLive);
381 
382  expirationTimer = new cMessage("expiration timer");
383  scheduleAt(simTime() + triggerTimeToLive, expirationTimer);
384 
385  getDisplayString() = "i=i3";
386 
387  thisNode.setPort(par("serverPort"));
388  bindToPort(thisNode.getPort());
389 
390 }
391 
392 void I3::handleTimerEvent(cMessage* msg)
393 {
394  bool updateString = false;
395 
396  if (msg == expirationTimer) {
397  scheduleAt(simTime() + triggerTimeToLive, expirationTimer);
398 
399  for (I3TriggerTable::iterator it = triggerTable.begin(); it != triggerTable.end(); it++) {
400  set<I3Trigger> &triggerSet = it->second;
401  for (set<I3Trigger>::const_iterator sit = triggerSet.begin(); sit != triggerSet.end(); sit++) {
402  //cout << "Trigger " << *sit << " has
403  if (simTime() - sit->getInsertionTime() > triggerTimeToLive) {
404  //if ((bool)par("debugOutput")) {
405  // cout << "Erasing trigger " << *sit << " in " <<
406  // thisNode.getIp()<< ", insertion time is " << sit->getInsertionTime()<< endl;
407  //}
408  triggerSet.erase(sit);
409  updateString = true;
410  }
411  }
412  if (it->second.size() == 0) {
413  triggerTable.erase(it);
414  }
415  }
416  if (updateString) updateTriggerTableString();
417  } else delete msg;
418 }
419 
421 {
422  TriggerTable *table = check_and_cast<TriggerTable*>(getParentModule()->getSubmodule("triggerTable"));
423  table->updateDisplayString();
424 }
425 
427 {
428  return triggerTable;
429 }
430 
432 {
433  recordScalar("I3 Packets dropped", numDroppedPackets);
434  recordScalar("I3 Bytes dropped", byteDroppedPackets);
435 }