OverSim
DHT.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <IPAddressResolver.h>
25 
26 #include "DHT.h"
27 
28 #include <RpcMacros.h>
29 #include <BaseRpc.h>
30 #include <GlobalStatistics.h>
31 
33 
34 using namespace std;
35 
37 {
38  dataStorage = NULL;
39 }
40 
42 {
43  PendingRpcs::iterator it;
44 
45  for (it = pendingRpcs.begin(); it != pendingRpcs.end(); it++) {
46  delete(it->second.putCallMsg);
47  delete(it->second.getCallMsg);
48  }
49 
50  pendingRpcs.clear();
51 
52  if (dataStorage != NULL) {
53  dataStorage->clear();
54  }
55 }
56 
57 void DHT::initializeApp(int stage)
58 {
59  if (stage != MIN_STAGE_APP)
60  return;
61 
62  dataStorage = check_and_cast<DHTDataStorage*>
63  (getParentModule()->getSubmodule("dhtDataStorage"));
64 
65  numReplica = par("numReplica");
66  numGetRequests = par("numGetRequests");
67  ratioIdentical = par("ratioIdentical");
68  secureMaintenance = par("secureMaintenance");
69  invalidDataAttack = par("invalidDataAttack");
70  maintenanceAttack = par("maintenanceAttack");
71 
72  if ((int)numReplica > overlay->getMaxNumSiblings()) {
73  opp_error("DHT::initialize(): numReplica bigger than what this "
74  "overlay can handle (%d)", overlay->getMaxNumSiblings());
75  }
76 
77  maintenanceMessages = 0;
78  normalMessages = 0;
79  numBytesMaintenance = 0;
80  numBytesNormal = 0;
81  WATCH(maintenanceMessages);
82  WATCH(normalMessages);
83  WATCH(numBytesNormal);
84  WATCH(numBytesMaintenance);
85  WATCH_MAP(pendingRpcs);
86 }
87 
88 void DHT::handleTimerEvent(cMessage* msg)
89 {
90  DHTTtlTimer* msg_timer = dynamic_cast<DHTTtlTimer*> (msg);
91 
92  if (msg_timer) {
93  EV << "[DHT::handleTimerEvent()]\n"
94  << " received timer ttl, key: "
95  << msg_timer->getKey().toString(16)
96  << "\n (overlay->getThisNode().getKey() = "
97  << overlay->getThisNode().getKey().toString(16) << ")"
98  << endl;
99 
100  dataStorage->removeData(msg_timer->getKey(), msg_timer->getKind(),
101  msg_timer->getId());
102  }
103 }
104 
106 {
107  RPC_SWITCH_START(msg)
108  // RPCs between nodes
109  RPC_DELEGATE(DHTPut, handlePutRequest);
110  RPC_DELEGATE(DHTGet, handleGetRequest);
111  // internal RPCs
112  RPC_DELEGATE(DHTputCAPI, handlePutCAPIRequest);
113  RPC_DELEGATE(DHTgetCAPI, handleGetCAPIRequest);
114  RPC_DELEGATE(DHTdump, handleDumpDhtRequest);
115  RPC_SWITCH_END( )
116 
117  return RPC_HANDLED;
118 }
119 
120 void DHT::handleRpcResponse(BaseResponseMessage* msg, cPolymorphic* context,
121  int rpcId, simtime_t rtt)
122 {
123  RPC_SWITCH_START(msg)
124  RPC_ON_RESPONSE(DHTPut){
125  handlePutResponse(_DHTPutResponse, rpcId);
126  EV << "[DHT::handleRpcResponse()]\n"
127  << " DHT Put RPC Response received: id=" << rpcId
128  << " msg=" << *_DHTPutResponse << " rtt=" << rtt
129  << endl;
130  break;
131  }
132  RPC_ON_RESPONSE(DHTGet) {
133  handleGetResponse(_DHTGetResponse, rpcId);
134  EV << "[DHT::handleRpcResponse()]\n"
135  << " DHT Get RPC Response received: id=" << rpcId
136  << " msg=" << *_DHTGetResponse << " rtt=" << rtt
137  << endl;
138  break;
139  }
141  handleLookupResponse(_LookupResponse, rpcId);
142  EV << "[DHT::handleRpcResponse()]\n"
143  << " Lookup RPC Response received: id=" << rpcId
144  << " msg=" << *_LookupResponse << " rtt=" << rtt
145  << endl;
146  break;
147  }
149 }
150 
152  cPolymorphic* context, int rpcId,
153  const OverlayKey& destKey)
154 {
155  RPC_SWITCH_START(msg)
156  RPC_ON_CALL(DHTPut){
157  EV << "[DHT::handleRpcResponse()]\n"
158  << " DHTPut Timeout"
159  << endl;
160 
161  PendingRpcs::iterator it = pendingRpcs.find(rpcId);
162 
163  if (it == pendingRpcs.end()) // unknown request
164  return;
165 
166  it->second.numFailed++;
167 
168  if (it->second.numFailed / (double)it->second.numSent >= 0.5) {
169  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
170  capiPutRespMsg->setIsSuccess(false);
171  sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
172  //cout << "timeout 1" << endl;
173  pendingRpcs.erase(rpcId);
174  }
175 
176  break;
177  }
178  RPC_ON_CALL(DHTGet) {
179  EV << "[DHT::handleRpcResponse()]\n"
180  << " DHTGet Timeout"
181  << endl;
182 
183  PendingRpcs::iterator it = pendingRpcs.find(rpcId);
184 
185  if (it == pendingRpcs.end()) { // unknown request
186  return;
187  }
188 
189  if (it->second.state == GET_VALUE_SENT) {
190  // we have sent a 'real' get request
191  // ask anyone else, if possible
192  if ((it->second.hashVector != NULL)
193  && (it->second.hashVector->size() > 0)) {
194 
195  DHTGetCall* getCall = new DHTGetCall();
196  getCall->setKey(_DHTGetCall->getKey());
197  getCall->setKind(_DHTGetCall->getKind());
198  getCall->setId(_DHTGetCall->getId());
199  getCall->setIsHash(false);
200  getCall->setBitLength(GETCALL_L(getCall));
201  RECORD_STATS(normalMessages++;
202  numBytesNormal += getCall->getByteLength());
203 
204  sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
205  getCall, NULL, DEFAULT_ROUTING, -1, 0, rpcId);
206  it->second.hashVector->pop_back();
207  } else {
208  // no one else
209  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
210  capiGetRespMsg->setIsSuccess(false);
211  sendRpcResponse(it->second.getCallMsg,
212  capiGetRespMsg);
213  //cout << "DHT: GET failed: timeout (no one else)" << endl;
214  pendingRpcs.erase(rpcId);
215  return;
216  }
217  } else {
218  // timeout while waiting for hashes
219  // try to ask another one of the replica list for the hash
220  if (it->second.replica.size() > 0) {
221  DHTGetCall* getCall = new DHTGetCall();
222  getCall->setKey(_DHTGetCall->getKey());
223  getCall->setKind(_DHTGetCall->getKind());
224  getCall->setId(_DHTGetCall->getId());
225  getCall->setIsHash(true);
226  getCall->setBitLength(GETCALL_L(getCall));
227 
228  RECORD_STATS(normalMessages++;
229  numBytesNormal += getCall->getByteLength());
230 
231  sendRouteRpcCall(TIER1_COMP, it->second.replica.back(),
232  getCall, NULL, DEFAULT_ROUTING, -1, 0,
233  rpcId);
234  it->second.replica.pop_back();
235  } else {
236  // no one else to ask, see what we can do with what we have
237  if (it->second.numResponses > 0) {
238  unsigned int maxCount = 0;
239  NodeVector* hashVector = NULL;
240  std::map<BinaryValue, NodeVector>::iterator itHashes;
241  for (itHashes = it->second.hashes.begin();
242  itHashes != it->second.hashes.end(); itHashes++) {
243 
244  if (itHashes->second.size() > maxCount) {
245  maxCount = itHashes->second.size();
246  hashVector = &(itHashes->second);
247  }
248  }
249 
250  // since it makes no difference for us, if we
251  // return a invalid result or return nothing,
252  // we simply return the value with the highest probability
253  it->second.hashVector = hashVector;
254 #if 0
255  if ((double)maxCount/(double)it->second.numResponses >=
256  ratioIdentical) {
257  it->second.hashVector = hashVector;
258  }
259 #endif
260  }
261 
262  if ((it->second.hashVector != NULL)
263  && (it->second.hashVector->size() > 0)) {
264 
265  DHTGetCall* getCall = new DHTGetCall();
266  getCall->setKey(_DHTGetCall->getKey());
267  getCall->setKind(_DHTGetCall->getKind());
268  getCall->setId(_DHTGetCall->getId());
269  getCall->setIsHash(false);
270  getCall->setBitLength(GETCALL_L(getCall));
271  RECORD_STATS(normalMessages++;
272  numBytesNormal += getCall->getByteLength());
273  sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
274  getCall, NULL, DEFAULT_ROUTING, -1,
275  0, rpcId);
276  it->second.hashVector->pop_back();
277  } else {
278  // no more nodes to ask -> get failed
279  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
280  capiGetRespMsg->setIsSuccess(false);
281  sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
282  //cout << "DHT: GET failed: timeout2 (no one else)" << endl;
283  pendingRpcs.erase(rpcId);
284  }
285  }
286  }
287  break;
288  }
289  RPC_SWITCH_END( )
290 }
291 
293 {
294  std::string tempString = "PUT_REQUEST received: "
295  + std::string(dhtMsg->getKey().toString(16));
296  getParentModule()->getParentModule()->bubble(tempString.c_str());
297 
298  bool err;
299  bool isSibling = overlay->isSiblingFor(overlay->getThisNode(),
300  dhtMsg->getKey(), secureMaintenance ? numReplica : 1, &err);
301  if (err) {
302  isSibling = true;
303  }
304 
305  if (secureMaintenance && dhtMsg->getMaintenance()) {
306  DhtDataEntry* entry = dataStorage->getDataEntry(dhtMsg->getKey(),
307  dhtMsg->getKind(),
308  dhtMsg->getId());
309  if (entry == NULL) {
310  // add ttl timer
311  DHTTtlTimer *timerMsg = new DHTTtlTimer("ttl_timer");
312  timerMsg->setKey(dhtMsg->getKey());
313  timerMsg->setKind(dhtMsg->getKind());
314  timerMsg->setId(dhtMsg->getId());
315  scheduleAt(simTime() + dhtMsg->getTtl(), timerMsg);
316 
317  entry = dataStorage->addData(dhtMsg->getKey(), dhtMsg->getKind(),
318  dhtMsg->getId(), dhtMsg->getValue(), timerMsg,
319  dhtMsg->getIsModifiable(), dhtMsg->getSrcNode(),
320  isSibling);
321  } else if ((entry->siblingVote.size() == 0) && isSibling) {
322  // we already have a verified entry with this key and are
323  // still responsible => ignore maintenance calls
324  delete dhtMsg;
325  return;
326  }
327 
328  SiblingVoteMap::iterator it = entry->siblingVote.find(dhtMsg->getValue());
329  if (it == entry->siblingVote.end()) {
330  // new hash
331  NodeVector vect;
332  vect.add(dhtMsg->getSrcNode());
333  entry->siblingVote.insert(make_pair(dhtMsg->getValue(),
334  vect));
335  } else {
336  it->second.add(dhtMsg->getSrcNode());
337  }
338 
339  size_t maxCount = 0;
340  SiblingVoteMap::iterator majorityIt;
341 
342  for (it = entry->siblingVote.begin(); it != entry->siblingVote.end(); it++) {
343  if (it->second.size() > maxCount) {
344  maxCount = it->second.size();
345  majorityIt = it;
346  }
347  }
348 
349  entry->value = majorityIt->first;
350  entry->responsible = true;
351 
352  if (maxCount > numReplica) {
353  entry->siblingVote.clear();
354  }
355 
356  // send back
357  DHTPutResponse* responseMsg = new DHTPutResponse();
358  responseMsg->setSuccess(true);
359  responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
360  RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
361 
362  sendRpcResponse(dhtMsg, responseMsg);
363 
364  return;
365  }
366 
367 #if 0
368  if (!(dataStorage->isModifiable(dhtMsg->getKey(), dhtMsg->getKind(),
369  dhtMsg->getId()))) {
370  // check if the put request came from the right node
371  NodeHandle sourceNode = dataStorage->getSourceNode(dhtMsg->getKey(),
372  dhtMsg->getKind(), dhtMsg->getId());
373  if (((!sourceNode.isUnspecified())
374  && (!dhtMsg->getSrcNode().isUnspecified()) && (sourceNode
375  != dhtMsg->getSrcNode())) || ((dhtMsg->getMaintenance())
376  && (dhtMsg->getOwnerNode() == sourceNode))) {
377  // TODO: set owner
378  DHTPutResponse* responseMsg = new DHTPutResponse();
379  responseMsg->setSuccess(false);
380  responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
381  RECORD_STATS(normalMessages++;
382  numBytesNormal += responseMsg->getByteLength());
383  sendRpcResponse(dhtMsg, responseMsg);
384  return;
385  }
386 
387  }
388 #endif
389 
390  // remove data item from local data storage
391  dataStorage->removeData(dhtMsg->getKey(), dhtMsg->getKind(),
392  dhtMsg->getId());
393 
394  if (dhtMsg->getValue().size() > 0) {
395  // add ttl timer
396  DHTTtlTimer *timerMsg = new DHTTtlTimer("ttl_timer");
397  timerMsg->setKey(dhtMsg->getKey());
398  timerMsg->setKind(dhtMsg->getKind());
399  timerMsg->setId(dhtMsg->getId());
400  scheduleAt(simTime() + dhtMsg->getTtl(), timerMsg);
401  // storage data item in local data storage
402  dataStorage->addData(dhtMsg->getKey(), dhtMsg->getKind(),
403  dhtMsg->getId(), dhtMsg->getValue(), timerMsg,
404  dhtMsg->getIsModifiable(), dhtMsg->getSrcNode(),
405  isSibling);
406  }
407 
408  // send back
409  DHTPutResponse* responseMsg = new DHTPutResponse();
410  responseMsg->setSuccess(true);
411  responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
412  RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
413 
414  sendRpcResponse(dhtMsg, responseMsg);
415 }
416 
418 {
419  std::string tempString = "GET_REQUEST received: "
420  + std::string(dhtMsg->getKey().toString(16));
421 
422  getParentModule()->getParentModule()->bubble(tempString.c_str());
423 
424  if (dhtMsg->getKey().isUnspecified()) {
425  throw cRuntimeError("DHT::handleGetRequest: Unspecified key!");
426  }
427 
428  DhtDumpVector* dataVect = dataStorage->dumpDht(dhtMsg->getKey(),
429  dhtMsg->getKind(),
430  dhtMsg->getId());
431 
432  if (overlay->isMalicious() && invalidDataAttack) {
433  dataVect->resize(1);
434  dataVect->at(0).setKey(dhtMsg->getKey());
435  dataVect->at(0).setKind(dhtMsg->getKind());
436  dataVect->at(0).setId(dhtMsg->getId());
437  dataVect->at(0).setValue("Modified Data");
438  dataVect->at(0).setTtl(3600*24*365);
439  dataVect->at(0).setOwnerNode(overlay->getThisNode());
440  dataVect->at(0).setIs_modifiable(false);
441  dataVect->at(0).setResponsible(true);
442  }
443 
444  // send back
445  DHTGetResponse* responseMsg = new DHTGetResponse();
446  responseMsg->setKey(dhtMsg->getKey());
447  responseMsg->setIsHash(dhtMsg->getIsHash());
448 
449  if (dataVect->size() == 0) {
451  responseMsg->setResultArraySize(0);
452  } else {
453  if (dhtMsg->getIsHash()) {
454  // TODO: verify this
455  BinaryValue resultValues;
456  for (uint32_t i = 0; i < dataVect->size(); i++) {
457  resultValues += (*dataVect)[i].getValue();
458  }
459 
460  CSHA1 sha1;
461  BinaryValue hashValue(20);
462  sha1.Reset();
463  sha1.Update((uint8_t*) (&(*resultValues.begin())),
464  resultValues.size());
465  sha1.Final();
466  sha1.GetHash((unsigned char*)&hashValue[0]);
467 
468  responseMsg->setHashValue(hashValue);
469  } else {
470  responseMsg->setResultArraySize(dataVect->size());
471 
472  for (uint32_t i = 0; i < dataVect->size(); i++) {
473  responseMsg->setResult(i, (*dataVect)[i]);
474  }
475 
476  }
477  }
478  delete dataVect;
479 
480  responseMsg->setBitLength(GETRESPONSE_L(responseMsg));
481  RECORD_STATS(normalMessages++;
482  numBytesNormal += responseMsg->getByteLength());
483  sendRpcResponse(dhtMsg, responseMsg);
484 }
485 
487 {
488  // asks the replica list
489  LookupCall* lookupCall = new LookupCall();
490  lookupCall->setKey(capiPutMsg->getKey());
491  lookupCall->setNumSiblings(numReplica);
492  sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
493  capiPutMsg->getNonce());
494 
495  PendingRpcsEntry entry;
496  entry.putCallMsg = capiPutMsg;
497  entry.state = LOOKUP_STARTED;
498  pendingRpcs.insert(make_pair(capiPutMsg->getNonce(), entry));
499 }
500 
502 {
503  LookupCall* lookupCall = new LookupCall();
504  lookupCall->setKey(capiGetMsg->getKey());
505  lookupCall->setNumSiblings(numReplica);
506  sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
507  capiGetMsg->getNonce());
508 
509  PendingRpcsEntry entry;
510  entry.getCallMsg = capiGetMsg;
511  entry.state = LOOKUP_STARTED;
512  pendingRpcs.insert(make_pair(capiGetMsg->getNonce(), entry));
513 }
514 
516 {
517  DHTdumpResponse* response = new DHTdumpResponse();
518  DhtDumpVector* dumpVector = dataStorage->dumpDht();
519 
520  response->setRecordArraySize(dumpVector->size());
521 
522  for (uint32_t i = 0; i < dumpVector->size(); i++) {
523  response->setRecord(i, (*dumpVector)[i]);
524  }
525 
526  delete dumpVector;
527 
528  sendRpcResponse(call, response);
529 }
530 
531 void DHT::handlePutResponse(DHTPutResponse* dhtMsg, int rpcId)
532 {
533  PendingRpcs::iterator it = pendingRpcs.find(rpcId);
534 
535  if (it == pendingRpcs.end()) // unknown request
536  return;
537 
538  if (dhtMsg->getSuccess()) {
539  it->second.numResponses++;
540  } else {
541  it->second.numFailed++;
542  }
543 
544 
545 // if ((it->second.numFailed + it->second.numResponses) == it->second.numSent) {
546  if (it->second.numResponses / (double)it->second.numSent > 0.5) {
547 
548  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
549  capiPutRespMsg->setIsSuccess(true);
550  sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
551  pendingRpcs.erase(rpcId);
552  }
553 }
554 
555 void DHT::handleGetResponse(DHTGetResponse* dhtMsg, int rpcId)
556 {
557  NodeVector* hashVector = NULL;
558  PendingRpcs::iterator it = pendingRpcs.find(rpcId);
559 
560  if (it == pendingRpcs.end()) // unknown request
561  return;
562 
563  if (it->second.state == GET_VALUE_SENT) {
564  // we have sent a 'real' get request
565  if (!dhtMsg->getIsHash()) {
566  // TODO verify hash
567  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
568  capiGetRespMsg->setResultArraySize(dhtMsg->getResultArraySize());
569  for (uint i = 0; i < dhtMsg->getResultArraySize(); i++) {
570  capiGetRespMsg->setResult(i, dhtMsg->getResult(i));
571  }
572  capiGetRespMsg->setIsSuccess(true);
573  sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
574  pendingRpcs.erase(rpcId);
575  return;
576  }
577  }
578 
579  if (dhtMsg->getIsHash()) {
580  std::map<BinaryValue, NodeVector>::iterator itHashes =
581  it->second.hashes.find(dhtMsg->getHashValue());
582 
583  if (itHashes == it->second.hashes.end()) {
584  // new hash
585  NodeVector vect;
586  vect.push_back(dhtMsg->getSrcNode());
587  it->second.hashes.insert(make_pair(dhtMsg->getHashValue(),
588  vect));
589  } else {
590  itHashes->second.push_back(dhtMsg->getSrcNode());
591  }
592 
593  it->second.numResponses++;
594 
595  if (it->second.state == GET_VALUE_SENT) {
596  // we have already sent a real get request
597  return;
598  }
599 
600  // count the maximum number of equal hash values received so far
601  unsigned int maxCount = 0;
602 
603 
604  for (itHashes = it->second.hashes.begin();
605  itHashes != it->second.hashes.end(); itHashes++) {
606 
607  if (itHashes->second.size() > maxCount) {
608  maxCount = itHashes->second.size();
609  hashVector = &(itHashes->second);
610  }
611  }
612 
613  if ((double) maxCount / (double) it->second.numAvailableReplica
614  >= ratioIdentical) {
615  it->second.hashVector = hashVector;
616  } else if (it->second.numResponses >= numGetRequests) {
617  // we'll try to ask some other nodes
618  if (it->second.replica.size() > 0) {
619  DHTGetCall* getCall = new DHTGetCall();
620  getCall->setKey(it->second.getCallMsg->getKey());
621  getCall->setKind(it->second.getCallMsg->getKind());
622  getCall->setId(it->second.getCallMsg->getId());
623  getCall->setIsHash(true);
624  getCall->setBitLength(GETCALL_L(getCall));
625  RECORD_STATS(normalMessages++;
626  numBytesNormal += getCall->getByteLength());
627  sendRouteRpcCall(TIER1_COMP,
628  it->second.replica.back(), getCall,
629  NULL, DEFAULT_ROUTING, -1, 0, rpcId);
630  it->second.replica.pop_back();
631  it->second.state = GET_HASH_SENT;
632  } else if (hashVector == NULL) {
633  // we don't have anyone else to ask and no hash
634  DHTgetCAPIResponse* capiGetRespMsg =
635  new DHTgetCAPIResponse();
636  DhtDumpEntry result;
637  result.setKey(dhtMsg->getKey());
639  capiGetRespMsg->setResultArraySize(1);
640  capiGetRespMsg->setResult(0, result);
641  capiGetRespMsg->setIsSuccess(false);
642  sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
643 #if 0
644  cout << "DHT: GET failed: hash (no one else)" << endl;
645  cout << "numResponses: " << it->second.numResponses
646  << " numAvailableReplica: " << it->second.numAvailableReplica << endl;
647 
648  for (itHashes = it->second.hashes.begin();
649  itHashes != it->second.hashes.end(); itHashes++) {
650  cout << " - " << itHashes->first << " ("
651  << itHashes->second.size() << ")" << endl;
652  }
653 #endif
654 
655  pendingRpcs.erase(rpcId);
656  return;
657  } else {
658  // we don't have anyone else to ask => take what we've got
659  it->second.hashVector = hashVector;
660  }
661  }
662  }
663 
664  if ((it->second.state != GET_VALUE_SENT) &&
665  (it->second.hashVector != NULL)) {
666  // we have already received all the response and chosen a hash
667  if (it->second.hashVector->size() > 0) {
668  DHTGetCall* getCall = new DHTGetCall();
669  getCall->setKey(it->second.getCallMsg->getKey());
670  getCall->setKind(it->second.getCallMsg->getKind());
671  getCall->setId(it->second.getCallMsg->getId());
672  getCall->setIsHash(false);
673  getCall->setBitLength(GETCALL_L(getCall));
674  RECORD_STATS(normalMessages++;
675  numBytesNormal += getCall->getByteLength());
676  sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
677  getCall, NULL, DEFAULT_ROUTING, -1, 0, rpcId);
678  it->second.hashVector->pop_back();
679  it->second.state = GET_VALUE_SENT;
680  } else { // we don't have anyone else to ask
681  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
682  capiGetRespMsg->setResultArraySize(0);
683  sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
684  //cout << "DHT: GET failed: hash2 (no one else)" << endl;
685  pendingRpcs.erase(rpcId);
686  }
687  }
688 }
689 
690 void DHT::update(const NodeHandle& node, bool joined)
691 {
692  OverlayKey key;
693  bool err = false;
694  DhtDataEntry entry;
695  std::map<OverlayKey, DhtDataEntry>::iterator it;
696 
697  EV << "[DHT::update() @ " << overlay->getThisNode().getIp()
698  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
699  << " Update called()"
700  << endl;
701 
702  if (secureMaintenance) {
703  for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
704  if (it->second.responsible) {
705  NodeVector* siblings = overlay->local_lookup(it->first,
706  numReplica,
707  false);
708  if (siblings->size() == 0) {
709  delete siblings;
710  continue;
711  }
712 
713  if (joined) {
714  EV << "[DHT::update() @ " << overlay->getThisNode().getIp()
715  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
716  << " Potential new sibling for record " << it->first
717  << endl;
718 
719  if (overlay->distance(node.getKey(), it->first) <=
720  overlay->distance(siblings->back().getKey(), it->first)) {
721 
722  sendMaintenancePutCall(node, it->first, it->second);
723  }
724 
725  if (overlay->distance(overlay->getThisNode().getKey(), it->first) >
726  overlay->distance(siblings->back().getKey(), it->first)) {
727 
728  it->second.responsible = false;
729  }
730  } else {
731  if (overlay->distance(node.getKey(), it->first) <
732  overlay->distance(siblings->back().getKey(), it->first)) {
733 
734  sendMaintenancePutCall(siblings->back(), it->first,
735  it->second);
736  }
737  }
738 
739  delete siblings;
740  }
741  }
742 
743  return;
744  }
745 
746  for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
747  key = it->first;
748  entry = it->second;
749  if (joined) {
750  if (entry.responsible && (overlay->isSiblingFor(node, key,
751  numReplica, &err)
752  || err)) { // hack for Chord, if we've got a new predecessor
753 
754  if (err) {
755  EV << "[DHT::update()]\n"
756  << " Unable to know if key: " << key
757  << " is in range of node: " << node
758  << endl;
759  // For Chord: we've got a new predecessor
760  // TODO: only send record, if we are not responsible any more
761  // TODO: check all protocols to change routing table first,
762  // and than call update.
763 
764  //if (overlay->isSiblingFor(overlay->getThisNode(), key, 1, &err)) {
765  // continue;
766  //}
767  }
768 
769  sendMaintenancePutCall(node, key, entry);
770  }
771  }
772  //TODO: move this to the inner block above?
773  entry.responsible = overlay->isSiblingFor(overlay->getThisNode(),
774  key, 1, &err);
775  }
776 }
777 
779  const OverlayKey& key,
780  const DhtDataEntry& entry) {
781 
782  DHTPutCall* dhtMsg = new DHTPutCall();
783 
784  dhtMsg->setKey(key);
785  dhtMsg->setKind(entry.kind);
786  dhtMsg->setId(entry.id);
787 
788  if (overlay->isMalicious() && maintenanceAttack) {
789  dhtMsg->setValue("Modified Data");
790  } else {
791  dhtMsg->setValue(entry.value);
792  }
793 
794  dhtMsg->setTtl((int)SIMTIME_DBL(entry.ttlMessage->getArrivalTime()
795  - simTime()));
796  dhtMsg->setIsModifiable(entry.is_modifiable);
797  dhtMsg->setMaintenance(true);
798  dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
799  RECORD_STATS(maintenanceMessages++;
800  numBytesMaintenance += dhtMsg->getByteLength());
801 
802  sendRouteRpcCall(TIER1_COMP, node, dhtMsg);
803 }
804 
805 void DHT::handleLookupResponse(LookupResponse* lookupMsg, int rpcId)
806 {
807  PendingRpcs::iterator it = pendingRpcs.find(rpcId);
808 
809  if (it == pendingRpcs.end()) {
810  return;
811  }
812 
813  if (it->second.putCallMsg != NULL) {
814 
815 #if 0
816  cout << "DHT::handleLookupResponse(): PUT "
817  << lookupMsg->getKey() << " ("
818  << overlay->getThisNode().getKey() << ")" << endl;
819 
820  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
821  cout << i << ": " << lookupMsg->getSiblings(i) << endl;
822  }
823 #endif
824 
825  if ((lookupMsg->getIsValid() == false)
826  || (lookupMsg->getSiblingsArraySize() == 0)) {
827 
828  EV << "[DHT::handleLookupResponse()]\n"
829  << " Unable to get replica list : invalid lookup"
830  << endl;
831  DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
832  capiPutRespMsg->setIsSuccess(false);
833  //cout << "DHT::lookup failed" << endl;
834  sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
835  pendingRpcs.erase(rpcId);
836  return;
837  }
838 
839  if ((it->second.putCallMsg->getId() == 0) &&
840  (it->second.putCallMsg->getValue().size() > 0)) {
841  // pick a random id before replication of the data item
842  // id 0 is kept for delete requests (i.e. a put with empty value)
843  it->second.putCallMsg->setId(intuniform(1, 2147483647));
844  }
845 
846  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
847  DHTPutCall* dhtMsg = new DHTPutCall();
848  dhtMsg->setKey(it->second.putCallMsg->getKey());
849  dhtMsg->setKind(it->second.putCallMsg->getKind());
850  dhtMsg->setId(it->second.putCallMsg->getId());
851  dhtMsg->setValue(it->second.putCallMsg->getValue());
852  dhtMsg->setTtl(it->second.putCallMsg->getTtl());
853  dhtMsg->setIsModifiable(it->second.putCallMsg->getIsModifiable());
854  dhtMsg->setMaintenance(false);
855  dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
856  RECORD_STATS(normalMessages++;
857  numBytesNormal += dhtMsg->getByteLength());
858  sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i),
859  dhtMsg, NULL, DEFAULT_ROUTING, -1,
860  0, rpcId);
861  }
862 
863  it->second.state = PUT_SENT;
864  it->second.numResponses = 0;
865  it->second.numFailed = 0;
866  it->second.numSent = lookupMsg->getSiblingsArraySize();
867  }
868  else if (it->second.getCallMsg != NULL) {
869 
870 #if 0
871  cout << "DHT::handleLookupResponse(): GET "
872  << lookupMsg->getKey() << " ("
873  << overlay->getThisNode().getKey() << ")" << endl;
874 
875  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
876  cout << i << ": " << lookupMsg->getSiblings(i) << endl;
877  }
878 #endif
879 
880  if ((lookupMsg->getIsValid() == false)
881  || (lookupMsg->getSiblingsArraySize() == 0)) {
882 
883  EV << "[DHT::handleLookupResponse()]\n"
884  << " Unable to get replica list : invalid lookup"
885  << endl;
886  DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
887  DhtDumpEntry result;
888  result.setKey(lookupMsg->getKey());
890  capiGetRespMsg->setResultArraySize(1);
891  capiGetRespMsg->setResult(0, result);
892  capiGetRespMsg->setIsSuccess(false);
893  //cout << "DHT: lookup failed 2" << endl;
894  sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
895  pendingRpcs.erase(rpcId);
896  return;
897  }
898 
899  it->second.numSent = 0;
900 
901  for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
902  if (i < (unsigned int)numGetRequests) {
903  DHTGetCall* dhtMsg = new DHTGetCall();
904  dhtMsg->setKey(it->second.getCallMsg->getKey());
905  dhtMsg->setKind(it->second.getCallMsg->getKind());
906  dhtMsg->setId(it->second.getCallMsg->getId());
907  dhtMsg->setIsHash(true);
908  dhtMsg->setBitLength(GETCALL_L(dhtMsg));
909  RECORD_STATS(normalMessages++;
910  numBytesNormal += dhtMsg->getByteLength());
911  sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i), dhtMsg,
912  NULL, DEFAULT_ROUTING, -1, 0, rpcId);
913  it->second.numSent++;
914  } else {
915  // we don't send, we just store the remaining keys
916  it->second.replica.push_back(lookupMsg->getSiblings(i));
917  }
918  }
919 
920  it->second.numAvailableReplica = lookupMsg->getSiblingsArraySize();
921  it->second.numResponses = 0;
922  it->second.hashVector = NULL;
923  it->second.state = GET_HASH_SENT;
924  }
925 }
926 
928 {
929  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
930 
931  if (time >= GlobalStatistics::MIN_MEASURED) {
932  globalStatistics->addStdDev("DHT: Sent Maintenance Messages/s",
933  maintenanceMessages / time);
934  globalStatistics->addStdDev("DHT: Sent Normal Messages/s",
935  normalMessages / time);
936  globalStatistics->addStdDev("DHT: Sent Maintenance Bytes/s",
937  numBytesMaintenance / time);
938  globalStatistics->addStdDev("DHT: Sent Normal Bytes/s",
939  numBytesNormal / time);
940  }
941 }
942 
944  int bitSize = 0;
945  for (uint i = 0; i < msg->getResultArraySize(); i++) {
946  bitSize += msg->getResult(i).getValue().size();
947 
948  }
949  return bitSize;
950 }
951 
952 std::ostream& operator<<(std::ostream& os, const DHT::PendingRpcsEntry& entry)
953 {
954  if (entry.getCallMsg) {
955  os << "GET";
956  } else if (entry.putCallMsg) {
957  os << "PUT";
958  }
959 
960  os << " state: " << entry.state
961  << " numSent: " << entry.numSent
962  << " numResponses: " << entry.numResponses
963  << " numFailed: " << entry.numFailed
964  << " numAvailableReplica: " << entry.numAvailableReplica;
965 
966  if (entry.replica.size() > 0) {
967  os << " replicaSize: " << entry.replica.size();
968  }
969 
970  if (entry.hashVector != NULL) {
971  os << " hashVectorSize: " << entry.hashVector->size();
972  }
973 
974  if (entry.hashes.size() > 0) {
975  os << " hashes:";
976  std::map<BinaryValue, NodeVector>::const_iterator it;
977 
978  int i = 0;
979  for (it = entry.hashes.begin(); it != entry.hashes.end(); it++, i++) {
980  os << " hash" << i << ":" << it->second.size();
981  }
982  }
983 
984  return os;
985 }