24 #include <IPAddressResolver.h>
45 std::map<unsigned int, BaseCallMessage*>::iterator it;
47 for (it = rpcIdMap.begin(); it != rpcIdMap.end(); it++) {
52 std::map<int, GetMapEntry>::iterator it2;
53 for (it2 = getMap.begin(); it2 != getMap.end(); it2++) {
55 delete it2->second.callMsg;
56 it2->second.callMsg = NULL;
59 std::map<int, PutMapEntry>::iterator it3;
61 for (it3 = putMap.begin(); it3 != putMap.end(); it3++) {
65 delete it3->second.callMsg;
66 it3->second.callMsg = NULL;
73 if (dataStorage != NULL) {
84 (getParentModule()->getSubmodule(
"dhtDataStorage"));
88 ->getParentModule()->getSubmodule(
"neighborCache");
90 numReplica = par(
"numReplica");
91 numReplicaTeams = par(
"numReplicaTeams");
93 if (numReplica > numReplicaTeams * overlay->getMaxNumSiblings()) {
94 throw cRuntimeError(
"DHT::initialize(): numReplica bigger than what this "
95 "overlay can handle (%d)", numReplicaTeams*overlay->getMaxNumSiblings());
98 if (numReplica < numReplicaTeams) {
99 throw cRuntimeError(
"DHT::initialize(): numReplica (%d) smaller than numReplicaTeam (%d)",
100 numReplica, numReplicaTeams);
103 maintenanceMessages = 0;
105 numBytesMaintenance = 0;
107 WATCH(maintenanceMessages);
108 WATCH(normalMessages);
109 WATCH(numBytesNormal);
110 WATCH(numBytesMaintenance);
119 EV <<
"[DHT::handleTimerEvent()]\n"
120 <<
" received timer ttl, key: "
122 <<
"\n (overlay->getThisNode().key = "
123 << overlay->getThisNode().getKey().toString(16) <<
")"
126 dataStorage->removeData(msg_timer->
getKey(), msg_timer->
getKind(),
164 int rpcId, simtime_t rtt)
168 handlePutResponse(_DHTPutResponse, rpcId);
169 EV <<
"[DHT::handleRpcResponse()]\n"
170 <<
" DHT Put RPC Response received: id=" << rpcId
171 <<
" msg=" << *_DHTPutResponse <<
" rtt=" << rtt
176 handleGetResponse(_CBRDHTGetResponse, rpcId);
177 EV <<
"[DHT::handleRpcResponse()]\n"
178 <<
" DHT Get RPC Response received: id=" << rpcId
179 <<
" msg=" << *_CBRDHTGetResponse <<
" rtt=" << rtt
184 handleLookupResponse(_LookupResponse);
185 EV <<
"[DHT::handleRpcResponse()]\n"
186 <<
" Replica Set RPC Response received: id=" << rpcId
187 <<
" msg=" << *_LookupResponse <<
" rtt=" << rtt
195 cPolymorphic* context,
int rpcId,
200 EV <<
"[DHT::handleRpcResponse()]\n"
204 std::map<int, PutMapEntry>::iterator it2 =
207 if (it2 == putMap.end())
210 it2->second.numFailed++;
212 if (it2->second.numFailed / (
double)it2->second.numSent >= 0.5) {
215 sendRpcResponse(it2->second.callMsg, capiPutRespMsg);
216 it2->second.callMsg = NULL;
222 EV <<
"[DHT::handleRpcResponse()]\n"
226 std::map<int, GetMapEntry>::iterator it2 =
229 if (it2 == getMap.end())
232 if (it2->second.replica.size() > 0) {
234 NodeHandle fallbackReplica = it2->second.replica.back();
237 dhtRecall->
setKey(_CBRDHTGetCall->getKey());
239 dhtRecall->setBitLength(
GETCALL_L(dhtRecall));
241 numBytesNormal += dhtRecall->getByteLength());
242 sendRouteRpcCall(
TIER1_COMP, fallbackReplica, dhtRecall,
244 it2->second.callMsg->getNonce());
245 it2->second.numSent++;
246 it2->second.replica.pop_back();
248 }
else if (it2->second.teamNumber < (numReplicaTeams - 1)) {
250 it2->second.teamNumber++;
251 handleGetCAPIRequest(it2->second.callMsg, it2->second.teamNumber);
259 result.
setKey(_CBRDHTGetCall->getKey());
264 sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
274 error(
"DHT::handleUpperMessage(): Received message with unknown type!");
281 std::string tempString =
"PUT_REQUEST received: "
283 getParentModule()->getParentModule()->bubble(tempString.c_str());
285 if (!(dataStorage->isModifiable(dhtMsg->
getKey(), dhtMsg->
getKind(),
299 numBytesNormal += responseMsg->getByteLength());
300 sendRpcResponse(dhtMsg, responseMsg);
311 if (dhtMsg->
getValue().size() > 0) {
315 scheduleAt(simTime() + dhtMsg->
getTtl(), timerMsg);
321 overlay->isSiblingFor(overlay->getThisNode(),
331 RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
333 sendRpcResponse(dhtMsg, responseMsg);
338 std::string tempString =
"GET_REQUEST received: "
341 getParentModule()->getParentModule()->bubble(tempString.c_str());
346 storedValue = dataStorage->getDataEntry(dhtMsg->
getKey(), 1, 1)->value;
376 numBytesNormal += responseMsg->getByteLength());
377 sendRpcResponse(dhtMsg, responseMsg);
383 for (
int i = 1; i < numReplicaTeams; i++) {
396 *controlInfoCopy = controlInfo;
397 teamCopyPutMsg->setControlInfo(controlInfoCopy);
401 for (
int j = 0; j < i; j++) {
404 teamCopyPutMsg->
setKey(destKey);
410 int nonce = sendInternalRpcCall(
OVERLAY_COMP, replicaMsg);
411 rpcIdMap.insert(make_pair(nonce, teamCopyPutMsg));
418 int nonce = sendInternalRpcCall(
OVERLAY_COMP, replicaMsg);
419 rpcIdMap.insert(make_pair(nonce, capiPutMsg));
424 if (teamnum >= numReplicaTeams)
428 std::vector<OverlayKey> possibleKeys;
431 possibleKeys.push_back(originalKey);
433 for (
int i = 1; i < numReplicaTeams; i++) {
436 for (
int j = 0; j < i; j++) {
440 possibleKeys.push_back(keyHash);
444 std::vector<OverlayKey> orderedKeys;
445 OverlayKey compareKey = overlay->getThisNode().getKey();
447 while (possibleKeys.size() > 0) {
452 for (uint i = 0; i < possibleKeys.size(); i++) {
454 if (coordBasedRouting
455 ->getEuclidianDistanceByKeyAndCoords(possibleKeys[i],
456 ((
const Nps&)neighborCache->getNcsAccess()).getOwnCoordinates(),
457 overlay->getBitsPerDigit()) <
459 ->getEuclidianDistanceByKeyAndCoords(bestKey,
460 ((
const Nps&)neighborCache->getNcsAccess()).getOwnCoordinates(),
461 overlay->getBitsPerDigit())) {
462 bestKey = possibleKeys[i];
467 orderedKeys.push_back(bestKey);
468 possibleKeys.erase(possibleKeys.begin()+bestpos);
481 #define DIRECT_ROUTE_GET
482 #ifndef DIRECT_ROUTE_GET
485 replicaMsg->
setKey(searchKey);
487 int nonce = sendInternalRpcCall(
OVERLAY_COMP, replicaMsg);
488 rpcIdMap.insert(make_pair(nonce, capiGetMsg));
489 lastGetCall = SIMTIME_DBL(simTime());
497 std::map<int, GetMapEntry>::iterator it2 =
498 getMap.find(capiGetMsg->
getNonce());
500 if (it2 != getMap.end()) {
501 mapEntry = it2->second;
510 for (
unsigned int i = 0; i < 1; i++) {
516 dhtMsg->
setKey(searchKey);
523 numBytesNormal += dhtMsg->getByteLength());
525 sendRouteRpcCall(
TIER1_COMP, searchKey, dhtMsg,
546 if (it2 != getMap.end())
548 getMap.insert(make_pair(capiGetMsg->
getNonce(), mapEntry));
559 for (uint i = 0; i < dumpVector->size(); i++) {
560 response->
setRecord(i, (*dumpVector)[i]);
565 sendRpcResponse(call, response);
570 std::map<int, PutMapEntry>::iterator it2 =
573 if (it2 == putMap.end())
577 it2->second.numResponses++;
579 it2->second.numFailed++;
582 if (it2->second.numResponses / (
double)it2->second.numSent > 0.5) {
585 sendRpcResponse(it2->second.callMsg, capiPutRespMsg);
586 it2->second.callMsg = NULL;
593 std::map<int, GetMapEntry>::iterator it2 =
597 if (it2 == getMap.end()) {
598 std::cout <<
"- 1 -" << std::endl;
620 sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
623 }
else if (it2->second.replica.size() > 0) {
625 NodeHandle fallbackReplica = it2->second.replica.back();
627 std::cout <<
"[" << overlay->getThisNode().
getIp() <<
"] " <<
"Empty value received. Asking replica now ("<< it2->second.replica.size()<<
" left)!" << std::endl;
633 dhtRecall->setBitLength(
GETCALL_L(dhtRecall));
635 numBytesNormal += dhtRecall->getByteLength());
636 sendRouteRpcCall(
TIER1_COMP, fallbackReplica, dhtRecall,
638 it2->second.callMsg->getNonce());
639 it2->second.numSent++;
640 it2->second.replica.pop_back();
642 }
else if (it2->second.teamNumber < (numReplicaTeams - 1)) {
645 std::cout <<
"it2->second.teamNumber (" << it2->second.teamNumber <<
") < (numReplicaTeams - 1) (" << (numReplicaTeams - 1) <<
")" << std::endl;
646 std::cout <<
"[" << overlay->getThisNode().getIp() <<
"] " <<
"No more fallback replica in this team "<< it2->second.teamNumber<<
". Trying next one ("<< it2->second.teamNumber+1 <<
")..." << std::endl;
648 it2->second.teamNumber++;
649 handleGetCAPIRequest(it2->second.callMsg, it2->second.teamNumber);
654 std::cout <<
"[" << overlay->getThisNode().getIp() <<
"] " <<
"No more fallback replica. Lookup failed. :(" << std::endl;
665 sendRpcResponse(it2->second.callMsg, capiGetRespMsg);
679 DhtDataMap::iterator it = dataStorage->begin();
680 for (
unsigned int i = 0; i < dataStorage->getSize(); i++) {
684 if (entry.
responsible && (overlay->isSiblingFor(node, key,
702 numBytesMaintenance += dhtMsg->getByteLength());
707 EV <<
"[DHT::update()]\n"
708 <<
" Unable to know if key: " << key
709 <<
" is in range of node: " << node
725 - simulation.simTime()));
730 rpcIdMap.insert(make_pair(nonce, dhtMsg));
734 entry.
responsible = overlay->isSiblingFor(overlay->getThisNode(),
742 std::map<unsigned int, BaseCallMessage*>::iterator it =
743 rpcIdMap.find(lookupMsg->
getNonce());
745 if (it == rpcIdMap.end() || it->second == NULL)
748 if (dynamic_cast<DHTputCAPICall*> (it->second)) {
751 cout <<
"DHT::handleLookupResponse(): PUT "
752 << lookupMsg->
getKey() <<
" ("
753 << overlay->getThisNode().getKey() <<
")" << endl;
756 cout << i <<
": " << lookupMsg->
getSiblings(i) << endl;
761 rpcIdMap.erase(lookupMsg->
getNonce());
767 EV <<
"[DHT::handleLookupResponse()]\n"
768 <<
" Unable to get replica list : invalid lookup"
772 sendRpcResponse(capiPutMsg, capiPutRespMsg);
787 numBytesNormal += dhtMsg->getByteLength());
799 putMap.insert(make_pair(capiPutMsg->
getNonce(), mapEntry));
801 else if (dynamic_cast<DHTgetCAPICall*>(it->second)) {
804 cout <<
"DHT::handleLookupResponse(): GET "
805 << lookupMsg->
getKey() <<
" ("
806 << overlay->getThisNode().getKey() <<
")" << endl;
809 cout << i <<
": " << lookupMsg->
getSiblings(i) << endl;
814 rpcIdMap.erase(lookupMsg->
getNonce());
820 EV <<
"[DHT::handleLookupResponse()]\n"
821 <<
" Unable to get replica list : invalid lookup"
832 sendRpcResponse(capiGetMsg, capiGetRespMsg);
842 std::map<int, GetMapEntry>::iterator it2 =
843 getMap.find(capiGetMsg->
getNonce());
845 if (it2 != getMap.end()) {
846 mapEntry = it2->second;
868 numBytesNormal += dhtMsg->getByteLength());
885 if (it2 != getMap.end())
887 getMap.insert(make_pair(capiGetMsg->
getNonce(), mapEntry));
888 }
else if (dynamic_cast<DHTPutCall*>(it->second)) {
890 rpcIdMap.erase(lookupMsg->
getNonce());
895 EV <<
"[DHT::handleLookupResponse()]\n"
896 <<
" Unable to get replica list : invalid lookup"
904 numBytesMaintenance += putMsg->getByteLength());
916 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
920 globalStatistics->addStdDev(
"DHT: Sent Maintenance Messages/s",
921 maintenanceMessages / time);
922 globalStatistics->addStdDev(
"DHT: Sent Normal Messages/s",
923 normalMessages / time);
924 globalStatistics->addStdDev(
"DHT: Sent Maintenance Bytes/s",
925 numBytesMaintenance / time);
926 globalStatistics->addStdDev(
"DHT: Sent Normal Bytes/s",
927 numBytesNormal / time);