24 #include <IPAddressResolver.h>
43 PendingRpcs::iterator it;
45 for (it = pendingRpcs.begin(); it != pendingRpcs.end(); it++) {
46 delete(it->second.putCallMsg);
47 delete(it->second.getCallMsg);
52 if (dataStorage != NULL) {
63 (getParentModule()->getSubmodule(
"dhtDataStorage"));
65 numReplica = par(
"numReplica");
66 numGetRequests = par(
"numGetRequests");
67 ratioIdentical = par(
"ratioIdentical");
68 secureMaintenance = par(
"secureMaintenance");
69 invalidDataAttack = par(
"invalidDataAttack");
70 maintenanceAttack = par(
"maintenanceAttack");
72 if ((
int)numReplica > overlay->getMaxNumSiblings()) {
73 opp_error(
"DHT::initialize(): numReplica bigger than what this "
74 "overlay can handle (%d)", overlay->getMaxNumSiblings());
77 maintenanceMessages = 0;
79 numBytesMaintenance = 0;
81 WATCH(maintenanceMessages);
82 WATCH(normalMessages);
83 WATCH(numBytesNormal);
84 WATCH(numBytesMaintenance);
85 WATCH_MAP(pendingRpcs);
93 EV <<
"[DHT::handleTimerEvent()]\n"
94 <<
" received timer ttl, key: "
96 <<
"\n (overlay->getThisNode().getKey() = "
97 << overlay->getThisNode().getKey().toString(16) <<
")"
100 dataStorage->removeData(msg_timer->
getKey(), msg_timer->
getKind(),
121 int rpcId, simtime_t rtt)
125 handlePutResponse(_DHTPutResponse, rpcId);
126 EV <<
"[DHT::handleRpcResponse()]\n"
127 <<
" DHT Put RPC Response received: id=" << rpcId
128 <<
" msg=" << *_DHTPutResponse <<
" rtt=" << rtt
133 handleGetResponse(_DHTGetResponse, rpcId);
134 EV <<
"[DHT::handleRpcResponse()]\n"
135 <<
" DHT Get RPC Response received: id=" << rpcId
136 <<
" msg=" << *_DHTGetResponse <<
" rtt=" << rtt
141 handleLookupResponse(_LookupResponse, rpcId);
142 EV <<
"[DHT::handleRpcResponse()]\n"
143 <<
" Lookup RPC Response received: id=" << rpcId
144 <<
" msg=" << *_LookupResponse <<
" rtt=" << rtt
152 cPolymorphic* context,
int rpcId,
157 EV <<
"[DHT::handleRpcResponse()]\n"
161 PendingRpcs::iterator it = pendingRpcs.find(rpcId);
163 if (it == pendingRpcs.end())
166 it->second.numFailed++;
168 if (it->second.numFailed / (
double)it->second.numSent >= 0.5) {
171 sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
173 pendingRpcs.erase(rpcId);
179 EV <<
"[DHT::handleRpcResponse()]\n"
183 PendingRpcs::iterator it = pendingRpcs.find(rpcId);
185 if (it == pendingRpcs.end()) {
189 if (it->second.state == GET_VALUE_SENT) {
192 if ((it->second.hashVector != NULL)
193 && (it->second.hashVector->size() > 0)) {
196 getCall->
setKey(_DHTGetCall->getKey());
197 getCall->
setKind(_DHTGetCall->getKind());
198 getCall->
setId(_DHTGetCall->getId());
200 getCall->setBitLength(
GETCALL_L(getCall));
202 numBytesNormal += getCall->getByteLength());
204 sendRouteRpcCall(
TIER1_COMP, it->second.hashVector->back(),
206 it->second.hashVector->pop_back();
211 sendRpcResponse(it->second.getCallMsg,
214 pendingRpcs.erase(rpcId);
220 if (it->second.replica.size() > 0) {
222 getCall->
setKey(_DHTGetCall->getKey());
223 getCall->
setKind(_DHTGetCall->getKind());
224 getCall->
setId(_DHTGetCall->getId());
226 getCall->setBitLength(
GETCALL_L(getCall));
229 numBytesNormal += getCall->getByteLength());
231 sendRouteRpcCall(
TIER1_COMP, it->second.replica.back(),
234 it->second.replica.pop_back();
237 if (it->second.numResponses > 0) {
238 unsigned int maxCount = 0;
240 std::map<BinaryValue, NodeVector>::iterator itHashes;
241 for (itHashes = it->second.hashes.begin();
242 itHashes != it->second.hashes.end(); itHashes++) {
244 if (itHashes->second.size() > maxCount) {
245 maxCount = itHashes->second.size();
246 hashVector = &(itHashes->second);
253 it->second.hashVector = hashVector;
255 if ((
double)maxCount/(double)it->second.numResponses >=
257 it->second.hashVector = hashVector;
262 if ((it->second.hashVector != NULL)
263 && (it->second.hashVector->size() > 0)) {
266 getCall->
setKey(_DHTGetCall->getKey());
267 getCall->
setKind(_DHTGetCall->getKind());
268 getCall->
setId(_DHTGetCall->getId());
270 getCall->setBitLength(
GETCALL_L(getCall));
272 numBytesNormal += getCall->getByteLength());
273 sendRouteRpcCall(
TIER1_COMP, it->second.hashVector->back(),
276 it->second.hashVector->pop_back();
281 sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
283 pendingRpcs.erase(rpcId);
294 std::string tempString =
"PUT_REQUEST received: "
296 getParentModule()->getParentModule()->bubble(tempString.c_str());
299 bool isSibling = overlay->isSiblingFor(overlay->getThisNode(),
300 dhtMsg->
getKey(), secureMaintenance ? numReplica : 1, &err);
315 scheduleAt(simTime() + dhtMsg->
getTtl(), timerMsg);
317 entry = dataStorage->addData(dhtMsg->
getKey(), dhtMsg->
getKind(),
321 }
else if ((entry->
siblingVote.size() == 0) && isSibling) {
340 SiblingVoteMap::iterator majorityIt;
343 if (it->second.size() > maxCount) {
344 maxCount = it->second.size();
349 entry->
value = majorityIt->first;
352 if (maxCount > numReplica) {
360 RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
362 sendRpcResponse(dhtMsg, responseMsg);
368 if (!(dataStorage->isModifiable(dhtMsg->
getKey(), dhtMsg->
getKind(),
382 numBytesNormal += responseMsg->getByteLength());
383 sendRpcResponse(dhtMsg, responseMsg);
394 if (dhtMsg->
getValue().size() > 0) {
400 scheduleAt(simTime() + dhtMsg->
getTtl(), timerMsg);
412 RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
414 sendRpcResponse(dhtMsg, responseMsg);
419 std::string tempString =
"GET_REQUEST received: "
422 getParentModule()->getParentModule()->bubble(tempString.c_str());
425 throw cRuntimeError(
"DHT::handleGetRequest: Unspecified key!");
432 if (overlay->isMalicious() && invalidDataAttack) {
434 dataVect->at(0).setKey(dhtMsg->
getKey());
435 dataVect->at(0).setKind(dhtMsg->
getKind());
436 dataVect->at(0).setId(dhtMsg->
getId());
437 dataVect->at(0).setValue(
"Modified Data");
438 dataVect->at(0).setTtl(3600*24*365);
439 dataVect->at(0).setOwnerNode(overlay->getThisNode());
440 dataVect->at(0).setIs_modifiable(
false);
441 dataVect->at(0).setResponsible(
true);
449 if (dataVect->size() == 0) {
456 for (uint32_t i = 0; i < dataVect->size(); i++) {
457 resultValues += (*dataVect)[i].getValue();
463 sha1.
Update((uint8_t*) (&(*resultValues.begin())),
464 resultValues.size());
466 sha1.
GetHash((
unsigned char*)&hashValue[0]);
472 for (uint32_t i = 0; i < dataVect->size(); i++) {
473 responseMsg->
setResult(i, (*dataVect)[i]);
482 numBytesNormal += responseMsg->getByteLength());
483 sendRpcResponse(dhtMsg, responseMsg);
492 sendInternalRpcCall(
OVERLAY_COMP, lookupCall, NULL, -1, 0,
497 entry.state = LOOKUP_STARTED;
498 pendingRpcs.insert(make_pair(capiPutMsg->
getNonce(), entry));
506 sendInternalRpcCall(
OVERLAY_COMP, lookupCall, NULL, -1, 0,
511 entry.state = LOOKUP_STARTED;
512 pendingRpcs.insert(make_pair(capiGetMsg->
getNonce(), entry));
522 for (uint32_t i = 0; i < dumpVector->size(); i++) {
523 response->
setRecord(i, (*dumpVector)[i]);
528 sendRpcResponse(call, response);
533 PendingRpcs::iterator it = pendingRpcs.find(rpcId);
535 if (it == pendingRpcs.end())
539 it->second.numResponses++;
541 it->second.numFailed++;
546 if (it->second.numResponses / (
double)it->second.numSent > 0.5) {
550 sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
551 pendingRpcs.erase(rpcId);
558 PendingRpcs::iterator it = pendingRpcs.
find(rpcId);
560 if (it == pendingRpcs.end())
563 if (it->second.state == GET_VALUE_SENT) {
573 sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
574 pendingRpcs.erase(rpcId);
580 std::map<BinaryValue, NodeVector>::iterator itHashes =
583 if (itHashes == it->second.hashes.end()) {
587 it->second.hashes.insert(make_pair(dhtMsg->
getHashValue(),
590 itHashes->second.push_back(dhtMsg->
getSrcNode());
593 it->second.numResponses++;
595 if (it->second.state == GET_VALUE_SENT) {
601 unsigned int maxCount = 0;
604 for (itHashes = it->second.hashes.begin();
605 itHashes != it->second.hashes.end(); itHashes++) {
607 if (itHashes->second.size() > maxCount) {
608 maxCount = itHashes->second.size();
609 hashVector = &(itHashes->second);
613 if ((
double) maxCount / (double) it->second.numAvailableReplica
615 it->second.hashVector = hashVector;
616 }
else if (it->second.numResponses >= numGetRequests) {
618 if (it->second.replica.size() > 0) {
620 getCall->
setKey(it->second.getCallMsg->getKey());
621 getCall->
setKind(it->second.getCallMsg->getKind());
622 getCall->
setId(it->second.getCallMsg->getId());
624 getCall->setBitLength(
GETCALL_L(getCall));
626 numBytesNormal += getCall->getByteLength());
628 it->second.replica.back(), getCall,
630 it->second.replica.pop_back();
631 it->second.state = GET_HASH_SENT;
632 }
else if (hashVector == NULL) {
642 sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
644 cout <<
"DHT: GET failed: hash (no one else)" << endl;
645 cout <<
"numResponses: " << it->second.numResponses
646 <<
" numAvailableReplica: " << it->second.numAvailableReplica << endl;
648 for (itHashes = it->second.hashes.begin();
649 itHashes != it->second.hashes.end(); itHashes++) {
650 cout <<
" - " << itHashes->first <<
" ("
651 << itHashes->second.size() <<
")" << endl;
655 pendingRpcs.erase(rpcId);
659 it->second.hashVector = hashVector;
664 if ((it->second.state != GET_VALUE_SENT) &&
665 (it->second.hashVector != NULL)) {
667 if (it->second.hashVector->size() > 0) {
669 getCall->
setKey(it->second.getCallMsg->getKey());
670 getCall->
setKind(it->second.getCallMsg->getKind());
671 getCall->
setId(it->second.getCallMsg->getId());
673 getCall->setBitLength(
GETCALL_L(getCall));
675 numBytesNormal += getCall->getByteLength());
676 sendRouteRpcCall(
TIER1_COMP, it->second.hashVector->back(),
678 it->second.hashVector->pop_back();
679 it->second.state = GET_VALUE_SENT;
683 sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
685 pendingRpcs.erase(rpcId);
695 std::map<OverlayKey, DhtDataEntry>::iterator it;
697 EV <<
"[DHT::update() @ " << overlay->getThisNode().getIp()
698 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
699 <<
" Update called()"
702 if (secureMaintenance) {
703 for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
704 if (it->second.responsible) {
705 NodeVector* siblings = overlay->local_lookup(it->first,
708 if (siblings->size() == 0) {
714 EV <<
"[DHT::update() @ " << overlay->getThisNode().getIp()
715 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
716 <<
" Potential new sibling for record " << it->first
719 if (overlay->distance(node.
getKey(), it->first) <=
720 overlay->distance(siblings->back().getKey(), it->first)) {
722 sendMaintenancePutCall(node, it->first, it->second);
725 if (overlay->distance(overlay->getThisNode().getKey(), it->first) >
726 overlay->distance(siblings->back().getKey(), it->first)) {
728 it->second.responsible =
false;
731 if (overlay->distance(node.
getKey(), it->first) <
732 overlay->distance(siblings->back().getKey(), it->first)) {
734 sendMaintenancePutCall(siblings->back(), it->first,
746 for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
750 if (entry.
responsible && (overlay->isSiblingFor(node, key,
755 EV <<
"[DHT::update()]\n"
756 <<
" Unable to know if key: " << key
757 <<
" is in range of node: " << node
769 sendMaintenancePutCall(node, key, entry);
773 entry.
responsible = overlay->isSiblingFor(overlay->getThisNode(),
788 if (overlay->isMalicious() && maintenanceAttack) {
800 numBytesMaintenance += dhtMsg->getByteLength());
807 PendingRpcs::iterator it = pendingRpcs.find(rpcId);
809 if (it == pendingRpcs.end()) {
813 if (it->second.putCallMsg != NULL) {
816 cout <<
"DHT::handleLookupResponse(): PUT "
817 << lookupMsg->
getKey() <<
" ("
818 << overlay->getThisNode().getKey() <<
")" << endl;
821 cout << i <<
": " << lookupMsg->
getSiblings(i) << endl;
828 EV <<
"[DHT::handleLookupResponse()]\n"
829 <<
" Unable to get replica list : invalid lookup"
834 sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
835 pendingRpcs.erase(rpcId);
839 if ((it->second.putCallMsg->getId() == 0) &&
840 (it->second.putCallMsg->getValue().size() > 0)) {
843 it->second.putCallMsg->setId(intuniform(1, 2147483647));
848 dhtMsg->
setKey(it->second.putCallMsg->getKey());
849 dhtMsg->
setKind(it->second.putCallMsg->getKind());
850 dhtMsg->
setId(it->second.putCallMsg->getId());
851 dhtMsg->
setValue(it->second.putCallMsg->getValue());
852 dhtMsg->
setTtl(it->second.putCallMsg->getTtl());
857 numBytesNormal += dhtMsg->getByteLength());
863 it->second.state = PUT_SENT;
864 it->second.numResponses = 0;
865 it->second.numFailed = 0;
868 else if (it->second.getCallMsg != NULL) {
871 cout <<
"DHT::handleLookupResponse(): GET "
872 << lookupMsg->
getKey() <<
" ("
873 << overlay->getThisNode().getKey() <<
")" << endl;
876 cout << i <<
": " << lookupMsg->
getSiblings(i) << endl;
883 EV <<
"[DHT::handleLookupResponse()]\n"
884 <<
" Unable to get replica list : invalid lookup"
894 sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
895 pendingRpcs.erase(rpcId);
899 it->second.numSent = 0;
902 if (i < (
unsigned int)numGetRequests) {
904 dhtMsg->
setKey(it->second.getCallMsg->getKey());
905 dhtMsg->
setKind(it->second.getCallMsg->getKind());
906 dhtMsg->
setId(it->second.getCallMsg->getId());
910 numBytesNormal += dhtMsg->getByteLength());
913 it->second.numSent++;
916 it->second.replica.push_back(lookupMsg->
getSiblings(i));
921 it->second.numResponses = 0;
922 it->second.hashVector = NULL;
923 it->second.state = GET_HASH_SENT;
929 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
932 globalStatistics->addStdDev(
"DHT: Sent Maintenance Messages/s",
933 maintenanceMessages / time);
934 globalStatistics->addStdDev(
"DHT: Sent Normal Messages/s",
935 normalMessages / time);
936 globalStatistics->addStdDev(
"DHT: Sent Maintenance Bytes/s",
937 numBytesMaintenance / time);
938 globalStatistics->addStdDev(
"DHT: Sent Normal Bytes/s",
939 numBytesNormal / time);
960 os <<
" state: " << entry.
state
961 <<
" numSent: " << entry.
numSent
966 if (entry.
replica.size() > 0) {
967 os <<
" replicaSize: " << entry.
replica.size();
971 os <<
" hashVectorSize: " << entry.
hashVector->size();
974 if (entry.
hashes.size() > 0) {
976 std::map<BinaryValue, NodeVector>::const_iterator it;
979 for (it = entry.
hashes.begin(); it != entry.
hashes.end(); it++, i++) {
980 os <<
" hash" << i <<
":" << it->second.size();