OverSim
DHT Class Reference

A Distributed Hash Table (DHT) for KBR protocols. More...

#include <DHT.h>

Inheritance diagram for DHT:
BaseApp BaseRpc BaseTcpSupport RpcListener

Classes

class  PendingRpcsEntry

Public Member Functions

 DHT ()
virtual ~DHT ()
- Public Member Functions inherited from BaseApp
 BaseApp ()
virtual ~BaseApp ()
 virtual destructor
- Public Member Functions inherited from BaseRpc
 BaseRpc ()
const NodeHandle & getThisNode ()
 Returns the NodeHandle of this node.
simtime_t getUdpTimeout ()
- Public Member Functions inherited from RpcListener
virtual ~RpcListener ()
 destructor
- Public Member Functions inherited from BaseTcpSupport
virtual void socketDataArrived (int connId, void *yourPtr, cPacket *msg, bool urgent)
virtual void socketEstablished (int connId, void *yourPtr)
virtual void socketPeerClosed (int connId, void *yourPtr)
virtual void socketFailure (int connId, void *yourPtr, int code)
virtual void socketStatusArrived (int connId, void *yourPtr, TCPStatusInfo *status)

Private Types

enum  PendingRpcsStates {
  INIT = 0, LOOKUP_STARTED = 1, GET_HASH_SENT = 2, GET_VALUE_SENT = 3,
  PUT_SENT = 4
}
typedef std::map< uint32_t, PendingRpcsEntry > PendingRpcs

Private Member Functions

void initializeApp (int stage)
void finishApp ()
void handleTimerEvent (cMessage *msg)
bool handleRpcCall (BaseCallMessage *msg)
void handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
void handleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
void handlePutRequest (DHTPutCall *dhtMsg)
void handleGetRequest (DHTGetCall *dhtMsg)
void handlePutResponse (DHTPutResponse *dhtMsg, int rpcId)
void handleGetResponse (DHTGetResponse *dhtMsg, int rpcId)
void handlePutCAPIRequest (DHTputCAPICall *capiPutMsg)
void handleGetCAPIRequest (DHTgetCAPICall *capiGetMsg)
void handleDumpDhtRequest (DHTdumpCall *call)
void update (const NodeHandle &node, bool joined)
void handleLookupResponse (LookupResponse *lookupMsg, int rpcId)
void sendMaintenancePutCall (const TransportAddress &dest, const OverlayKey &key, const DhtDataEntry &entry)
int resultValuesBitLength (DHTGetResponse *msg)

Private Attributes

uint numReplica
int numGetRequests
double ratioIdentical
double maintenanceMessages
double normalMessages
double numBytesMaintenance
double numBytesNormal
bool secureMaintenance
 use a secure maintenance algorithm based on majority decisions
bool invalidDataAttack
 if node is malicious, it tries an invalidData attack
bool maintenanceAttack
 if node is malicious, it tries a maintenanceData attack
PendingRpcs pendingRpcs
 a map of all pending RPC operations
DHTDataStorage * dataStorage
 pointer to the dht data storage

Friends

std::ostream & operator<< (std::ostream &Stream, const PendingRpcsEntry &entry)

Additional Inherited Members

- Public Types inherited from BaseTcpSupport
enum  EvCode {
  NO_EST_CONNECTION, PEER_CLOSED, PEER_TIMEDOUT, PEER_REFUSED,
  CONNECTION_RESET, CONNECTION_SUCC_ClOSED
}
- Protected Member Functions inherited from BaseApp
int numInitStages () const
 method to set InitStage
void initialize (int stage)
 initializes base class-attributes
virtual void initializeApp (int stage)
 initializes derived class-attributes
void handleMessage (cMessage *msg)
 checks for message type and calls corresponding method
virtual void receiveChangeNotification (int category, const cPolymorphic *details)
 callback-method for events at the NotificationBoard
virtual void handleTransportAddressChangedNotification ()
 This method gets called if the node has a new TransportAddress (IP address) because it changed its access network.
virtual void handleNodeLeaveNotification ()
 This method gets called **.gracefulLeaveDelay seconds before the node is killed.
virtual void handleNodeGracefulLeaveNotification ()
 This method gets called **.gracefulLeaveDelay seconds before the node is killed, if this node is among the gracefulLeaveProbability nodes.
void finish ()
 collects statistical data
virtual void finishApp ()
 collects statistical data of derived app
void callRoute (const OverlayKey &key, cPacket *msg, const TransportAddress &hint=TransportAddress::UNSPECIFIED_NODE, RoutingType routingType=DEFAULT_ROUTING)
 Common API function: calls route-method in overlay.
void callRoute (const OverlayKey &key, cPacket *msg, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType=DEFAULT_ROUTING)
virtual void deliver (OverlayKey &key, cMessage *msg)
 Common API function: handles delivered messages from overlay.
virtual void forward (OverlayKey *key, cPacket **msg, NodeHandle *nextHopNode)
 Common API function: handles messages from overlay to be forwarded.
virtual void update (const NodeHandle &node, bool joined)
 Common API function: informs application about neighbors and own nodeID.
NodeVector * callLocalLookup (const OverlayKey &key, int num, bool safe)
 Common API function: produces a list of nodes that can be used as next hops towards key.
NodeVector * callNeighborSet (int num)
 Common API function: produces a list of neighbor nodes.
bool isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err)
 Query if a node is among the siblings for a given key.
virtual void handleLowerMessage (cMessage *msg)
 processes self-messages
virtual void handleUpperMessage (cMessage *msg)
 handleUpperMessage gets called by handleMessage(cMessage* msg) if msg arrived on from_upperTier (currently msg gets deleted in this function)
virtual void handleUDPMessage (cMessage *msg)
 method to handle messages that come directly from the UDP gate
virtual void handleReadyMessage (CompReadyMessage *msg)
 method to handle ready messages from the overlay
virtual void bindToPort (int port)
 Tells UDP we want to get all packets arriving on the given port.
virtual void sendMessageToUDP (const TransportAddress &destAddr, cPacket *msg, simtime_t delay=SIMTIME_ZERO)
 Sends a packet over UDP.
virtual void handleTraceMessage (cMessage *msg)
 handleTraceMessage gets called by handleMessage(cMessage* msg) if a message arrives at trace_in.
void sendMessageToLowerTier (cPacket *msg)
 sends non-commonAPI message to the lower tier
bool internalHandleRpcCall (BaseCallMessage *msg)
 Handles internal rpc requests.
void internalHandleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 Handles RPC responses internally in the base classes.

void internalSendRouteRpc (BaseRpcMessage *message, const OverlayKey &destKey, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType)
virtual CompType getThisCompType ()
 Return the component type of this module.
void sendReadyMessage (bool ready=true, const OverlayKey &nodeId=OverlayKey::UNSPECIFIED_KEY)
- Protected Attributes inherited from BaseApp
UnderlayConfigurator * underlayConfigurator
 pointer to UnderlayConfigurator in this node
GlobalNodeList * globalNodeList
 pointer to GlobalNodeList in this node
GlobalStatistics * globalStatistics
 pointer to GlobalStatistics module in this node
NotificationBoard * notificationBoard
 pointer to NotificationBoard in this node
bool debugOutput
 debug output yes/no?
int numOverlaySent
 number of sent packets to overlay
int bytesOverlaySent
 number of sent bytes to overlay
int numOverlayReceived
 number of received packets from overlay
int bytesOverlayReceived
 number of received bytes from overlay
int numUdpSent
 number of sent packets to UDP
int bytesUdpSent
 number of sent bytes to UDP
int numUdpReceived
 number of received packets from UDP
int bytesUdpReceived
 number of received bytes from UDP
simtime_t creationTime
 simTime when the App has been created

Detailed Description

A Distributed Hash Table (DHT) for KBR protocols.

A Distributed Hash Table (DHT) for KBR protocols

Definition at line 44 of file DHT.h.

Member Typedef Documentation

typedef std::map<uint32_t, PendingRpcsEntry> DHT::PendingRpcs
private

Definition at line 125 of file DHT.h.

Member Enumeration Documentation

enum DHT::PendingRpcsStates
private
Enumerator:
INIT 
LOOKUP_STARTED 
GET_HASH_SENT 
GET_VALUE_SENT 
PUT_SENT 

Definition at line 51 of file DHT.h.

Constructor & Destructor Documentation

DHT::DHT ( )

Definition at line 36 of file DHT.cc.

{
dataStorage = NULL;
}
DHT::~DHT ( )
virtual

Definition at line 41 of file DHT.cc.

{
PendingRpcs::iterator it;
for (it = pendingRpcs.begin(); it != pendingRpcs.end(); it++) {
delete(it->second.putCallMsg);
delete(it->second.getCallMsg);
}
pendingRpcs.clear();
if (dataStorage != NULL) {
}
}

Member Function Documentation

void DHT::finishApp ( )
private

Definition at line 927 of file DHT.cc.

{
globalStatistics->addStdDev("DHT: Sent Maintenance Messages/s",
maintenanceMessages / time);
globalStatistics->addStdDev("DHT: Sent Normal Messages/s",
normalMessages / time);
globalStatistics->addStdDev("DHT: Sent Maintenance Bytes/s",
numBytesMaintenance / time);
globalStatistics->addStdDev("DHT: Sent Normal Bytes/s",
numBytesNormal / time);
}
}
void DHT::handleDumpDhtRequest ( DHTdumpCall *  call)
private

Definition at line 515 of file DHT.cc.

{
DHTdumpResponse* response = new DHTdumpResponse();
DhtDumpVector* dumpVector = dataStorage->dumpDht();
response->setRecordArraySize(dumpVector->size());
for (uint32_t i = 0; i < dumpVector->size(); i++) {
response->setRecord(i, (*dumpVector)[i]);
}
delete dumpVector;
sendRpcResponse(call, response);
}
void DHT::handleGetCAPIRequest ( DHTgetCAPICall *  capiGetMsg)
private

Definition at line 501 of file DHT.cc.

{
LookupCall* lookupCall = new LookupCall();
lookupCall->setKey(capiGetMsg->getKey());
lookupCall->setNumSiblings(numReplica);
sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
capiGetMsg->getNonce());
PendingRpcsEntry entry;
entry.getCallMsg = capiGetMsg;
entry.state = LOOKUP_STARTED;
pendingRpcs.insert(make_pair(capiGetMsg->getNonce(), entry));
}
void DHT::handleGetRequest ( DHTGetCall *  dhtMsg)
private

Definition at line 417 of file DHT.cc.

{
std::string tempString = "GET_REQUEST received: "
+ std::string(dhtMsg->getKey().toString(16));
getParentModule()->getParentModule()->bubble(tempString.c_str());
if (dhtMsg->getKey().isUnspecified()) {
throw cRuntimeError("DHT::handleGetRequest: Unspecified key!");
}
DhtDumpVector* dataVect = dataStorage->dumpDht(dhtMsg->getKey(),
dhtMsg->getKind(),
dhtMsg->getId());
dataVect->resize(1);
dataVect->at(0).setKey(dhtMsg->getKey());
dataVect->at(0).setKind(dhtMsg->getKind());
dataVect->at(0).setId(dhtMsg->getId());
dataVect->at(0).setValue("Modified Data");
dataVect->at(0).setTtl(3600*24*365);
dataVect->at(0).setOwnerNode(overlay->getThisNode());
dataVect->at(0).setIs_modifiable(false);
dataVect->at(0).setResponsible(true);
}
// send back
DHTGetResponse* responseMsg = new DHTGetResponse();
responseMsg->setKey(dhtMsg->getKey());
responseMsg->setIsHash(dhtMsg->getIsHash());
if (dataVect->size() == 0) {
responseMsg->setResultArraySize(0);
} else {
if (dhtMsg->getIsHash()) {
// TODO: verify this
BinaryValue resultValues;
for (uint32_t i = 0; i < dataVect->size(); i++) {
resultValues += (*dataVect)[i].getValue();
}
CSHA1 sha1;
BinaryValue hashValue(20);
sha1.Reset();
sha1.Update((uint8_t*) (&(*resultValues.begin())),
resultValues.size());
sha1.Final();
sha1.GetHash((unsigned char*)&hashValue[0]);
responseMsg->setHashValue(hashValue);
} else {
responseMsg->setResultArraySize(dataVect->size());
for (uint32_t i = 0; i < dataVect->size(); i++) {
responseMsg->setResult(i, (*dataVect)[i]);
}
}
}
delete dataVect;
responseMsg->setBitLength(GETRESPONSE_L(responseMsg));
RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
sendRpcResponse(dhtMsg, responseMsg);
}
void DHT::handleGetResponse ( DHTGetResponse *  dhtMsg,
int  rpcId 
)
private

Definition at line 555 of file DHT.cc.

{
NodeVector* hashVector = NULL;
PendingRpcs::iterator it = pendingRpcs.find(rpcId);
if (it == pendingRpcs.end()) // unknown request
return;
if (it->second.state == GET_VALUE_SENT) {
// we have sent a 'real' get request
if (!dhtMsg->getIsHash()) {
// TODO verify hash
DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
capiGetRespMsg->setResultArraySize(dhtMsg->getResultArraySize());
for (uint i = 0; i < dhtMsg->getResultArraySize(); i++) {
capiGetRespMsg->setResult(i, dhtMsg->getResult(i));
}
capiGetRespMsg->setIsSuccess(true);
sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
pendingRpcs.erase(rpcId);
return;
}
}
if (dhtMsg->getIsHash()) {
std::map<BinaryValue, NodeVector>::iterator itHashes =
it->second.hashes.find(dhtMsg->getHashValue());
if (itHashes == it->second.hashes.end()) {
// new hash
NodeVector vect;
vect.push_back(dhtMsg->getSrcNode());
it->second.hashes.insert(make_pair(dhtMsg->getHashValue(),
vect));
} else {
itHashes->second.push_back(dhtMsg->getSrcNode());
}
it->second.numResponses++;
if (it->second.state == GET_VALUE_SENT) {
// we have already sent a real get request
return;
}
// count the maximum number of equal hash values received so far
unsigned int maxCount = 0;
for (itHashes = it->second.hashes.begin();
itHashes != it->second.hashes.end(); itHashes++) {
if (itHashes->second.size() > maxCount) {
maxCount = itHashes->second.size();
hashVector = &(itHashes->second);
}
}
if ((double) maxCount / (double) it->second.numAvailableReplica
>= ratioIdentical) {
it->second.hashVector = hashVector;
} else if (it->second.numResponses >= numGetRequests) {
// we'll try to ask some other nodes
if (it->second.replica.size() > 0) {
DHTGetCall* getCall = new DHTGetCall();
getCall->setKey(it->second.getCallMsg->getKey());
getCall->setKind(it->second.getCallMsg->getKind());
getCall->setId(it->second.getCallMsg->getId());
getCall->setIsHash(true);
getCall->setBitLength(GETCALL_L(getCall));
RECORD_STATS(normalMessages++;
numBytesNormal += getCall->getByteLength());
sendRouteRpcCall(TIER1_COMP,
it->second.replica.back(), getCall,
NULL, DEFAULT_ROUTING, -1, 0, rpcId);
it->second.replica.pop_back();
it->second.state = GET_HASH_SENT;
} else if (hashVector == NULL) {
// we don't have anyone else to ask and no hash
DHTgetCAPIResponse* capiGetRespMsg =
new DHTgetCAPIResponse();
DhtDumpEntry result;
result.setKey(dhtMsg->getKey());
capiGetRespMsg->setResultArraySize(1);
capiGetRespMsg->setResult(0, result);
capiGetRespMsg->setIsSuccess(false);
sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
#if 0
cout << "DHT: GET failed: hash (no one else)" << endl;
cout << "numResponses: " << it->second.numResponses
<< " numAvailableReplica: " << it->second.numAvailableReplica << endl;
for (itHashes = it->second.hashes.begin();
itHashes != it->second.hashes.end(); itHashes++) {
cout << " - " << itHashes->first << " ("
<< itHashes->second.size() << ")" << endl;
}
#endif
pendingRpcs.erase(rpcId);
return;
} else {
// we don't have anyone else to ask => take what we've got
it->second.hashVector = hashVector;
}
}
}
if ((it->second.state != GET_VALUE_SENT) &&
(it->second.hashVector != NULL)) {
// we have already received all the response and chosen a hash
if (it->second.hashVector->size() > 0) {
DHTGetCall* getCall = new DHTGetCall();
getCall->setKey(it->second.getCallMsg->getKey());
getCall->setKind(it->second.getCallMsg->getKind());
getCall->setId(it->second.getCallMsg->getId());
getCall->setIsHash(false);
getCall->setBitLength(GETCALL_L(getCall));
RECORD_STATS(normalMessages++; numBytesNormal += getCall->getByteLength());
sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
getCall, NULL, DEFAULT_ROUTING, -1, 0, rpcId);
it->second.hashVector->pop_back();
it->second.state = GET_VALUE_SENT;
} else { // we don't have anyone else to ask
DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
capiGetRespMsg->setResultArraySize(0);
sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
//cout << "DHT: GET failed: hash2 (no one else)" << endl;
pendingRpcs.erase(rpcId);
}
}
}
void DHT::handleLookupResponse ( LookupResponse *  lookupMsg,
int  rpcId 
)
private

Definition at line 805 of file DHT.cc.

{
PendingRpcs::iterator it = pendingRpcs.find(rpcId);
if (it == pendingRpcs.end()) {
return;
}
if (it->second.putCallMsg != NULL) {
#if 0
cout << "DHT::handleLookupResponse(): PUT "
<< lookupMsg->getKey() << " ("
<< overlay->getThisNode().getKey() << ")" << endl;
for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
cout << i << ": " << lookupMsg->getSiblings(i) << endl;
}
#endif
if ((lookupMsg->getIsValid() == false)
|| (lookupMsg->getSiblingsArraySize() == 0)) {
EV << "[DHT::handleLookupResponse()]\n"
<< " Unable to get replica list : invalid lookup"
<< endl;
DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
capiPutRespMsg->setIsSuccess(false);
//cout << "DHT::lookup failed" << endl;
sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
pendingRpcs.erase(rpcId);
return;
}
if ((it->second.putCallMsg->getId() == 0) &&
(it->second.putCallMsg->getValue().size() > 0)) {
// pick a random id before replication of the data item
// id 0 is kept for delete requests (i.e. a put with empty value)
it->second.putCallMsg->setId(intuniform(1, 2147483647));
}
for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
DHTPutCall* dhtMsg = new DHTPutCall();
dhtMsg->setKey(it->second.putCallMsg->getKey());
dhtMsg->setKind(it->second.putCallMsg->getKind());
dhtMsg->setId(it->second.putCallMsg->getId());
dhtMsg->setValue(it->second.putCallMsg->getValue());
dhtMsg->setTtl(it->second.putCallMsg->getTtl());
dhtMsg->setIsModifiable(it->second.putCallMsg->getIsModifiable());
dhtMsg->setMaintenance(false);
dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
RECORD_STATS(normalMessages++;
numBytesNormal += dhtMsg->getByteLength());
sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i),
dhtMsg, NULL, DEFAULT_ROUTING, -1,
0, rpcId);
}
it->second.state = PUT_SENT;
it->second.numResponses = 0;
it->second.numFailed = 0;
it->second.numSent = lookupMsg->getSiblingsArraySize();
}
else if (it->second.getCallMsg != NULL) {
#if 0
cout << "DHT::handleLookupResponse(): GET "
<< lookupMsg->getKey() << " ("
<< overlay->getThisNode().getKey() << ")" << endl;
for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
cout << i << ": " << lookupMsg->getSiblings(i) << endl;
}
#endif
if ((lookupMsg->getIsValid() == false)
|| (lookupMsg->getSiblingsArraySize() == 0)) {
EV << "[DHT::handleLookupResponse()]\n"
<< " Unable to get replica list : invalid lookup"
<< endl;
DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
DhtDumpEntry result;
result.setKey(lookupMsg->getKey());
capiGetRespMsg->setResultArraySize(1);
capiGetRespMsg->setResult(0, result);
capiGetRespMsg->setIsSuccess(false);
//cout << "DHT: lookup failed 2" << endl;
sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
pendingRpcs.erase(rpcId);
return;
}
it->second.numSent = 0;
for (unsigned int i = 0; i < lookupMsg->getSiblingsArraySize(); i++) {
if (i < (unsigned int)numGetRequests) {
DHTGetCall* dhtMsg = new DHTGetCall();
dhtMsg->setKey(it->second.getCallMsg->getKey());
dhtMsg->setKind(it->second.getCallMsg->getKind());
dhtMsg->setId(it->second.getCallMsg->getId());
dhtMsg->setIsHash(true);
dhtMsg->setBitLength(GETCALL_L(dhtMsg));
RECORD_STATS(normalMessages++; numBytesNormal += dhtMsg->getByteLength());
sendRouteRpcCall(TIER1_COMP, lookupMsg->getSiblings(i), dhtMsg,
NULL, DEFAULT_ROUTING, -1, 0, rpcId);
it->second.numSent++;
} else {
// we don't send, we just store the remaining keys
it->second.replica.push_back(lookupMsg->getSiblings(i));
}
}
it->second.numAvailableReplica = lookupMsg->getSiblingsArraySize();
it->second.numResponses = 0;
it->second.hashVector = NULL;
it->second.state = GET_HASH_SENT;
}
}
void DHT::handlePutCAPIRequest ( DHTputCAPICall *  capiPutMsg)
private

Definition at line 486 of file DHT.cc.

{
// asks the replica list
LookupCall* lookupCall = new LookupCall();
lookupCall->setKey(capiPutMsg->getKey());
lookupCall->setNumSiblings(numReplica);
sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
capiPutMsg->getNonce());
PendingRpcsEntry entry;
entry.putCallMsg = capiPutMsg;
entry.state = LOOKUP_STARTED;
pendingRpcs.insert(make_pair(capiPutMsg->getNonce(), entry));
}
void DHT::handlePutRequest ( DHTPutCall *  dhtMsg)
private

Definition at line 292 of file DHT.cc.

{
std::string tempString = "PUT_REQUEST received: "
+ std::string(dhtMsg->getKey().toString(16));
getParentModule()->getParentModule()->bubble(tempString.c_str());
bool err;
bool isSibling = overlay->isSiblingFor(overlay->getThisNode(),
dhtMsg->getKey(), secureMaintenance ? numReplica : 1, &err);
if (err) {
isSibling = true;
}
if (secureMaintenance && dhtMsg->getMaintenance()) {
dhtMsg->getKind(),
dhtMsg->getId());
if (entry == NULL) {
// add ttl timer
DHTTtlTimer *timerMsg = new DHTTtlTimer("ttl_timer");
timerMsg->setKey(dhtMsg->getKey());
timerMsg->setKind(dhtMsg->getKind());
timerMsg->setId(dhtMsg->getId());
scheduleAt(simTime() + dhtMsg->getTtl(), timerMsg);
entry = dataStorage->addData(dhtMsg->getKey(), dhtMsg->getKind(),
dhtMsg->getId(), dhtMsg->getValue(), timerMsg,
dhtMsg->getIsModifiable(), dhtMsg->getSrcNode(),
isSibling);
} else if ((entry->siblingVote.size() == 0) && isSibling) {
// we already have a verified entry with this key and are
// still responsible => ignore maintenance calls
delete dhtMsg;
return;
}
SiblingVoteMap::iterator it = entry->siblingVote.find(dhtMsg->getValue());
if (it == entry->siblingVote.end()) {
// new hash
NodeVector vect;
vect.add(dhtMsg->getSrcNode());
entry->siblingVote.insert(make_pair(dhtMsg->getValue(),
vect));
} else {
it->second.add(dhtMsg->getSrcNode());
}
size_t maxCount = 0;
SiblingVoteMap::iterator majorityIt;
for (it = entry->siblingVote.begin(); it != entry->siblingVote.end(); it++) {
if (it->second.size() > maxCount) {
maxCount = it->second.size();
majorityIt = it;
}
}
entry->value = majorityIt->first;
entry->responsible = true;
if (maxCount > numReplica) {
entry->siblingVote.clear();
}
// send back
DHTPutResponse* responseMsg = new DHTPutResponse();
responseMsg->setSuccess(true);
responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
sendRpcResponse(dhtMsg, responseMsg);
return;
}
#if 0
if (!(dataStorage->isModifiable(dhtMsg->getKey(), dhtMsg->getKind(),
dhtMsg->getId()))) {
// check if the put request came from the right node
NodeHandle sourceNode = dataStorage->getSourceNode(dhtMsg->getKey(),
dhtMsg->getKind(), dhtMsg->getId());
if (((!sourceNode.isUnspecified())
&& (!dhtMsg->getSrcNode().isUnspecified()) && (sourceNode
!= dhtMsg->getSrcNode())) || ((dhtMsg->getMaintenance())
&& (dhtMsg->getOwnerNode() == sourceNode))) {
// TODO: set owner
DHTPutResponse* responseMsg = new DHTPutResponse();
responseMsg->setSuccess(false);
responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
sendRpcResponse(dhtMsg, responseMsg);
return;
}
}
#endif
// remove data item from local data storage
dataStorage->removeData(dhtMsg->getKey(), dhtMsg->getKind(),
dhtMsg->getId());
if (dhtMsg->getValue().size() > 0) {
// add ttl timer
DHTTtlTimer *timerMsg = new DHTTtlTimer("ttl_timer");
timerMsg->setKey(dhtMsg->getKey());
timerMsg->setKind(dhtMsg->getKind());
timerMsg->setId(dhtMsg->getId());
scheduleAt(simTime() + dhtMsg->getTtl(), timerMsg);
// storage data item in local data storage
dataStorage->addData(dhtMsg->getKey(), dhtMsg->getKind(),
dhtMsg->getId(), dhtMsg->getValue(), timerMsg,
dhtMsg->getIsModifiable(), dhtMsg->getSrcNode(),
isSibling);
}
// send back
DHTPutResponse* responseMsg = new DHTPutResponse();
responseMsg->setSuccess(true);
responseMsg->setBitLength(PUTRESPONSE_L(responseMsg));
RECORD_STATS(normalMessages++; numBytesNormal += responseMsg->getByteLength());
sendRpcResponse(dhtMsg, responseMsg);
}
void DHT::handlePutResponse ( DHTPutResponse *  dhtMsg,
int  rpcId 
)
private

Definition at line 531 of file DHT.cc.

{
PendingRpcs::iterator it = pendingRpcs.find(rpcId);
if (it == pendingRpcs.end()) // unknown request
return;
if (dhtMsg->getSuccess()) {
it->second.numResponses++;
} else {
it->second.numFailed++;
}
// if ((it->second.numFailed + it->second.numResponses) == it->second.numSent) {
if (it->second.numResponses / (double)it->second.numSent > 0.5) {
DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
capiPutRespMsg->setIsSuccess(true);
sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
pendingRpcs.erase(rpcId);
}
}
bool DHT::handleRpcCall ( BaseCallMessage *  msg)
private

Definition at line 105 of file DHT.cc.

{
// RPCs between nodes
// internal RPCs
return RPC_HANDLED;
}
void DHT::handleRpcResponse ( BaseResponseMessage *  msg,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
private

Definition at line 120 of file DHT.cc.

{
RPC_ON_RESPONSE(DHTPut){
handlePutResponse(_DHTPutResponse, rpcId);
EV << "[DHT::handleRpcResponse()]\n"
<< " DHT Put RPC Response received: id=" << rpcId
<< " msg=" << *_DHTPutResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE(DHTGet) {
handleGetResponse(_DHTGetResponse, rpcId);
EV << "[DHT::handleRpcResponse()]\n"
<< " DHT Get RPC Response received: id=" << rpcId
<< " msg=" << *_DHTGetResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE(Lookup) {
handleLookupResponse(_LookupResponse, rpcId);
EV << "[DHT::handleRpcResponse()]\n"
<< " Lookup RPC Response received: id=" << rpcId
<< " msg=" << *_LookupResponse << " rtt=" << rtt
<< endl;
break;
}
}
void DHT::handleRpcTimeout ( BaseCallMessage *  msg,
const TransportAddress &  dest,
cPolymorphic *  context,
int  rpcId,
const OverlayKey &  destKey 
)
private

Definition at line 151 of file DHT.cc.

{
RPC_ON_CALL(DHTPut){
EV << "[DHT::handleRpcResponse()]\n"
<< " DHTPut Timeout"
<< endl;
PendingRpcs::iterator it = pendingRpcs.find(rpcId);
if (it == pendingRpcs.end()) // unknown request
return;
it->second.numFailed++;
if (it->second.numFailed / (double)it->second.numSent >= 0.5) {
DHTputCAPIResponse* capiPutRespMsg = new DHTputCAPIResponse();
capiPutRespMsg->setIsSuccess(false);
sendRpcResponse(it->second.putCallMsg, capiPutRespMsg);
//cout << "timeout 1" << endl;
pendingRpcs.erase(rpcId);
}
break;
}
RPC_ON_CALL(DHTGet) {
EV << "[DHT::handleRpcResponse()]\n"
<< " DHTGet Timeout"
<< endl;
PendingRpcs::iterator it = pendingRpcs.find(rpcId);
if (it == pendingRpcs.end()) { // unknown request
return;
}
if (it->second.state == GET_VALUE_SENT) {
// we have sent a 'real' get request
// ask anyone else, if possible
if ((it->second.hashVector != NULL)
&& (it->second.hashVector->size() > 0)) {
DHTGetCall* getCall = new DHTGetCall();
getCall->setKey(_DHTGetCall->getKey());
getCall->setKind(_DHTGetCall->getKind());
getCall->setId(_DHTGetCall->getId());
getCall->setIsHash(false);
getCall->setBitLength(GETCALL_L(getCall));
RECORD_STATS(normalMessages++; numBytesNormal += getCall->getByteLength());
sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
getCall, NULL, DEFAULT_ROUTING, -1, 0, rpcId);
it->second.hashVector->pop_back();
} else {
// no one else
DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
capiGetRespMsg->setIsSuccess(false);
sendRpcResponse(it->second.getCallMsg,
capiGetRespMsg);
//cout << "DHT: GET failed: timeout (no one else)" << endl;
pendingRpcs.erase(rpcId);
return;
}
} else {
// timeout while waiting for hashes
// try to ask another one of the replica list for the hash
if (it->second.replica.size() > 0) {
DHTGetCall* getCall = new DHTGetCall();
getCall->setKey(_DHTGetCall->getKey());
getCall->setKind(_DHTGetCall->getKind());
getCall->setId(_DHTGetCall->getId());
getCall->setIsHash(true);
getCall->setBitLength(GETCALL_L(getCall));
RECORD_STATS(normalMessages++; numBytesNormal += getCall->getByteLength());
sendRouteRpcCall(TIER1_COMP, it->second.replica.back(),
getCall, NULL, DEFAULT_ROUTING, -1, 0,
rpcId);
it->second.replica.pop_back();
} else {
// no one else to ask, see what we can do with what we have
if (it->second.numResponses > 0) {
unsigned int maxCount = 0;
NodeVector* hashVector = NULL;
std::map<BinaryValue, NodeVector>::iterator itHashes;
for (itHashes = it->second.hashes.begin();
itHashes != it->second.hashes.end(); itHashes++) {
if (itHashes->second.size() > maxCount) {
maxCount = itHashes->second.size();
hashVector = &(itHashes->second);
}
}
// since it makes no difference for us, if we
// return a invalid result or return nothing,
// we simply return the value with the highest probability
it->second.hashVector = hashVector;
#if 0
if ((double)maxCount/(double)it->second.numResponses >=
it->second.hashVector = hashVector;
}
#endif
}
if ((it->second.hashVector != NULL)
&& (it->second.hashVector->size() > 0)) {
DHTGetCall* getCall = new DHTGetCall();
getCall->setKey(_DHTGetCall->getKey());
getCall->setKind(_DHTGetCall->getKind());
getCall->setId(_DHTGetCall->getId());
getCall->setIsHash(false);
getCall->setBitLength(GETCALL_L(getCall));
RECORD_STATS(normalMessages++; numBytesNormal += getCall->getByteLength());
sendRouteRpcCall(TIER1_COMP, it->second.hashVector->back(),
getCall, NULL, DEFAULT_ROUTING, -1,
0, rpcId);
it->second.hashVector->pop_back();
} else {
// no more nodes to ask -> get failed
DHTgetCAPIResponse* capiGetRespMsg = new DHTgetCAPIResponse();
capiGetRespMsg->setIsSuccess(false);
sendRpcResponse(it->second.getCallMsg, capiGetRespMsg);
//cout << "DHT: GET failed: timeout2 (no one else)" << endl;
pendingRpcs.erase(rpcId);
}
}
}
break;
}
}
void DHT::handleTimerEvent ( cMessage *  msg)
private

Definition at line 88 of file DHT.cc.

{
DHTTtlTimer* msg_timer = dynamic_cast<DHTTtlTimer*> (msg);
if (msg_timer) {
EV << "[DHT::handleTimerEvent()]\n"
<< " received timer ttl, key: "
<< msg_timer->getKey().toString(16)
<< "\n (overlay->getThisNode().getKey() = "
<< overlay->getThisNode().getKey().toString(16) << ")"
<< endl;
dataStorage->removeData(msg_timer->getKey(), msg_timer->getKind(),
msg_timer->getId());
}
}
void DHT::initializeApp ( int  stage)
private

Definition at line 57 of file DHT.cc.

{
if (stage != MIN_STAGE_APP)
return;
dataStorage = check_and_cast<DHTDataStorage*>
(getParentModule()->getSubmodule("dhtDataStorage"));
numReplica = par("numReplica");
numGetRequests = par("numGetRequests");
ratioIdentical = par("ratioIdentical");
secureMaintenance = par("secureMaintenance");
invalidDataAttack = par("invalidDataAttack");
maintenanceAttack = par("maintenanceAttack");
if ((int)numReplica > overlay->getMaxNumSiblings()) {
opp_error("DHT::initialize(): numReplica bigger than what this "
"overlay can handle (%d)", overlay->getMaxNumSiblings());
}
WATCH_MAP(pendingRpcs);
}
int DHT::resultValuesBitLength ( DHTGetResponse *  msg)
private

Definition at line 943 of file DHT.cc.

{
int bitSize = 0;
for (uint i = 0; i < msg->getResultArraySize(); i++) {
bitSize += msg->getResult(i).getValue().size();
}
return bitSize;
}
void DHT::sendMaintenancePutCall ( const TransportAddress &  dest,
const OverlayKey &  key,
const DhtDataEntry &  entry 
)
private

Definition at line 778 of file DHT.cc.

{
DHTPutCall* dhtMsg = new DHTPutCall();
dhtMsg->setKey(key);
dhtMsg->setKind(entry.kind);
dhtMsg->setId(entry.id);
dhtMsg->setValue("Modified Data");
} else {
dhtMsg->setValue(entry.value);
}
dhtMsg->setTtl((int)SIMTIME_DBL(entry.ttlMessage->getArrivalTime()
- simTime()));
dhtMsg->setMaintenance(true);
dhtMsg->setBitLength(PUTCALL_L(dhtMsg));
RECORD_STATS(maintenanceMessages++; numBytesMaintenance += dhtMsg->getByteLength());
sendRouteRpcCall(TIER1_COMP, node, dhtMsg);
}
void DHT::update ( const NodeHandle &  node,
bool  joined 
)
private

Definition at line 690 of file DHT.cc.

{
bool err = false;
DhtDataEntry entry;
std::map<OverlayKey, DhtDataEntry>::iterator it;
EV << "[DHT::update() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Update called()"
<< endl;
for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
if (it->second.responsible) {
NodeVector* siblings = overlay->local_lookup(it->first,
false);
if (siblings->size() == 0) {
delete siblings;
continue;
}
if (joined) {
EV << "[DHT::update() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Potential new sibling for record " << it->first
<< endl;
if (overlay->distance(node.getKey(), it->first) <=
overlay->distance(siblings->back().getKey(), it->first)) {
sendMaintenancePutCall(node, it->first, it->second);
}
if (overlay->distance(overlay->getThisNode().getKey(), it->first) >
overlay->distance(siblings->back().getKey(), it->first)) {
it->second.responsible = false;
}
} else {
if (overlay->distance(node.getKey(), it->first) <
overlay->distance(siblings->back().getKey(), it->first)) {
sendMaintenancePutCall(siblings->back(), it->first,
it->second);
}
}
delete siblings;
}
}
return;
}
for (it = dataStorage->begin(); it != dataStorage->end(); it++) {
key = it->first;
entry = it->second;
if (joined) {
if (entry.responsible && (overlay->isSiblingFor(node, key,
numReplica, &err)
|| err)) { // hack for Chord, if we've got a new predecessor
if (err) {
EV << "[DHT::update()]\n"
<< " Unable to know if key: " << key
<< " is in range of node: " << node
<< endl;
// For Chord: we've got a new predecessor
// TODO: only send record, if we are not responsible any more
// TODO: check all protocols to change routing table first,
// and than call update.
//if (overlay->isSiblingFor(overlay->getThisNode(), key, 1, &err)) {
// continue;
//}
}
sendMaintenancePutCall(node, key, entry);
}
}
//TODO: move this to the inner block above?
key, 1, &err);
}
}

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  Stream,
const PendingRpcsEntry &  entry 
)
friend

Definition at line 952 of file DHT.cc.

{
if (entry.getCallMsg) {
os << "GET";
} else if (entry.putCallMsg) {
os << "PUT";
}
os << " state: " << entry.state
<< " numSent: " << entry.numSent
<< " numResponses: " << entry.numResponses
<< " numFailed: " << entry.numFailed
<< " numAvailableReplica: " << entry.numAvailableReplica;
if (entry.replica.size() > 0) {
os << " replicaSize: " << entry.replica.size();
}
if (entry.hashVector != NULL) {
os << " hashVectorSize: " << entry.hashVector->size();
}
if (entry.hashes.size() > 0) {
os << " hashes:";
std::map<BinaryValue, NodeVector>::const_iterator it;
int i = 0;
for (it = entry.hashes.begin(); it != entry.hashes.end(); it++, i++) {
os << " hash" << i << ":" << it->second.size();
}
}
return os;
}

Member Data Documentation

DHTDataStorage* DHT::dataStorage
private

pointer to the dht data storage

Definition at line 129 of file DHT.h.

bool DHT::invalidDataAttack
private

if node is malicious, it tries an invalidData attack

Definition at line 122 of file DHT.h.

bool DHT::maintenanceAttack
private

if node is malicious, it tries a maintenanceData attack

Definition at line 123 of file DHT.h.

double DHT::maintenanceMessages
private

Definition at line 116 of file DHT.h.

double DHT::normalMessages
private

Definition at line 117 of file DHT.h.

double DHT::numBytesMaintenance
private

Definition at line 118 of file DHT.h.

double DHT::numBytesNormal
private

Definition at line 119 of file DHT.h.

int DHT::numGetRequests
private

Definition at line 114 of file DHT.h.

uint DHT::numReplica
private

Definition at line 113 of file DHT.h.

PendingRpcs DHT::pendingRpcs
private

a map of all pending RPC operations

Definition at line 126 of file DHT.h.

double DHT::ratioIdentical
private

Definition at line 115 of file DHT.h.

bool DHT::secureMaintenance
private

use a secure maintenance algorithm based on majority decisions

Definition at line 121 of file DHT.h.


The documentation for this class was generated from the following files: