25 #include <IPAddressResolver.h>
48 twoStageResolution = par(
"twoStageResolution");
49 keepaliveInterval = par(
"keepaliveInterval");
50 idCacheLifetime = par(
"idCacheLifetime");
52 p2pnsCache = check_and_cast<
P2pnsCache*> (getParentModule()->
53 getSubmodule(
"p2pnsCache"));
73 Enter_Method_Silent();
80 EV <<
"[P2pns::tunnel()]\n"
81 <<
" Establishing new cache entry for key: " << destKey
85 lookupCall->
setKey(destKey);
87 sendInternalRpcCall(
OVERLAY_COMP, lookupCall, NULL, -1, 0,
89 p2pnsCache->addIdCacheEntry(destKey, &payload);
92 EV <<
"[P2pns::tunnel()]\n"
93 <<
" Queuing packet since lookup is still pending for key: "
100 sendTunnelMessage(entry->
addr, payload);
107 p2pnsCache->getIdCacheEntry(lookupResponse->
getKey());
119 EV <<
"[P2pns::handleTunnelLookupResponse()]\n"
120 <<
" Lookup response " << lookupResponse->
getSiblings(0)
121 <<
" doesn't match requested id " << lookupResponse->
getKey()
124 p2pnsCache->removeIdCacheEntry(lookupResponse->
getKey());
137 scheduleAt(simTime() + keepaliveInterval, msg);
146 p2pnsCache->removeIdCacheEntry(lookupResponse->
getKey());
153 EV <<
"[P2pns::sendTunnelMessage()]\n"
154 <<
" Sending TUNNEL message to " << addr << endl;
166 Enter_Method_Silent();
168 std::string name = par(
"registerName").stdstringValue();
171 EV <<
"[P2pns::p2pnsRegisterRpc()]\n"
172 <<
" registerId(): name: " << name <<
" addr: " << addr
180 dhtPutMsg->
setTtl(60*60*24*7);
190 if (xmlRpcInterface) {
191 xmlRpcInterface->deliverTunneledMessage(tunnelMsg->
getPayload());
194 updateIdCacheWithNewTransport(msg);
200 int rpcId, simtime_t rtt)
206 cPolymorphic* context,
int rpcId)
213 (entry = p2pnsCache->getIdCacheEntry(*key))) {
217 EV <<
"[P2pns::pingTimeout()]\n"
218 <<
" Removing id " << key <<
" from idCache (ping timeout)"
220 p2pnsCache->removeIdCacheEntry(*key);
240 if ((entry->
lastUsage + idCacheLifetime) < simTime()) {
242 EV <<
"[P2pns::handleTimerEvent()]\n"
243 <<
" Removing id " << timer->
getKey()
244 <<
" from idCache (connection idle)"
246 p2pnsCache->removeIdCacheEntry(timer->
getKey());
257 scheduleAt(simTime() + keepaliveInterval, msg);
261 lookupCall->
setKey(overlay->getThisNode().getKey());
264 sendInternalRpcCall(
OVERLAY_COMP, lookupCall, NULL, -1, 0,
279 EV <<
"[P2pns::updateCacheWithNewTransport()]\n"
280 <<
" Can't update cache without knowing the originator id"
291 EV <<
"[P2pns::updateCacheWithNewTransport()]\n"
292 <<
" Can't update cache without knowing the originator id"
304 EV <<
"[P2pns::updateCacheWithNewTransport()]\n"
305 <<
" Adding new cache entry for id " << srcId
308 entry = p2pnsCache->addIdCacheEntry(srcId);
315 scheduleAt(simTime() + keepaliveInterval, msg);
322 EV <<
"[P2pns::handleRpcCall()]\n"
323 <<
" Ping with new transport address received: "
324 <<
" Changing from " << entry->
addr <<
" to "
326 <<
" for id " << srcId << endl;
338 updateIdCacheWithNewTransport(msg);
350 cPolymorphic* context,
int rpcId, simtime_t rtt)
354 EV <<
"[P2pns::handleRpcResponse()]\n"
355 <<
" DHTputCAPI RPC Response received: id=" << rpcId
356 <<
" msg=" << *_DHTputCAPIResponse <<
" rtt=" << rtt
358 if (dynamic_cast<P2pnsRegisterCall*>(context)) {
359 handleDHTputCAPIResponse(_DHTputCAPIResponse,
360 check_and_cast<P2pnsRegisterCall*>(context));
365 EV <<
"[P2pns::handleRpcResponse()]\n"
366 <<
" DHTgetCAPI RPC Response received: id=" << rpcId
367 <<
" msg=" << *_DHTgetCAPIResponse <<
" rtt=" << rtt
369 handleDHTgetCAPIResponse(_DHTgetCAPIResponse,
370 check_and_cast<P2pnsResolveCall*>(context));
374 EV <<
"[P2pns::handleRpcResponse()]\n"
375 <<
" Lookup RPC Response received: id=" << rpcId
376 <<
" msg=" << *_LookupResponse <<
" rtt=" << rtt
378 handleLookupResponse(_LookupResponse, context, rpcId);
386 p2pnsCache->addData(registerCall->
getP2pName(),
391 EV <<
"[P2pns::p2pnsRegisterRpc()]\n"
392 <<
" RegisterRpc: name: " << registerCall->
getP2pName()
397 if (twoStageResolution) {
398 dhtPutMsg->
setValue(overlay->getThisNode().getKey().toString());
408 sendInternalRpcCall(
TIER1_COMP, dhtPutMsg, registerCall);
415 EV <<
"[P2pns::p2pnsResolveRpc()]\n"
416 <<
" ResolveRpc: name: " << resolveCall->
getP2pName()
423 sendInternalRpcCall(
TIER1_COMP, dhtGetMsg, resolveCall);
433 sendRpcResponse(registerCall, registerResponse);
447 resolveResponse->
setKind(0, 0);
448 resolveResponse->
setId(0, 0);
450 sendRpcResponse(resolveCall, resolveResponse);
458 if (twoStageResolution) {
460 throw cRuntimeError(
"P2pns::handleDHTgetCAPIResponse: "
461 "Two-stage name resolution currently only "
462 "works with unique keys!");
464 std::stringstream valueStream;
470 lookupCall->setKey(key);
471 lookupCall->setNumSiblings(1);
473 sendInternalRpcCall(
OVERLAY_COMP, lookupCall, resolveCall, -1, 0,
479 EV <<
"[P2pns::handleDHTgetCAPIResponse()]\n"
480 <<
" ResolveRpcResponse: name: " << resolveCall->
getP2pName();
497 sendRpcResponse(resolveCall, resolveResponse);
505 case RESOLVE_LOOKUP: {
509 stringstream sstream;
518 resolveResponse->
setAddress(0, sstream.str());
519 resolveResponse->
setKind(0, 0);
520 resolveResponse->
setId(0, 0);
522 sendRpcResponse(resolveCall, resolveResponse);
526 handleTunnelLookupResponse(lookupResponse);
531 throw cRuntimeError(
"P2pns::handleLookupResponse(): invalid rpcId!");