OverSim
CBR-DHT.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <IPAddressResolver.h>
25 
26 #include "CBR-DHT.h"
27 
28 #include <RpcMacros.h>
29 #include <BaseRpc.h>
30 #include <GlobalStatistics.h>
32 #include <NeighborCache.h>
33 
35 
36 using namespace std;
37 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 38). The body only resets dataStorage, so this is
// presumably the CBRDHT constructor - confirm against the original file.
39 {
// Start without a storage module; initializeApp() assigns the real pointer.
40  dataStorage = NULL;
41 }
42 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 43). The body releases everything the maps hold,
// so this is presumably the CBRDHT destructor - confirm against the original.
44 {
45  std::map<unsigned int, BaseCallMessage*>::iterator it;
46 
// Delete every call message still stored in rpcIdMap.
47  for (it = rpcIdMap.begin(); it != rpcIdMap.end(); it++) {
48  delete it->second;
49  it->second = NULL;
50  }
51 
// Delete the call messages of GET operations still recorded in getMap.
52  std::map<int, GetMapEntry>::iterator it2;
53  for (it2 = getMap.begin(); it2 != getMap.end(); it2++) {
54  //cancelAndDelete(it2->second.callMsg);
55  delete it2->second.callMsg;
56  it2->second.callMsg = NULL;
57  }
58 
// Delete the call messages of PUT operations still recorded in putMap.
59  std::map<int, PutMapEntry>::iterator it3;
60 
61  for (it3 = putMap.begin(); it3 != putMap.end(); it3++) {
62  //if (it3->second.callMsg != NULL) {
63  // cancelAndDelete(it3->second.callMsg);
64  //}
65  delete it3->second.callMsg;
66  it3->second.callMsg = NULL;
67  }
68 
// All owned pointers are gone; drop the (now NULL) entries themselves.
69  rpcIdMap.clear();
70  getMap.clear();
71  putMap.clear();
72 
// dataStorage stays NULL if initializeApp() never assigned it, hence the guard.
73  if (dataStorage != NULL) {
74  dataStorage->clear();
75  }
76 }
77 
78 void CBRDHT::initializeApp(int stage)
79 {
80  if (stage != MIN_STAGE_APP)
81  return;
82 
83  dataStorage = check_and_cast<DHTDataStorage*>
84  (getParentModule()->getSubmodule("dhtDataStorage"));
85 
86  coordBasedRouting = CoordBasedRoutingAccess().get();
87  neighborCache = (NeighborCache*)getParentModule()
88  ->getParentModule()->getSubmodule("neighborCache");
89 
90  numReplica = par("numReplica");
91  numReplicaTeams = par("numReplicaTeams");
92 
93  if (numReplica > numReplicaTeams * overlay->getMaxNumSiblings()) {
94  throw cRuntimeError("DHT::initialize(): numReplica bigger than what this "
95  "overlay can handle (%d)", numReplicaTeams*overlay->getMaxNumSiblings());
96  }
97 
98  if (numReplica < numReplicaTeams) {
99  throw cRuntimeError("DHT::initialize(): numReplica (%d) smaller than numReplicaTeam (%d)",
100  numReplica, numReplicaTeams);
101  }
102 
103  maintenanceMessages = 0;
104  normalMessages = 0;
105  numBytesMaintenance = 0;
106  numBytesNormal = 0;
107  WATCH(maintenanceMessages);
108  WATCH(normalMessages);
109  WATCH(numBytesNormal);
110  WATCH(numBytesMaintenance);
111  WATCH_MAP(rpcIdMap);
112 }
113 
114 void CBRDHT::handleTimerEvent(cMessage* msg)
115 {
116  DHTTtlTimer* msg_timer = dynamic_cast<DHTTtlTimer*> (msg);
117 
118  if (msg_timer) {
119  EV << "[DHT::handleTimerEvent()]\n"
120  << " received timer ttl, key: "
121  << msg_timer->getKey().toString(16)
122  << "\n (overlay->getThisNode().key = "
123  << overlay->getThisNode().getKey().toString(16) << ")"
124  << endl;
125 
126  dataStorage->removeData(msg_timer->getKey(), msg_timer->getKind(),
127  msg_timer->getId());
128  //delete msg_timer;
129  }
130  /*DHTTtlTimer* msg_timer;
131 
132  if (msg->isName("ttl_timer")) {
133  msg_timer = check_and_cast<DHTTtlTimer*> (msg);
134 
135  EV << "[DHT::handleTimerEvent()]\n"
136  << " received timer ttl, key: "
137  << msg_timer->getKey().toString(16)
138  << "\n (overlay->getThisNode().key = "
139  << overlay->getThisNode().getKey().toString(16) << ")"
140  << endl;
141 
142  dataStorage->removeData(msg_timer->getKey(), msg_timer->getKind(),
143  msg_timer->getId());
144  delete msg_timer;
145  }*/
146 }
147 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 148). The body dispatches on `msg` and returns
// RPC_HANDLED, so this is presumably CBRDHT::handleRpcCall - confirm.
149 {
150  // delegate messages
151  RPC_SWITCH_START( msg )
152  // RPC_DELEGATE( <messageName>[Call|Response], <methodToCall> )
// Requests exchanged between DHT nodes.
153  RPC_DELEGATE( DHTPut, handlePutRequest );
154  RPC_DELEGATE( CBRDHTGet, handleGetRequest );
155  RPC_DELEGATE( DHTputCAPI, handlePutCAPIRequest ); //requests coming from an upper tier
156  RPC_DELEGATE( DHTgetCAPI, handleGetCAPIRequest );
157  RPC_DELEGATE( DHTdump, handleDumpDhtRequest );
158  RPC_SWITCH_END( )
159 
// Value comes from the RPC switch macros (their definition is not visible here).
160  return RPC_HANDLED;
161 }
162 
// Dispatches an incoming RPC response to the matching handler and logs it.
//
// msg:     the response message
// context: not used in the visible code
// rpcId:   id of the RPC, forwarded to the put/get response handlers
// rtt:     round-trip time, only written to the log
//
// NOTE(review): listing lines 183 and 191 are missing from this listing
// (the numbering skips them). Presumably they hold the opening of the
// Lookup response case and the closing RPC_SWITCH_END - confirm against
// the original file.
163 void CBRDHT::handleRpcResponse(BaseResponseMessage* msg, cPolymorphic* context,
164  int rpcId, simtime_t rtt)
165 {
166  RPC_SWITCH_START(msg)
// Response to a DHTPut call sent by this node.
167  RPC_ON_RESPONSE(DHTPut){
168  handlePutResponse(_DHTPutResponse, rpcId);
169  EV << "[DHT::handleRpcResponse()]\n"
170  << " DHT Put RPC Response received: id=" << rpcId
171  << " msg=" << *_DHTPutResponse << " rtt=" << rtt
172  << endl;
173  break;
174  }
// Response to a CBRDHTGet call sent by this node.
175  RPC_ON_RESPONSE(CBRDHTGet) {
176  handleGetResponse(_CBRDHTGetResponse, rpcId);
177  EV << "[DHT::handleRpcResponse()]\n"
178  << " DHT Get RPC Response received: id=" << rpcId
179  << " msg=" << *_CBRDHTGetResponse << " rtt=" << rtt
180  << endl;
181  break;
182  }
// (listing line 183 missing here - see note above)
184  handleLookupResponse(_LookupResponse);
185  EV << "[DHT::handleRpcResponse()]\n"
186  << " Replica Set RPC Response received: id=" << rpcId
187  << " msg=" << *_LookupResponse << " rtt=" << rtt
188  << endl;
189  break;
190  }
// (listing line 191 missing here - see note above)
192 }
193 
// NOTE(review): the first line of this signature is missing from this listing
// (numbering skips 194). The body reacts to timed-out DHTPut / CBRDHTGet
// calls, so this is presumably CBRDHT::handleRpcTimeout - confirm.
//
// Visible behaviour:
//  - DHTPut timeout: counts a failure for the PUT identified by rpcId and,
//    once at least half of the sent requests have failed, answers the
//    original CAPI call with isSuccess == false.
//  - CBRDHTGet timeout: retries with a fallback replica, then with the next
//    replica team, and finally reports failure to the original CAPI call.
195  cPolymorphic* context, int rpcId,
196  const OverlayKey& destKey)
197 {
198  RPC_SWITCH_START(msg)
199  RPC_ON_CALL(DHTPut){
200  EV << "[DHT::handleRpcResponse()]\n"
201  << " DHTPut Timeout"
202  << endl;
203 
204  std::map<int, PutMapEntry>::iterator it2 =
205  putMap.find(rpcId);
206 
207  if (it2 == putMap.end()) //unknown request
208  return;
209 
210  it2->second.numFailed++;
211 
// Give up as soon as 50% or more of the sent PUT requests have failed.
212  if (it2->second.numFailed / (double)it2->second.numSent >= 0.5) {
213  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
214  capiPutRespMsg->setIsSuccess(false);
215  sendRpcResponse(it2->second.callMsg, capiPutRespMsg);
// The call message was handed to sendRpcResponse; clear the pointer before
// the entry is erased.
216  it2->second.callMsg = NULL;
217  putMap.erase(rpcId);
218  }
219  break;
220  }
221  RPC_ON_CALL(CBRDHTGet) {
222  EV << "[DHT::handleRpcResponse()]\n"
223  << " DHTGet Timeout"
224  << endl;
225 
226  std::map<int, GetMapEntry>::iterator it2 =
227  getMap.find(rpcId);
228 
229  if (it2 == getMap.end()) //unknown request
230  return;
231 
232  if (it2->second.replica.size() > 0) {
233  // Received empty value, try fallback replica
234  NodeHandle fallbackReplica = it2->second.replica.back();
235  CBRDHTGetCall* dhtRecall = new CBRDHTGetCall();
236  dhtRecall->setOriginalKey(_CBRDHTGetCall->getOriginalKey());
237  dhtRecall->setKey(_CBRDHTGetCall->getKey());
238  dhtRecall->setIsHash(false);
239  dhtRecall->setBitLength(GETCALL_L(dhtRecall));
240  RECORD_STATS(normalMessages++;
241  numBytesNormal += dhtRecall->getByteLength());
// The retry reuses the nonce of the original CAPI call as its RPC id.
242  sendRouteRpcCall(TIER1_COMP, fallbackReplica, dhtRecall,
243  NULL, DEFAULT_ROUTING, -1, 0,
244  it2->second.callMsg->getNonce());
245  it2->second.numSent++;
246  it2->second.replica.pop_back();
247  return;
248  } else if (it2->second.teamNumber < (numReplicaTeams - 1)) {
249  // No more fallback replica in this team, try next one
250  it2->second.teamNumber++;
251  handleGetCAPIRequest(it2->second.callMsg, it2->second.teamNumber);
252  return;
253  } else {
254  // No more replica, no more teams, send success == false to Tier 2 :(
255  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
256  //capiGetRespMsg->setKey(_CBRDHTGetCall->getOriginalKey());
257  //capiGetRespMsg->setValue(BinaryValue::UNSPECIFIED_VALUE);
258  DhtDumpEntry result;
259  result.setKey(_CBRDHTGetCall->getKey());
// NOTE(review): listing line 260 is missing here (numbering skips it);
// presumably it sets the value of `result` - confirm against the original.
261  capiGetRespMsg->setResultArraySize(1);
262  capiGetRespMsg->setResult(0, result);
263  capiGetRespMsg->setIsSuccess(false);
264  sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
265  getMap.erase(rpcId);
266  }
267  break;
268  }
269  RPC_SWITCH_END( )
270 }
271 
272 void CBRDHT::handleUpperMessage(cMessage* msg)
273 {
274  error("DHT::handleUpperMessage(): Received message with unknown type!");
275 
276  delete msg;
277 }
278 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 279). handleRpcCall delegates DHTPut calls to
// handlePutRequest and the body works on `dhtMsg`, so this is presumably
// CBRDHT::handlePutRequest(DHTPutCall* dhtMsg) - confirm.
//
// Visible behaviour: stores (or, for an empty value, only removes) the data
// item carried by dhtMsg in the local data storage and answers with a
// DHTPutResponse. The request is refused when the stored item is not
// modifiable and the sender check below fails.
280 {
281  std::string tempString = "PUT_REQUEST received: "
282  + std::string(dhtMsg->getKey().toString(16));
283  getParentModule()->getParentModule()->bubble(tempString.c_str());
284 
285  if (!(dataStorage->isModifiable(dhtMsg->getKey(), dhtMsg->getKind(),
286  dhtMsg->getId()))) {
287  //check if the put request came from the right node
288  NodeHandle sourceNode = dataStorage->getSourceNode(dhtMsg->getKey(),
289  dhtMsg->getKind(), dhtMsg->getId());
// Refuse if both source nodes are known and differ, or if this is a
// maintenance put whose owner node equals the stored source node.
290  if (((!sourceNode.isUnspecified())
291  && (!dhtMsg->getSrcNode().isUnspecified()) && (sourceNode
292  != dhtMsg->getSrcNode())) || ((dhtMsg->getMaintenance())
293  && (dhtMsg->getOwnerNode() == sourceNode))) {
294  // TODO: set owner
295  DHTPutResponse* responseMsg = new DHTPutResponse();
296  responseMsg->setSuccess(false);
297  responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
298  RECORD_STATS(normalMessages++;
299  numBytesNormal += responseMsg->getByteLength());
300  sendRpcResponse(dhtMsg, responseMsg);
301  return;
302  }
303 
304  }
305 
306  // remove data item from local data storage
307  //cancelAndDelete(dataStorage->getTtlMessage(dhtMsg->getKey()));
308  //dataStorage->removeData(dhtMsg->getKey());
309  dataStorage->removeData(dhtMsg->getKey(), dhtMsg->getKind(),
310  dhtMsg->getId());
// An empty value means "delete only": nothing is stored and no timer is set.
311  if (dhtMsg->getValue().size() > 0) {
312  // add ttl timer
313  DHTTtlTimer *timerMsg = new DHTTtlTimer("ttl_timer");
// NOTE(review): only the key is copied into the timer, yet handleTimerEvent()
// passes the timer's getKind()/getId() to removeData(). Verify that the
// expiry can still find an item stored under dhtMsg's kind and id.
314  timerMsg->setKey(dhtMsg->getKey());
315  scheduleAt(simTime() + dhtMsg->getTtl(), timerMsg);
316  // storage data item in local data storage
// `err` is filled in by isSiblingFor() but not inspected here.
317  bool err;
318  dataStorage->addData(dhtMsg->getKey(), dhtMsg->getKind(),
319  dhtMsg->getId(), dhtMsg->getValue(), timerMsg,
320  dhtMsg->getIsModifiable(), dhtMsg->getSrcNode(),
321  overlay->isSiblingFor(overlay->getThisNode(),
322  dhtMsg->getKey(),
323  1, &err));
324  }
325 
326  // send back
327  DHTPutResponse* responseMsg = new DHTPutResponse();
328 
329  responseMsg->setSuccess(true);
330  responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
331  RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
332 
333  sendRpcResponse(dhtMsg, responseMsg);
334 }
335 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 336). handleRpcCall delegates CBRDHTGet calls to
// handleGetRequest and the body works on `dhtMsg`, so this is presumably
// CBRDHT::handleGetRequest(CBRDHTGetCall* dhtMsg) - confirm.
//
// Visible behaviour: looks up the requested key in the local data storage and
// answers with a CBRDHTGetResponse carrying exactly one result entry.
337 {
338  std::string tempString = "GET_REQUEST received: "
339  + std::string(dhtMsg->getKey().toString(16));
340 
341  getParentModule()->getParentModule()->bubble(tempString.c_str());
342 
343  BinaryValue storedValue;
// NOTE(review): the lookup uses the literal arguments 1, 1 rather than
// dhtMsg->getKind()/getId(); the meaning of these arguments is not visible
// here - confirm this is intended.
344  DhtDataEntry* dataEntry = dataStorage->getDataEntry(dhtMsg->getKey(), 1, 1);
345  if (dataEntry) {
// The entry is fetched a second time here instead of reusing dataEntry.
346  storedValue = dataStorage->getDataEntry(dhtMsg->getKey(), 1, 1)->value;
347  } else {
348  storedValue = BinaryValue::UNSPECIFIED_VALUE;
349  }
350 
351  // send back
352  CBRDHTGetResponse* responseMsg = new CBRDHTGetResponse();
353 
354  responseMsg->setKey(dhtMsg->getKey());
355  responseMsg->setOriginalKey(dhtMsg->getOriginalKey());
356  responseMsg->setIsHash(false);
357  if (storedValue.isUnspecified()) {
358  //responseMsg->setValue(BinaryValue::UNSPECIFIED_VALUE);
359  DhtDumpEntry result;
360  result.setKey(dhtMsg->getKey());
// NOTE(review): listing line 361 is missing here (numbering skips it);
// presumably it sets the value of `result` - confirm against the original.
362  responseMsg->setResultArraySize(1);
363  responseMsg->setResult(0, result);
364  } else {
365  //responseMsg->setValue(storedValue);
366  DhtDumpEntry result;
367  result.setKey(dhtMsg->getKey());
368  result.setValue(storedValue);
369  responseMsg->setResultArraySize(1);
370  responseMsg->setResult(0, result);
371  }
// Records the request nonce with a NULL call message; handleLookupResponse()
// ignores rpcIdMap entries whose message is NULL.
372  rpcIdMap.insert(make_pair(dhtMsg->getNonce(), (BaseCallMessage*)NULL));
373 
374  responseMsg->setBitLength(GETRESPONSE_L(responseMsg));
375  RECORD_STATS(normalMessages++;
376  numBytesNormal += responseMsg->getByteLength());
377  sendRpcResponse(dhtMsg, responseMsg);
378 }
379 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 380). handleRpcCall delegates DHTputCAPI calls to
// handlePutCAPIRequest and the body works on `capiPutMsg`, so this is
// presumably CBRDHT::handlePutCAPIRequest(DHTputCAPICall* capiPutMsg) - confirm.
//
// Visible behaviour: for a PUT from the upper tier, starts one sibling lookup
// per replica team. Team 0 uses the original key; team i (i >= 1) uses the
// original key hashed i times with SHA-1. Each lookup nonce is recorded in
// rpcIdMap together with the message handleLookupResponse() continues with.
381 {
382  // provide copies of this message for other teams
383  for (int i = 1; i < numReplicaTeams; i++) {
384  DHTPutCall* teamCopyPutMsg = new DHTPutCall; //TODO memleak
385 
386  // transfer attributes of original DHTputCAPICall to DHTPutCall for teams
387  teamCopyPutMsg->setValue(capiPutMsg->getValue());
388  teamCopyPutMsg->setTtl(capiPutMsg->getTtl());
389  teamCopyPutMsg->setIsModifiable(capiPutMsg->getIsModifiable());
390  teamCopyPutMsg->setKind(capiPutMsg->getKind());
391  teamCopyPutMsg->setId(capiPutMsg->getId());
392 
393  // control info needs to be copied by value
394  OverlayCtrlInfo controlInfo = *(check_and_cast<OverlayCtrlInfo*>(capiPutMsg->getControlInfo()));
395  OverlayCtrlInfo* controlInfoCopy = new OverlayCtrlInfo;
396  *controlInfoCopy = controlInfo;
397  teamCopyPutMsg->setControlInfo(controlInfoCopy);
398 
399  // multiple SHA1 hashing of original key
400  OverlayKey destKey = capiPutMsg->getKey();
// Apply SHA-1 i times to the hex string form of the key.
401  for (int j = 0; j < i; j++) {
402  destKey = OverlayKey::sha1(BinaryValue(destKey.toString(16).c_str()));
403  }
404  teamCopyPutMsg->setKey(destKey);
405 
406  // rest is analog to handlePutCAPIRequest, but for DHTPutCall
407  LookupCall* replicaMsg = new LookupCall();
408  replicaMsg->setKey(teamCopyPutMsg->getKey());
// Integer division: each team is asked for numReplica / numReplicaTeams siblings.
409  replicaMsg->setNumSiblings(floor(numReplica / numReplicaTeams));
410  int nonce = sendInternalRpcCall(OVERLAY_COMP, replicaMsg);
411  rpcIdMap.insert(make_pair(nonce, teamCopyPutMsg));
412  }
413 
414  //asks the replica list
415  LookupCall* replicaMsg = new LookupCall();
416  replicaMsg->setKey(capiPutMsg->getKey());
417  replicaMsg->setNumSiblings(floor(numReplica / numReplicaTeams));
418  int nonce = sendInternalRpcCall(OVERLAY_COMP, replicaMsg);
419  rpcIdMap.insert(make_pair(nonce, capiPutMsg));
420 }
421 
422 void CBRDHT::handleGetCAPIRequest(DHTgetCAPICall* capiGetMsg, int teamnum) {
423  // Extended multi team version, default: teamnum = 0
424  if (teamnum >= numReplicaTeams)
425  return;
426 
427  OverlayKey originalKey = capiGetMsg->getKey();
428  std::vector<OverlayKey> possibleKeys;
429 
430  assert(!originalKey.isUnspecified());
431  possibleKeys.push_back(originalKey);
432 
433  for (int i = 1; i < numReplicaTeams; i++) {
434  // multiple SHA1 hashing of original key
435  OverlayKey keyHash = originalKey;
436  for (int j = 0; j < i; j++) {
437  keyHash = OverlayKey::sha1(BinaryValue(keyHash.toString(16).c_str()));
438  }
439  assert(!keyHash.isUnspecified());
440  possibleKeys.push_back(keyHash);
441  }
442 
443  // Order possible keys by euclidian distance to this node
444  std::vector<OverlayKey> orderedKeys;
445  OverlayKey compareKey = overlay->getThisNode().getKey();
446 
447  while (possibleKeys.size() > 0) {
448  OverlayKey bestKey = possibleKeys[0];
449  int bestpos = 0;
450 
451  // TODO: i = 1?
452  for (uint i = 0; i < possibleKeys.size(); i++) {
453  //std::cout << neighborCache->getOwnEuclidianDistanceToKey(possibleKeys[i]) << std::endl;
454  if (coordBasedRouting
455  ->getEuclidianDistanceByKeyAndCoords(possibleKeys[i],
456  ((const Nps&)neighborCache->getNcsAccess()).getOwnCoordinates(), //TODO
457  overlay->getBitsPerDigit()) <
458  coordBasedRouting
459  ->getEuclidianDistanceByKeyAndCoords(bestKey,
460  ((const Nps&)neighborCache->getNcsAccess()).getOwnCoordinates(), //TODO
461  overlay->getBitsPerDigit())) {
462  bestKey = possibleKeys[i];
463  bestpos = i;
464  }
465  }
466  //std::cout << neighborCache->getOwnEuclidianDistanceToKey(bestKey) << "\n" << std::endl;
467  orderedKeys.push_back(bestKey);
468  possibleKeys.erase(possibleKeys.begin()+bestpos);
469  }
470 
471  /*
472  std::cout << "NodeID: " << overlay->getThisNode().getKey().toString(16) << std::endl;
473  std::cout << "Original Key: " << originalKey.toString(16) << std::endl;
474  for (int i = 0; i < orderedKeys.size(); i++) {
475  std::cout << "Sorted Key " << i << ": " << orderedKeys[i].toString(16) << " (" << overlay->getOwnEuclidianDistanceToKey(orderedKeys[i]) << ")" << std::endl;
476  }
477  */
478 
479  OverlayKey searchKey = orderedKeys[teamnum];
480 
481 #define DIRECT_ROUTE_GET
482 #ifndef DIRECT_ROUTE_GET
483 
484  LookupCall* replicaMsg = new LookupCall();
485  replicaMsg->setKey(searchKey);
486  replicaMsg->setNumSiblings(floor(numReplica / numReplicaTeams));
487  int nonce = sendInternalRpcCall(OVERLAY_COMP, replicaMsg);
488  rpcIdMap.insert(make_pair(nonce, capiGetMsg));
489  lastGetCall = SIMTIME_DBL(simTime());
490 
491 #else
492 
493  GetMapEntry mapEntry;
494  mapEntry.numSent = 0;
495 
496  // Multi team version: Already mapEntry from earlier team?
497  std::map<int, GetMapEntry>::iterator it2 =
498  getMap.find(capiGetMsg->getNonce());
499 
500  if (it2 != getMap.end()) {
501  mapEntry = it2->second;
502  } else {
503  mapEntry.teamNumber = 0;
504  }
505  mapEntry.numAvailableReplica = 1;//lookupMsg->getSiblingsArraySize();
506  mapEntry.numResponses = 0;
507  mapEntry.callMsg = capiGetMsg;
508  mapEntry.hashVector = NULL;
509  mapEntry.replica.clear();
510  for (unsigned int i = 0; i < 1/*lookupMsg->getSiblingsArraySize()*/; i++) {
511  // Simplified GET Request: Just one real request, rest is for fallback
512  if (i == 0) {
513  CBRDHTGetCall* dhtMsg = new CBRDHTGetCall();
514 
515  dhtMsg->setOriginalKey(capiGetMsg->getKey());
516  dhtMsg->setKey(searchKey);//lookupMsg->getKey());
517 
518  dhtMsg->setIsHash(false);
519  dhtMsg->setKind(capiGetMsg->getKind());
520  dhtMsg->setId(capiGetMsg->getId());
521  dhtMsg->setBitLength(GETCALL_L(dhtMsg));
522  RECORD_STATS(normalMessages++;
523  numBytesNormal += dhtMsg->getByteLength());
524 
525  /*int nonce = */sendRouteRpcCall(TIER1_COMP, searchKey, dhtMsg,
526  NULL, DEFAULT_ROUTING, -1, 0,
527  capiGetMsg->getNonce());
528 
529  //rpcIdMap.insert(make_pair(nonce, capiGetMsg));
530  //sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i), dhtMsg,
531  // NULL, DEFAULT_ROUTING, -1, 0,
532  // capiGetMsg->getNonce());
533  mapEntry.numSent++;
534  } else {
535  //We don't send, we just store the remaining keys as fallback
536  //mapEntry.replica.push_back(lookupMsg->getSiblings(i));
537  }
538  }
539  /*
540  std::cout << "New replica: " << std::endl;
541  for (int i = 0; i < mapEntry.replica.size(); i++) {
542  std::cout << mapEntry.replica[i] << std::endl;
543  }
544  std::cout << "*************************" << std::endl;
545  */
546  if (it2 != getMap.end())
547  getMap.erase(it2);
548  getMap.insert(make_pair(capiGetMsg->getNonce(), mapEntry));
549 #endif
550 }
551 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 552). handleRpcCall delegates DHTdump calls to
// handleDumpDhtRequest and the body answers `call`, so this is presumably
// CBRDHT::handleDumpDhtRequest(DHTdumpCall* call) - confirm.
//
// Visible behaviour: copies every record returned by dataStorage->dumpDht()
// into a DHTdumpResponse and sends it back as the answer to `call`.
553 {
554  DHTdumpResponse* response = new DHTdumpResponse();
555  DhtDumpVector* dumpVector = dataStorage->dumpDht();
556 
557  response->setRecordArraySize(dumpVector->size());
558 
559  for (uint i = 0; i < dumpVector->size(); i++) {
560  response->setRecord(i, (*dumpVector)[i]);
561  }
562 
// The vector returned by dumpDht() is released here after copying.
563  delete dumpVector;
564 
565  sendRpcResponse(call, response);
566 }
567 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 568). handleRpcResponse calls
// handlePutResponse(_DHTPutResponse, rpcId) and the body uses `dhtMsg` and
// `rpcId`, so this is presumably CBRDHT::handlePutResponse - confirm.
//
// Visible behaviour: counts the outcome of one replica PUT and, once more than
// half of the sent requests have succeeded, answers the original CAPI call
// with isSuccess == true and drops the bookkeeping entry.
569 {
570  std::map<int, PutMapEntry>::iterator it2 =
571  putMap.find(rpcId);
572 
573  if (it2 == putMap.end()) //unknown request
574  return;
575 
576  if (dhtMsg->getSuccess()) {
577  it2->second.numResponses++;
578  } else {
579  it2->second.numFailed++;
580  }
581 
// Strict majority of successes; the failure side (>= 0.5 failed) is only
// evaluated in the timeout handler, not here.
582  if (it2->second.numResponses / (double)it2->second.numSent > 0.5) {
583  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
584  capiPutRespMsg->setIsSuccess(true);
585  sendRpcResponse(it2->second.callMsg, capiPutRespMsg);
586  it2->second.callMsg = NULL;
587  putMap.erase(rpcId);
588  }
589 }
590 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 591). handleRpcResponse calls
// handleGetResponse(_CBRDHTGetResponse, rpcId) and the body uses `dhtMsg` and
// `rpcId`, so this is presumably CBRDHT::handleGetResponse - confirm.
//
// Visible behaviour for non-hash responses: a response with content is passed
// up as a successful DHTgetCAPIResponse; otherwise the request is retried at
// a fallback replica, then at the next replica team, and finally answered
// with isSuccess == false. Responses with getIsHash() == true are ignored.
592 {
593  std::map<int, GetMapEntry>::iterator it2 =
594  getMap.find(rpcId);
595 
596  //unknown request
597  if (it2 == getMap.end()) {
598  std::cout << "- 1 -" << std::endl;
599  return;
600  }
601 
602  if (!dhtMsg->getIsHash()) {
603  //std::cout << "[" << overlay->getThisNode().getIp() << "] " << "Received an answer! Sending up key " << dhtMsg->getKey().toString(16) << "(orig: " << dhtMsg->getOriginalKey().toString(16) << ") -- value " << dhtMsg->getHashValue() << std::endl;
604  //std::cout << "Replica left: " << it2->second.replica.size() << std::endl;
605 
// NOTE(review): handleGetRequest() always sends a result array of size 1,
// even when nothing was found, so this condition looks like it is always true
// for such responses and the fallback branches below may be unreachable - verify.
// NOTE(review): getResult(0) is read below even if only getHashValue() is
// non-empty and the result array is empty - verify that cannot happen.
606  if (dhtMsg->getHashValue().size() > 0 || dhtMsg->getResultArraySize() > 0) {
607  // Successful Lookup, received a value
608  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
609  //capiGetRespMsg->setKey(dhtMsg->getOriginalKey());
610  //capiGetRespMsg->setValue(dhtMsg->getHashValue());
611  DhtDumpEntry result;
612  result.setKey(dhtMsg->getKey());
613  result.setValue(dhtMsg->getResult(0).getValue());//getHashValue());
614  capiGetRespMsg->setResultArraySize(1);
615  capiGetRespMsg->setResult(0, result);
616 
617  //std::cout << "[" << overlay->getThisNode().getIp() << "] " << "SUCCESSFUL LOOKUP! Sending up key " << dhtMsg->getKey().toString(16) << "(orig: " << dhtMsg->getOriginalKey().toString(16) << ") -- value " << dhtMsg->getHashValue() << std::endl;
618 
619  capiGetRespMsg->setIsSuccess(true);
620  sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
621  getMap.erase(rpcId);
622  return;
623  } else if (it2->second.replica.size() > 0) {
624  // Received empty value, try fallback replica
625  NodeHandle fallbackReplica = it2->second.replica.back();
626 
627  std::cout << "[" << overlay->getThisNode().getIp() << "] " << "Empty value received. Asking replica now ("<< it2->second.replica.size()<<" left)!" << std::endl;
628 
629  CBRDHTGetCall* dhtRecall = new CBRDHTGetCall();
630  dhtRecall->setOriginalKey(dhtMsg->getOriginalKey());
631  dhtRecall->setKey(dhtMsg->getKey());
632  dhtRecall->setIsHash(false);
633  dhtRecall->setBitLength(GETCALL_L(dhtRecall));
634  RECORD_STATS(normalMessages++;
635  numBytesNormal += dhtRecall->getByteLength());
// The retry reuses the nonce of the original CAPI call as its RPC id.
636  sendRouteRpcCall(TIER1_COMP, fallbackReplica, dhtRecall,
637  NULL, DEFAULT_ROUTING, -1, 0,
638  it2->second.callMsg->getNonce());
639  it2->second.numSent++;
640  it2->second.replica.pop_back();
641  return;
642  } else if (it2->second.teamNumber < (numReplicaTeams - 1)) {
643  // No more fallback replica in this team, try next one
644 
645  std::cout << "it2->second.teamNumber (" << it2->second.teamNumber << ") < (numReplicaTeams - 1) (" << (numReplicaTeams - 1) << ")" << std::endl;
646  std::cout << "[" << overlay->getThisNode().getIp() << "] " << "No more fallback replica in this team "<< it2->second.teamNumber<<". Trying next one ("<< it2->second.teamNumber+1 << ")..." << std::endl;
647 
648  it2->second.teamNumber++;
649  handleGetCAPIRequest(it2->second.callMsg, it2->second.teamNumber);
650  return;
651  } else {
652  // No more replica, no more teams, send success == false to Tier 2 :(
653 
654  std::cout << "[" << overlay->getThisNode().getIp() << "] " << "No more fallback replica. Lookup failed. :(" << std::endl;
655 
656  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
657  //capiGetRespMsg->setKey(dhtMsg->getOriginalKey());
658  //capiGetRespMsg->setValue(BinaryValue::UNSPECIFIED_VALUE);
659  DhtDumpEntry result;
660  result.setKey(dhtMsg->getKey());
// NOTE(review): listing line 661 is missing here (numbering skips it);
// presumably it sets the value of `result` - confirm against the original.
662  capiGetRespMsg->setResultArraySize(1);
663  capiGetRespMsg->setResult(0, result);
664  capiGetRespMsg->setIsSuccess(false);
665  sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
666  getMap.erase(rpcId);
667  }
668  }
669 }
670 
671 void CBRDHT::update(const NodeHandle& node, bool joined)
672 {
673  OverlayKey key;
674  DHTPutCall* dhtMsg;
675  bool err = false;
676  //DHTData entry;
677  DhtDataEntry entry;
678  //std::map<OverlayKey, DHTData>::iterator it = dataStorage->begin();
679  DhtDataMap::iterator it = dataStorage->begin();
680  for (unsigned int i = 0; i < dataStorage->getSize(); i++) {
681  key = it->first;
682  entry = it->second;
683  if (joined) {
684  if (entry.responsible && (overlay->isSiblingFor(node, key,
685  numReplica, &err)
686  || err)) { // hack for Chord, if we've got a new predecessor
687 
688  dhtMsg = new DHTPutCall();
689  dhtMsg->setKey(key);
690  dhtMsg->setValue(entry.value);
691  dhtMsg->setKind(entry.kind);
692  dhtMsg->setId(entry.id);
693 
694  //dhtMsg->setTtl((int) (entry.ttlMessage->arrivalTime()
695  // - simTime()));
696  dhtMsg->setTtl((int)SIMTIME_DBL(entry.ttlMessage->getArrivalTime()
697  - simTime()));
698  dhtMsg->setIsModifiable(entry.is_modifiable);
699  dhtMsg->setMaintenance(true);
700  dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
701  RECORD_STATS(maintenanceMessages++;
702  numBytesMaintenance += dhtMsg->getByteLength());
703  sendRouteRpcCall(TIER1_COMP, node, dhtMsg);
704  }
705 
706  if (err) {
707  EV << "[DHT::update()]\n"
708  << " Unable to know if key: " << key
709  << " is in range of node: " << node
710  << endl;
711  }
712  } else {
713 #if 0
714  //the update concerns a node who has left
715  //replicate
716  LookupCall* replicaMsg = new LookupCall();
717  replicaMsg->setKey(key);
718  replicaMsg->setNumSiblings(numReplica);
719  int nonce = sendInternalRpcCall(OVERLAY_COMP,
720  replicaMsg);
721  dhtMsg = new DHTPutCall();
722  dhtMsg->setKey(key);
723  dhtMsg->setValue(entry.value);
724  dhtMsg->setTtl((int)(entry.ttlMessage->arrivalTime()
725  - simulation.simTime()));
726  dhtMsg->setIsModifiable(entry.is_modifiable);
727  dhtMsg->setMaintenance(true);
728  dhtMsg->setLength(PUTCALL_L(dhtMsg));
729 
730  rpcIdMap.insert(make_pair(nonce, dhtMsg));
731 #endif
732  }
733 
734  entry.responsible = overlay->isSiblingFor(overlay->getThisNode(),
735  key, 1, &err);
736  it++;
737  }
738 }
739 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 740). handleRpcResponse calls
// handleLookupResponse(_LookupResponse) and the body uses `lookupMsg`, so this
// is presumably CBRDHT::handleLookupResponse(LookupResponse* lookupMsg) - confirm.
//
// Visible behaviour: continues the operation that was parked in rpcIdMap under
// the lookup's nonce, depending on the type of the parked message:
//  - DHTputCAPICall: send a DHTPutCall to every returned sibling and start
//    the PUT bookkeeping in putMap;
//  - DHTgetCAPICall: query the first sibling, keep the others as fallback
//    replicas in getMap;
//  - DHTPutCall (team copy): send a copy of it to every returned sibling.
// An invalid or empty lookup ends the operation; the CAPI variants answer
// with isSuccess == false.
741 {
742  std::map<unsigned int, BaseCallMessage*>::iterator it =
743  rpcIdMap.find(lookupMsg->getNonce());
744 
// Unknown nonce, or an entry registered with a NULL message: nothing to do.
745  if (it == rpcIdMap.end() || it->second == NULL)
746  return;
747 
748  if (dynamic_cast<DHTputCAPICall*> (it->second)) {
749 
750  #if 0
751  cout << "DHT::handleLookupResponse(): PUT "
752  << lookupMsg->getKey() << " ("
753  << overlay->getThisNode().getKey() << ")" << endl;
754 
755  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
756  cout << i << ": " << lookupMsg->getSiblings(i) << endl;
757  }
758 #endif
759 
// The pointer is taken before the map entry is erased; `it` must not be used
// after the erase.
760  DHTputCAPICall* capiPutMsg = dynamic_cast<DHTputCAPICall*> (it->second);
761  rpcIdMap.erase(lookupMsg->getNonce());
762 
763 
764  if ((lookupMsg->getIsValid() == false)
765  || (lookupMsg->getSiblingsArraySize() == 0)) {
766 
767  EV << "[DHT::handleLookupResponse()]\n"
768  << " Unable to get replica list : invalid lookup"
769  << endl;
770  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
771  capiPutRespMsg->setIsSuccess(false);
772  sendRpcResponse(capiPutMsg, capiPutRespMsg);
773  return;
774  }
775 
// One DHTPutCall per sibling; all of them use the CAPI call's nonce as RPC id.
776  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
777  DHTPutCall* dhtMsg = new DHTPutCall();
778  dhtMsg->setKey(capiPutMsg->getKey());
779  dhtMsg->setValue(capiPutMsg->getValue());
780  dhtMsg->setKind(capiPutMsg->getKind());
781  dhtMsg->setId(capiPutMsg->getId());
782  dhtMsg->setTtl(capiPutMsg->getTtl());
783  dhtMsg->setIsModifiable(capiPutMsg->getIsModifiable());
784  dhtMsg->setMaintenance(false);
785  dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
786  RECORD_STATS(normalMessages++;
787  numBytesNormal += dhtMsg->getByteLength());
788  sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i),
789  dhtMsg, NULL, DEFAULT_ROUTING, -1,
790  0, capiPutMsg->getNonce());
791  }
792 
// Bookkeeping entry evaluated by the put response and timeout handlers.
793  PutMapEntry mapEntry;
794  mapEntry.callMsg = capiPutMsg;
795  mapEntry.numResponses = 0;
796  mapEntry.numFailed = 0;
797  mapEntry.numSent = lookupMsg->getSiblingsArraySize();
798 
799  putMap.insert(make_pair(capiPutMsg->getNonce(), mapEntry));
800  }
801  else if (dynamic_cast<DHTgetCAPICall*>(it->second)) {
802 
803 #if 0
804  cout << "DHT::handleLookupResponse(): GET "
805  << lookupMsg->getKey() << " ("
806  << overlay->getThisNode().getKey() << ")" << endl;
807 
808  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
809  cout << i << ": " << lookupMsg->getSiblings(i) << endl;
810  }
811 #endif
812 
813  DHTgetCAPICall* capiGetMsg = dynamic_cast<DHTgetCAPICall*>(it->second);
814  rpcIdMap.erase(lookupMsg->getNonce());
815 
816  // Invalid lookup
817  if ((lookupMsg->getIsValid() == false)
818  || (lookupMsg->getSiblingsArraySize() == 0)) {
819 
820  EV << "[DHT::handleLookupResponse()]\n"
821  << " Unable to get replica list : invalid lookup"
822  << endl;
823  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
824  //capiGetRespMsg->setKey(lookupMsg->getKey());
825  //capiGetRespMsg->setValue(BinaryValue::UNSPECIFIED_VALUE);
826  DhtDumpEntry result;
827  result.setKey(lookupMsg->getKey());
// NOTE(review): listing line 828 is missing here (numbering skips it);
// presumably it sets the value of `result` - confirm against the original.
829  capiGetRespMsg->setResultArraySize(1);
830  capiGetRespMsg->setResult(0, result);
831  capiGetRespMsg->setIsSuccess(false);
832  sendRpcResponse(capiGetMsg, capiGetRespMsg);
833  return;
834  }
835 
836  // Valid lookup
837  GetMapEntry mapEntry;
838  mapEntry.numSent = 0;
839 
840  // Multi team version: Already mapEntry from earlier team?
841 
842  std::map<int, GetMapEntry>::iterator it2 =
843  getMap.find(capiGetMsg->getNonce());
844 
// An existing entry is copied whole (which also overwrites numSent set above);
// the fields below are then reset for the new team.
845  if (it2 != getMap.end()) {
846  mapEntry = it2->second;
847  } else {
848  mapEntry.teamNumber = 0;
849  }
850  mapEntry.numAvailableReplica = lookupMsg->getSiblingsArraySize();
851  mapEntry.numResponses = 0;
852  mapEntry.callMsg = capiGetMsg;
853  mapEntry.hashVector = NULL;
854  mapEntry.replica.clear();
855  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
856  // Simplified GET Request: Just one real request, rest is for fallback
857  if (i == 0) {
858  CBRDHTGetCall* dhtMsg = new CBRDHTGetCall();
859 
860  dhtMsg->setOriginalKey(capiGetMsg->getKey());
861  dhtMsg->setKey(lookupMsg->getKey());
862 
863  dhtMsg->setIsHash(false);
864  dhtMsg->setKind(capiGetMsg->getKind());
865  dhtMsg->setId(capiGetMsg->getId());
866  dhtMsg->setBitLength(GETCALL_L(dhtMsg));
867  RECORD_STATS(normalMessages++;
868  numBytesNormal += dhtMsg->getByteLength());
869  sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i), dhtMsg,
870  NULL, DEFAULT_ROUTING, -1, 0,
871  capiGetMsg->getNonce());
872  mapEntry.numSent++;
873  } else {
874  //We don't send, we just store the remaining keys as fallback
875  mapEntry.replica.push_back(lookupMsg->getSiblings(i));
876  }
877  }
878  /*
879  std::cout << "New replica: " << std::endl;
880  for (int i = 0; i < mapEntry.replica.size(); i++) {
881  std::cout << mapEntry.replica[i] << std::endl;
882  }
883  std::cout << "*************************" << std::endl;
884  */
// Replace a previous entry for this call with the updated one.
885  if (it2 != getMap.end())
886  getMap.erase(it2);
887  getMap.insert(make_pair(capiGetMsg->getNonce(), mapEntry));
888  } else if (dynamic_cast<DHTPutCall*>(it->second)) {
// Team copy registered by the CAPI put handler; counted as maintenance traffic.
889  DHTPutCall* putMsg = dynamic_cast<DHTPutCall*>(it->second);
890  rpcIdMap.erase(lookupMsg->getNonce());
891 
892  if ((lookupMsg->getIsValid() == false)
893  || (lookupMsg->getSiblingsArraySize() == 0)) {
894 
895  EV << "[DHT::handleLookupResponse()]\n"
896  << " Unable to get replica list : invalid lookup"
897  << endl;
898  delete putMsg;
899  return;
900  }
901 
902  for( unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++ ) {
903  RECORD_STATS(maintenanceMessages++;
904  numBytesMaintenance += putMsg->getByteLength());
905 
// Every sibling receives its own copy; the template message is deleted below.
906  sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i),
907  new DHTPutCall(*putMsg));
908  }
909 
910  delete putMsg;
911  }
912 }
913 
// NOTE(review): the signature line of this definition is missing from this
// listing (numbering skips 914). The body reports the per-node statistics
// counters, so this is presumably CBRDHT::finishApp() - confirm.
//
// Visible behaviour: converts the four traffic counters into per-second rates
// over the lifetime returned by calcMeasuredLifetime(creationTime) and hands
// them to globalStatistics.
915 {
916  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
917 
// A lifetime of exactly 0 is skipped, which also avoids a division by zero.
918  if (time != 0) {
919  // std::cout << dataStorage->getSize() << " " << overlay->getThisNode().getKey().toString(16) << std::endl;
920  globalStatistics->addStdDev("DHT: Sent Maintenance Messages/s",
921  maintenanceMessages / time);
922  globalStatistics->addStdDev("DHT: Sent Normal Messages/s",
923  normalMessages / time);
924  globalStatistics->addStdDev("DHT: Sent Maintenance Bytes/s",
925  numBytesMaintenance / time);
926  globalStatistics->addStdDev("DHT: Sent Normal Bytes/s",
927  numBytesNormal / time);
928  }
929 }
930 
// NOTE(review): the signature line (and opening brace) of this definition is
// missing from this listing (numbering skips 931). The body takes a message
// `msg` with a result array and returns an int - confirm name and parameter
// type against the original file.
//
// Visible behaviour: returns the sum of getValue().size() over all result
// entries of msg.
// NOTE(review): the accumulator is called bitSize, but size() presumably
// counts bytes - confirm the unit callers expect.
932  int bitSize = 0;
933  for (uint i = 0; i < msg->getResultArraySize(); i++) {
934  bitSize += msg->getResult(i).getValue().size();
935 
936  }
937  return bitSize;
938 }
939