OverSim
DHTTestApp.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <IPAddressResolver.h>
25 #include <GlobalNodeListAccess.h>
26 #include <GlobalStatisticsAccess.h>
28 #include <RpcMacros.h>
29 #include "CommonMessages_m.h"
30 
31 #include <GlobalDhtTestMap.h>
32 
33 #include "DHTTestApp.h"
34 
36 
37 using namespace std;
38 
40 {
    // Cancels and frees the three periodic test timers (put, get, mod).
    // NOTE(review): the signature line is missing from this listing. Because
    // this block releases exactly the timers that the next block sets to NULL,
    // it is presumably the DHTTestApp destructor -- confirm against the header.
41  cancelAndDelete(dhttestput_timer);
42  cancelAndDelete(dhttestget_timer);
43  cancelAndDelete(dhttestmod_timer);
44 }
45 
47 {
    // Gives the three timer pointers a defined value (NULL) until the timer
    // messages are actually created during application initialisation.
    // NOTE(review): the signature line is missing from this listing; this is
    // presumably the DHTTestApp constructor -- confirm against the header.
48  dhttestput_timer = NULL;
49  dhttestget_timer = NULL;
50  dhttestmod_timer = NULL;
51 }
52 
54 {
    // Application set-up (the error text below names this function
    // DHTTestApp::initializeApp(); its signature line is missing from this
    // listing). Reads the module parameters, resolves the global helper
    // modules, resets the statistics counters and starts the three test timers.
    //
    // `stage` is the current initialisation stage: everything is done in
    // MIN_STAGE_APP, calls for any other stage return immediately.
    //
    // Throws cRuntimeError when the GlobalDhtTestMap module cannot be obtained.
55  if (stage != MIN_STAGE_APP)
56  return;
57 
58  // fetch parameters
59  debugOutput = par("debugOutput");
60  activeNetwInitPhase = par("activeNetwInitPhase");
61 
    // mean interval between timer firings; the spread is fixed at 10% of it
62  mean = par("testInterval");
63  p2pnsTraffic = par("p2pnsTraffic");
64  deviation = mean / 10;
65 
    // In p2pns mode the "testTtl" parameter is ignored and the TTL is fixed to
    // 3600*24*365 (365 days if the unit is seconds -- TODO confirm the unit).
66  if (p2pnsTraffic) {
67  ttl = 3600*24*365;
68  } else {
69  ttl = par("testTtl");
70  }
71 
72  globalNodeList = GlobalNodeListAccess().get();
73  underlayConfigurator = UnderlayConfiguratorAccess().get();
74  globalStatistics = GlobalStatisticsAccess().get();
75 
    // The test map is looked up by a hard-coded module path; the dynamic_cast
    // yields NULL when the module found there is not a GlobalDhtTestMap.
76  globalDhtTestMap = dynamic_cast<GlobalDhtTestMap*>(simulation.getModuleByPath(
77  "globalObserver.globalFunctions[0].function"));
78 
79  if (globalDhtTestMap == NULL) {
80  throw cRuntimeError("DHTTestApp::initializeApp(): "
81  "GlobalDhtTestMap module not found!");
82  }
83 
84  // statistics
85  numSent = 0;
86  numGetSent = 0;
87  numGetError = 0;
88  numGetSuccess = 0;
89  numPutSent = 0;
90  numPutError = 0;
91  numPutSuccess = 0;
92 
93  //initRpcs();
94  WATCH(numSent);
95  WATCH(numGetSent);
96  WATCH(numGetError);
97  WATCH(numGetSuccess);
98  WATCH(numPutSent);
99  WATCH(numPutError);
100  WATCH(numPutSuccess);
101 
102  nodeIsLeavingSoon = false;
103 
104  // initiate test message transmission
105  dhttestput_timer = new cMessage("dhttest_put_timer");
106  dhttestget_timer = new cMessage("dhttest_get_timer");
107  dhttestmod_timer = new cMessage("dhttest_mod_timer");
108 
    // The first firings are staggered: put around `mean`, get around
    // mean + mean/3, mod around mean + 2*mean/3. With mean <= 0 the timers
    // exist but are never scheduled.
109  if (mean > 0) {
110  scheduleAt(simTime() + truncnormal(mean, deviation),
111  dhttestput_timer);
112  scheduleAt(simTime() + truncnormal(mean + mean / 3,
113  deviation),
114  dhttestget_timer);
115  scheduleAt(simTime() + truncnormal(mean + 2 * mean / 3,
116  deviation),
117  dhttestmod_timer);
118  }
119 }
120 
122  const RpcState& state, simtime_t rtt)
123 {
    // RPC response dispatcher (the log text below names this function
    // DHTTestApp::handleRpcResponse(); the first line of its signature is
    // missing from this listing).
    //
    // DHTputCAPI responses go to handlePutResponse(), DHTgetCAPI responses to
    // handleGetResponse(). Both handlers receive the per-request context taken
    // from state.getContext(), cast to DHTStatsContext*. `rtt` is only used in
    // the log output.
124  RPC_SWITCH_START(msg)
125  RPC_ON_RESPONSE( DHTputCAPI ) {
126  handlePutResponse(_DHTputCAPIResponse,
127  check_and_cast<DHTStatsContext*>(state.getContext()));
128  EV << "[DHTTestApp::handleRpcResponse()]\n"
129  << " DHT Put RPC Response received: id=" << state.getId()
130  << " msg=" << *_DHTputCAPIResponse << " rtt=" << rtt
131  << endl;
132  break;
133  }
134  RPC_ON_RESPONSE(DHTgetCAPI)
135  {
136  handleGetResponse(_DHTgetCAPIResponse,
137  check_and_cast<DHTStatsContext*>(state.getContext()));
138  EV << "[DHTTestApp::handleRpcResponse()]\n"
139  << " DHT Get RPC Response received: id=" << state.getId()
140  << " msg=" << *_DHTgetCAPIResponse << " rtt=" << rtt
141  << endl;
142  break;
143  }
    // NOTE(review): the listing jumps from 143 to 145, so the macro that
    // closes RPC_SWITCH_START was presumably dropped here -- verify against
    // the real file.
145 }
146 
148  DHTStatsContext* context)
149 {
    // Handles the response to a PUT request. Judging by the call in the RPC
    // response dispatcher this is handlePutResponse(msg, context); the first
    // line of the signature is missing from this listing.
    //
    // `context` describes the original request (key, value, request time and
    // whether it was sent during the measurement phase). It is deleted on
    // every path through this function.

    // Remember what was written so that later GET responses can be checked.
    // The initialiser supplies: value, expiry (now + ttl), time of insertion
    // -- presumably matching the DHTEntry field order, TODO confirm.
    // NOTE(review): the entry is recorded before msg->getIsSuccess() is
    // inspected, i.e. also for failed PUTs -- confirm this is intended.
150  DHTEntry entry = {context->value, simTime() + ttl, simTime()};
151 
152  globalDhtTestMap->insertEntry(context->key, entry);
153 
154  if (context->measurementPhase == false) {
155  // don't count response, if the request was not sent
156  // in the measurement phase
157  delete context;
158  return;
159  }
160 
    // Only successful PUTs contribute to the latency statistic.
161  if (msg->getIsSuccess()) {
162  RECORD_STATS(numPutSuccess++);
163  RECORD_STATS(globalStatistics->addStdDev("DHTTestApp: PUT Latency (s)",
164  SIMTIME_DBL(simTime() - context->requestTime)));
165  } else {
166  //cout << "DHTTestApp: PUT failed" << endl;
167  RECORD_STATS(numPutError++);
168  }
169 
170  delete context;
171 }
172 
174  DHTStatsContext* context)
175 {
    // Handles the response to a GET request and classifies it as success or
    // error by comparing it with the global test map. Judging by the call in
    // the RPC response dispatcher this is handleGetResponse(msg, context); the
    // first line of the signature is missing from this listing.
    //
    // Classification, in order:
    //   - request sent outside the measurement phase: ignored
    //   - response reports failure:                   error
    //   - key unknown to the test map:                error
    //   - map entry expired: entry is erased; a non-empty result is an error,
    //     an empty result a success
    //   - map entry still valid: success only if the first result equals the
    //     stored value
    // `context` is deleted on every path.
176  if (context->measurementPhase == false) {
177  // don't count response, if the request was not sent
178  // in the measurement phase
179  delete context;
180  return;
181  }
182 
    // The latency is recorded before the success check, so failed GETs are
    // part of this statistic as well.
183  RECORD_STATS(globalStatistics->addStdDev("DHTTestApp: GET Latency (s)",
184  SIMTIME_DBL(simTime() - context->requestTime)));
185 
186  if (!(msg->getIsSuccess())) {
187  //cout << "DHTTestApp: success == false" << endl;
188  RECORD_STATS(numGetError++);
189  delete context;
190  return;
191  }
192 
193  const DHTEntry* entry = globalDhtTestMap->findEntry(context->key);
194 
195  if (entry == NULL) {
196  //unexpected key
197  RECORD_STATS(numGetError++);
198  //cout << "DHTTestApp: unexpected key" << endl;
199  delete context;
200  return;
201  }
202 
203  if (simTime() > entry->endtime) {
204  //this key doesn't exist anymore in the DHT, delete it in our hashtable
205 
    // `entry` is not dereferenced again after the erase below.
206  globalDhtTestMap->eraseEntry(context->key);
207  delete context;
208 
209  if (msg->getResultArraySize() > 0) {
210  RECORD_STATS(numGetError++);
211  //cout << "DHTTestApp: deleted key still available" << endl;
212  return;
213  } else {
214  RECORD_STATS(numGetSuccess++);
215  //cout << "DHTTestApp: success (1)" << endl;
216  return;
217  }
218  } else {
219  delete context;
    // Only the first returned result is compared with the expected value.
220  if ((msg->getResultArraySize() > 0) &&
221  (msg->getResult(0).getValue() == entry->value)) {
222  RECORD_STATS(numGetSuccess++);
223  //cout << "DHTTestApp: success (2)" << endl;
224  return;
225  } else {
226  RECORD_STATS(numGetError++);
227 #if 0
228  if (msg->getResultArraySize()) {
229  cout << "DHTTestApp: wrong value: " << msg->getResult(0).getValue() << endl;
230  } else {
231  cout << "DHTTestApp: no value" << endl;
232  }
233 #endif
234  return;
235  }
236  }
237 
238 }
239 
241 {
    // Executes one trace command carried in the name of `msg`.
    // NOTE(review): the signature line is missing from this listing; this is
    // presumably the trace-message handler of DHTTestApp -- confirm.
    //
    // Accepted commands:
    //   "PUT <key> <value>"  the key text up to the first whitespace is hashed
    //                        with SHA-1 into the overlay key; the text after
    //                        that whitespace character is stored as the value
    //   "GET <key>"          everything after "GET " is used as the key text
    // Names shorter than 5 characters are dropped silently. Any other command,
    // or a PUT without whitespace after the key, raises cRuntimeError.
    // `msg` is deleted on every non-throwing path.

    // Work on a private, writable copy of the name: the PUT branch cuts it in
    // two by writing a terminator into it.
242  char* cmd = new char[strlen(msg->getName()) + 1];
243  strcpy(cmd, msg->getName());
244 
245  if (strlen(msg->getName()) < 5) {
246  delete[] cmd;
247  delete msg;
248  return;
249  }
250 
251  if (strncmp(cmd, "PUT ", 4) == 0) {
252  // Generate key
253  char* buf = cmd + 4;
254 
    // NOTE(review): isspace() is given a plain char; the C library requires a
    // value representable as unsigned char (or EOF), so bytes >= 0x80 are a
    // problem where char is signed.
    // NOTE(review): `cmd` and `msg` are not released on the throw below, nor
    // on the "Unknown trace command" throw further down.
255  while (!isspace(buf[0])) {
256  if (buf[0] == '\0')
257  throw cRuntimeError("Error parsing PUT command");
258  buf++;
259  }
260 
    // terminate the key text in place; cmd + 4 is now the key string
261  buf[0] = '\0';
262  BinaryValue b(cmd + 4);
263  OverlayKey destKey(OverlayKey::sha1(b));
264 
265  // get value
266  buf++;
267 
268  // build putMsg
269  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
270  dhtPutMsg->setKey(destKey);
271  dhtPutMsg->setValue(buf);
272  dhtPutMsg->setTtl(ttl);
273  dhtPutMsg->setIsModifiable(true);
274  RECORD_STATS(numSent++; numPutSent++);
275  sendInternalRpcCall(TIER1_COMP, dhtPutMsg,
276  new DHTStatsContext(globalStatistics->isMeasuring(),
277  simTime(), destKey, buf));
278  } else if (strncmp(cmd, "GET ", 4) == 0) {
279  // Get key
280  BinaryValue b(cmd + 4);
    // NOTE(review): `key` is used below but not declared in the visible
    // lines; the listing jumps from 280 to 282, so the declaration (presumably
    // the SHA-1 of `b`, as for destKey in the PUT branch) was dropped here.
282 
283  DHTgetCAPICall* dhtGetMsg = new DHTgetCAPICall();
284  dhtGetMsg->setKey(key);
285  RECORD_STATS(numSent++; numGetSent++);
286  sendInternalRpcCall(TIER1_COMP, dhtGetMsg,
287  new DHTStatsContext(globalStatistics->isMeasuring(),
288  simTime(), key));
289  } else {
290  throw cRuntimeError("Unknown trace command; "
291  "only GET and PUT are allowed");
292  }
293 
294  delete[] cmd;
295  delete msg;
296 }
297 
// Reacts to the three self-timers created during initialisation and issues
// the corresponding DHT test request. Timers are told apart by message name:
//
//   "dhttest_put_timer"  PUT of a random value under a new random key
//   "dhttest_get_timer"  GET of a random key taken from the global test map
//   "dhttest_mod_timer"  PUT of a new random value under a key that is
//                        already in the global test map
//
// Each branch first reschedules its timer at now + truncnormal(mean,
// deviation) and only then decides whether to send anything. A message with
// any other name is left untouched (neither handled nor deleted here).
298 void DHTTestApp::handleTimerEvent(cMessage* msg)
299 {
300  if (msg->isName("dhttest_put_timer")) {
301  // schedule next timer event
302  scheduleAt(simTime() + truncnormal(mean, deviation), msg);
303 
304  // do nothing if the network is still in the initialization phase
    // (unless activeNetwInitPhase is set), if the simulation is about to end,
    // or if this node has been told that it will leave soon
305  if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
306  || underlayConfigurator->isSimulationEndingSoon()
307  || nodeIsLeavingSoon)
308  return;
309 
    // p2pns mode: while the global name count is below 4 per node, insert
    // four names at once. Afterwards the reschedule done above is cancelled
    // again; nothing in the visible code schedules the put timer after that,
    // so this branch runs at most once per node past the early returns.
310  if (p2pnsTraffic) {
311  if (globalDhtTestMap->p2pnsNameCount < 4*globalNodeList->getNumNodes()) {
312  for (int i = 0; i < 4; i++) {
313  // create a put test message with random destination key
314  OverlayKey destKey = OverlayKey::random();
315  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
316  dhtPutMsg->setKey(destKey);
317  dhtPutMsg->setValue(generateRandomValue());
318  dhtPutMsg->setTtl(ttl);
319  dhtPutMsg->setIsModifiable(true);
320 
321  RECORD_STATS(numSent++; numPutSent++);
322  sendInternalRpcCall(TIER1_COMP, dhtPutMsg,
323  new DHTStatsContext(globalStatistics->isMeasuring(),
324  simTime(), destKey, dhtPutMsg->getValue()));
325  globalDhtTestMap->p2pnsNameCount++;
326  }
327  }
328  cancelEvent(msg);
329  return;
330  }
331 
332  // create a put test message with random destination key
333  OverlayKey destKey = OverlayKey::random();
334  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
335  dhtPutMsg->setKey(destKey);
336  dhtPutMsg->setValue(generateRandomValue());
337  dhtPutMsg->setTtl(ttl);
338  dhtPutMsg->setIsModifiable(true);
339 
    // The context carries key and value so that the response handler can
    // record them in the global test map.
340  RECORD_STATS(numSent++; numPutSent++);
341  sendInternalRpcCall(TIER1_COMP, dhtPutMsg,
342  new DHTStatsContext(globalStatistics->isMeasuring(),
343  simTime(), destKey, dhtPutMsg->getValue()));
344  } else if (msg->isName("dhttest_get_timer")) {
345  scheduleAt(simTime() + truncnormal(mean, deviation), msg);
346 
347  // do nothing if the network is still in the initialization phase
    // (unless activeNetwInitPhase is set), if the simulation is about to end,
    // or if this node has been told that it will leave soon
348  if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
349  || underlayConfigurator->isSimulationEndingSoon()
350  || nodeIsLeavingSoon) {
351  return;
352  }
353 
    // p2pns mode thins out the GETs: a request is sent only when a uniform
    // draw from [0, 1) does not exceed mean/1800.
354  if (p2pnsTraffic && (uniform(0, 1) > ((double)mean/1800.0))) {
355  return;
356  }
357 
358  const OverlayKey& key = globalDhtTestMap->getRandomKey();
359 
360  if (key.isUnspecified()) {
361  EV << "[DHTTestApp::handleTimerEvent() @ " << thisNode.getIp()
362  << " (" << thisNode.getKey().toString(16) << ")]\n"
363  << " Error: No key available in global DHT test map!"
364  << endl;
365  return;
366  }
367 
368  DHTgetCAPICall* dhtGetMsg = new DHTgetCAPICall();
369  dhtGetMsg->setKey(key);
370  RECORD_STATS(numSent++; numGetSent++);
371 
372  sendInternalRpcCall(TIER1_COMP, dhtGetMsg,
373  new DHTStatsContext(globalStatistics->isMeasuring(),
374  simTime(), key));
375  } else if (msg->isName("dhttest_mod_timer")) {
376  scheduleAt(simTime() + truncnormal(mean, deviation), msg);
377 
378  // do nothing if the network is still in the initialization phase
    // (unless activeNetwInitPhase is set), if the simulation is about to end,
    // or if this node has been told that it will leave soon
379  if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
380  || underlayConfigurator->isSimulationEndingSoon()
381  || nodeIsLeavingSoon) {
382  return;
383  }
384 
    // p2pns mode: modifications start only once the global name count has
    // reached 4 per node. As in the put branch, the reschedule done above is
    // cancelled before returning.
385  if (p2pnsTraffic) {
386  if (globalDhtTestMap->p2pnsNameCount >= 4*globalNodeList->getNumNodes()) {
387  const OverlayKey& key = globalDhtTestMap->getRandomKey();
388 
389  if (key.isUnspecified())
390  return;
391 
392  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
393  dhtPutMsg->setKey(key);
394  dhtPutMsg->setValue(generateRandomValue());
395  dhtPutMsg->setTtl(ttl);
396  dhtPutMsg->setIsModifiable(true);
397 
398  RECORD_STATS(numSent++; numPutSent++);
399  sendInternalRpcCall(TIER1_COMP, dhtPutMsg,
400  new DHTStatsContext(globalStatistics->isMeasuring(),
401  simTime(), key, dhtPutMsg->getValue()));
402  }
403  cancelEvent(msg);
404  return;
405  }
406 
407  const OverlayKey& key = globalDhtTestMap->getRandomKey();
408 
    // unlike the get branch, an empty test map is skipped without logging
409  if (key.isUnspecified())
410  return;
411 #if 0
412  const DHTEntry* entry = globalDhtTestMap->findEntry(key);
413  if (entry->insertiontime + 10.0 > simTime()) {
414  std::cout << "avoided early get" << std::endl;
415  return;
416  }
417 #endif
418 
419  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
420  dhtPutMsg->setKey(key);
421  dhtPutMsg->setValue(generateRandomValue());
422  dhtPutMsg->setTtl(ttl);
423  dhtPutMsg->setIsModifiable(true);
424 
425  RECORD_STATS(numSent++; numPutSent++);
426  sendInternalRpcCall(TIER1_COMP, dhtPutMsg,
427  new DHTStatsContext(globalStatistics->isMeasuring(),
428  simTime(), key, dhtPutMsg->getValue()));
429  }
430 }
431 
432 
434 {
    // Builds a random test value: DHTTESTAPP_VALUE_LEN characters, each
    // produced as 'a' + intuniform(0, 25) (the letters 'a'..'z' when both
    // bounds are inclusive), followed by a terminating NUL, and returns it
    // wrapped in a BinaryValue.
    // NOTE(review): the signature line is missing from this listing; judging
    // by the calls in the timer handler this is generateRandomValue().
435  char value[DHTTESTAPP_VALUE_LEN + 1];
436 
437  for (int i = 0; i < DHTTESTAPP_VALUE_LEN; i++) {
438  value[i] = intuniform(0, 25) + 'a';
439  }
440 
441  value[DHTTESTAPP_VALUE_LEN] = '\0';
442  return BinaryValue(value);
443 }
444 
446 {
    // Raises the flag that the timer handler checks before sending: once it
    // is set, the put/get/mod timers keep firing but issue no more requests.
    // NOTE(review): the signature line is missing from this listing; this is
    // presumably the node-leave notification hook -- confirm.
447  nodeIsLeavingSoon = true;
448 }
449 
451 {
    // Reports this node's counters to the global statistics module as
    // per-second rates over its measured lifetime, plus the GET success ratio.
    // NOTE(review): the signature line is missing from this listing; this is
    // presumably the application's finish hook -- confirm.
452  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
453 
    // Nodes whose measured lifetime is below the minimum contribute nothing.
454  if (time >= GlobalStatistics::MIN_MEASURED) {
455  // record scalar data
456  globalStatistics->addStdDev("DHTTestApp: Sent Total Messages/s",
457  numSent / time);
458  globalStatistics->addStdDev("DHTTestApp: Sent GET Messages/s",
459  numGetSent / time);
460  globalStatistics->addStdDev("DHTTestApp: Failed GET Requests/s",
461  numGetError / time);
462  globalStatistics->addStdDev("DHTTestApp: Successful GET Requests/s",
463  numGetSuccess / time);
464 
465  globalStatistics->addStdDev("DHTTestApp: Sent PUT Messages/s",
466  numPutSent / time);
467  globalStatistics->addStdDev("DHTTestApp: Failed PUT Requests/s",
468  numPutError / time);
469  globalStatistics->addStdDev("DHTTestApp: Successful PUT Requests/s",
470  numPutSuccess / time);
471 
    // The ratio is reported only when at least one GET was classified, which
    // also keeps the division below away from a zero denominator.
472  if ((numGetSuccess + numGetError) > 0) {
473  globalStatistics->addStdDev("DHTTestApp: GET Success Ratio",
474  (double) numGetSuccess
475  / (double) (numGetSuccess + numGetError));
476  }
477  }
478 }
479