24 #include <IPAddressResolver.h>
41 cancelAndDelete(dhttestput_timer);
42 cancelAndDelete(dhttestget_timer);
43 cancelAndDelete(dhttestmod_timer);
48 dhttestput_timer = NULL;
49 dhttestget_timer = NULL;
50 dhttestmod_timer = NULL;
59 debugOutput = par(
"debugOutput");
60 activeNetwInitPhase = par(
"activeNetwInitPhase");
62 mean = par(
"testInterval");
63 p2pnsTraffic = par(
"p2pnsTraffic");
64 deviation = mean / 10;
76 globalDhtTestMap =
dynamic_cast<GlobalDhtTestMap*
>(simulation.getModuleByPath(
77 "globalObserver.globalFunctions[0].function"));
79 if (globalDhtTestMap == NULL) {
80 throw cRuntimeError(
"DHTTestApp::initializeApp(): "
81 "GlobalDhtTestMap module not found!");
100 WATCH(numPutSuccess);
102 nodeIsLeavingSoon =
false;
105 dhttestput_timer =
new cMessage(
"dhttest_put_timer");
106 dhttestget_timer =
new cMessage(
"dhttest_get_timer");
107 dhttestmod_timer =
new cMessage(
"dhttest_mod_timer");
110 scheduleAt(simTime() + truncnormal(mean, deviation),
112 scheduleAt(simTime() + truncnormal(mean + mean / 3,
115 scheduleAt(simTime() + truncnormal(mean + 2 * mean / 3,
122 const RpcState& state, simtime_t rtt)
126 handlePutResponse(_DHTputCAPIResponse,
127 check_and_cast<DHTStatsContext*>(state.
getContext()));
128 EV <<
"[DHTTestApp::handleRpcResponse()]\n"
129 <<
" DHT Put RPC Response received: id=" << state.
getId()
130 <<
" msg=" << *_DHTputCAPIResponse <<
" rtt=" << rtt
136 handleGetResponse(_DHTgetCAPIResponse,
137 check_and_cast<DHTStatsContext*>(state.
getContext()));
138 EV <<
"[DHTTestApp::handleRpcResponse()]\n"
139 <<
" DHT Get RPC Response received: id=" << state.
getId()
140 <<
" msg=" << *_DHTgetCAPIResponse <<
" rtt=" << rtt
150 DHTEntry entry = {context->
value, simTime() + ttl, simTime()};
152 globalDhtTestMap->insertEntry(context->
key, entry);
163 RECORD_STATS(globalStatistics->addStdDev(
"DHTTestApp: PUT Latency (s)",
183 RECORD_STATS(globalStatistics->addStdDev(
"DHTTestApp: GET Latency (s)",
193 const DHTEntry* entry = globalDhtTestMap->findEntry(context->
key);
203 if (simTime() > entry->
endtime) {
206 globalDhtTestMap->eraseEntry(context->
key);
231 cout <<
"DHTTestApp: no value" << endl;
242 char* cmd =
new char[strlen(msg->getName()) + 1];
243 strcpy(cmd, msg->getName());
245 if (strlen(msg->getName()) < 5) {
251 if (strncmp(cmd,
"PUT ", 4) == 0) {
255 while (!isspace(buf[0])) {
257 throw cRuntimeError(
"Error parsing PUT command");
270 dhtPutMsg->
setKey(destKey);
277 simTime(), destKey, buf));
278 }
else if (strncmp(cmd,
"GET ", 4) == 0) {
290 throw cRuntimeError(
"Unknown trace command; "
291 "only GET and PUT are allowed");
300 if (msg->isName(
"dhttest_put_timer")) {
302 scheduleAt(simTime() + truncnormal(mean, deviation), msg);
305 if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
306 || underlayConfigurator->isSimulationEndingSoon()
307 || nodeIsLeavingSoon)
311 if (globalDhtTestMap->p2pnsNameCount < 4*globalNodeList->getNumNodes()) {
312 for (
int i = 0; i < 4; i++) {
316 dhtPutMsg->
setKey(destKey);
317 dhtPutMsg->
setValue(generateRandomValue());
324 simTime(), destKey, dhtPutMsg->
getValue()));
325 globalDhtTestMap->p2pnsNameCount++;
335 dhtPutMsg->
setKey(destKey);
336 dhtPutMsg->
setValue(generateRandomValue());
343 simTime(), destKey, dhtPutMsg->
getValue()));
344 }
else if (msg->isName(
"dhttest_get_timer")) {
345 scheduleAt(simTime() + truncnormal(mean, deviation), msg);
348 if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
349 || underlayConfigurator->isSimulationEndingSoon()
350 || nodeIsLeavingSoon) {
354 if (p2pnsTraffic && (uniform(0, 1) > ((
double)mean/1800.0))) {
358 const OverlayKey& key = globalDhtTestMap->getRandomKey();
361 EV <<
"[DHTTestApp::handleTimerEvent() @ " << thisNode.getIp()
362 <<
" (" << thisNode.getKey().
toString(16) <<
")]\n"
363 <<
" Error: No key available in global DHT test map!"
375 }
else if (msg->isName(
"dhttest_mod_timer")) {
376 scheduleAt(simTime() + truncnormal(mean, deviation), msg);
379 if (((!activeNetwInitPhase) && (underlayConfigurator->isInInitPhase()))
380 || underlayConfigurator->isSimulationEndingSoon()
381 || nodeIsLeavingSoon) {
386 if (globalDhtTestMap->p2pnsNameCount >= 4*globalNodeList->getNumNodes()) {
387 const OverlayKey& key = globalDhtTestMap->getRandomKey();
394 dhtPutMsg->
setValue(generateRandomValue());
401 simTime(), key, dhtPutMsg->
getValue()));
407 const OverlayKey& key = globalDhtTestMap->getRandomKey();
412 const DHTEntry* entry = globalDhtTestMap->findEntry(key);
414 std::cout <<
"avoided early get" << std::endl;
421 dhtPutMsg->
setValue(generateRandomValue());
428 simTime(), key, dhtPutMsg->
getValue()));
435 char value[DHTTESTAPP_VALUE_LEN + 1];
437 for (
int i = 0; i < DHTTESTAPP_VALUE_LEN; i++) {
438 value[i] = intuniform(0, 25) +
'a';
441 value[DHTTESTAPP_VALUE_LEN] =
'\0';
447 nodeIsLeavingSoon =
true;
452 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
456 globalStatistics->addStdDev(
"DHTTestApp: Sent Total Messages/s",
458 globalStatistics->addStdDev(
"DHTTestApp: Sent GET Messages/s",
460 globalStatistics->addStdDev(
"DHTTestApp: Failed GET Requests/s",
462 globalStatistics->addStdDev(
"DHTTestApp: Successful GET Requests/s",
463 numGetSuccess / time);
465 globalStatistics->addStdDev(
"DHTTestApp: Sent PUT Messages/s",
467 globalStatistics->addStdDev(
"DHTTestApp: Failed PUT Requests/s",
469 globalStatistics->addStdDev(
"DHTTestApp: Successful PUT Requests/s",
470 numPutSuccess / time);
472 if ((numGetSuccess + numGetError) > 0) {
473 globalStatistics->addStdDev(
"DHTTestApp: GET Success Ratio",
474 (
double) numGetSuccess
475 / (
double) (numGetSuccess + numGetError));