OverSim
BaseRpc.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
27 #include <vector>
28 #include <string>
29 #include <cassert>
30 
31 #include <CommonMessages_m.h>
33 #include <GlobalStatisticsAccess.h>
34 #include <NeighborCache.h>
35 #include <CryptoModule.h>
36 #include <Vivaldi.h>
37 #include <OverlayAccess.h>
38 
39 #include "BaseRpc.h"
40 #include "RpcMacros.h"
41 
42 
43 //------------------------------------------------------------------------
44 //--- Initialization & finishing -----------------------------------------
45 //------------------------------------------------------------------------
46 
47 
48 //------------------------------------------------------------------------
49 //--- RPC Handling -------------------------------------------------------
50 //------------------------------------------------------------------------
51 
53 {
54  defaultRpcListener = NULL;
55  neighborCache = NULL;
56  cryptoModule = NULL;
57 }
58 
59 bool BaseRpc::internalHandleMessage(cMessage* msg)
60 {
61  // process self-messages and RPC-timeouts
62  if (msg->isSelfMessage()) {
63  // process rpc self-messages
64  BaseRpcMessage* rpcMessage = dynamic_cast<BaseRpcMessage*>(msg);
65  if (rpcMessage != NULL) {
66  internalHandleRpcMessage(rpcMessage);
67  return true;
68  }
69  // process all other self-messages
70  handleTimerEvent(msg);
71  return true;
72  }
73 
74  // process RPC messages
75  BaseRpcMessage* rpcMessage = dynamic_cast<BaseRpcMessage*>(msg);
76  if (rpcMessage != NULL) {
77  internalHandleRpcMessage(rpcMessage);
78  return true;
79  }
80 
81  // other messages are processed by derived classes
82  // (e.g. BaseOverlay / BaseApp)
83  return false;
84 }
85 
86 void BaseRpc::handleTimerEvent(cMessage* msg)
87 {
88  // ...
89 }
90 
91 //private
93 {
94  // set friend modules
96 
97  rpcUdpTimeout = par("rpcUdpTimeout");
98  rpcKeyTimeout = par("rpcKeyTimeout");
99  optimizeTimeouts = par("optimizeTimeouts");
100  rpcExponentialBackoff = par("rpcExponentialBackoff");
101 
102  rpcsPending = 0;
103  rpcStates.clear();
104 
106 
107  //set ping cache
108  numPingSent = 0;
109  bytesPingSent = 0;
112 
113  WATCH(numPingSent);
114  WATCH(bytesPingSent);
115  WATCH(numPingResponseSent);
116  WATCH(bytesPingResponseSent);
117 
118  // set overlay pointer
119  overlay = OverlayAccess().get(this);
120 
121  // register component
124 
125  // get pointer to the neighborCache
126  cModule *mod = getParentModule();
127  while (neighborCache == NULL) {
128  neighborCache = (NeighborCache*)mod->getSubmodule("neighborCache");
129  mod = mod->getParentModule();
130  if (!mod)
131  throw cRuntimeError("BaseRpc::initRpc: "
132  "Module type contains no NeighborCache!");
133  }
134 
135  // get pointer to the cryptoModule
136  mod = getParentModule();
137  cryptoModule = NULL;
138  while (cryptoModule == NULL) {
139  cryptoModule = (CryptoModule*)mod->getSubmodule("cryptoModule");
140  mod = mod->getParentModule();
141  if (!mod)
142  throw cRuntimeError("BaseRpc::initRpc: CryptoModule not found!");
143  }
144 }
145 
146 //private
148 {
149  cancelAllRpcs();
150 
151  // delete default rpc listener
152  if (defaultRpcListener != NULL) {
153  delete defaultRpcListener;
154  defaultRpcListener = NULL;
155  }
156 }
157 
159 {
160  // stop all rpcs
161  for (RpcStates::iterator i = rpcStates.begin();
162  i != rpcStates.end(); i++) {
163  cancelAndDelete(i->second.callMsg);
164  cancelAndDelete(i->second.timeoutMsg);
165  delete i->second.dest;
166  i->second.dest = NULL;
167  delete i->second.context;
168  i->second.context = NULL;
169  }
170  rpcStates.clear();
171 }
172 
173 uint32_t BaseRpc::sendRpcCall(TransportType transportType,
174  CompType destComp,
175  const TransportAddress& dest,
176  const OverlayKey& destKey,
177  BaseCallMessage* msg,
178  cPolymorphic* context,
179  RoutingType routingType,
180  simtime_t timeout,
181  int retries,
182  int rpcId,
183  RpcListener* rpcListener)
184 {
185  // create nonce, timeout and set default parameters
186  uint32_t nonce;
187  do {
188  nonce = intuniform(1, 2147483647);
189  } while (rpcStates.count(nonce) > 0);
190 
191  if (timeout == -1) {
192  switch (transportType) {
193  case INTERNAL_TRANSPORT:
194  timeout = 0;
195  break;
196  case UDP_TRANSPORT:
197  if (optimizeTimeouts) {
198  timeout = neighborCache->getNodeTimeout(dest);
199  if (timeout == -1) timeout = rpcUdpTimeout;
200  } else timeout = rpcUdpTimeout;
201  break;
202  case ROUTE_TRANSPORT:
203  timeout = (destKey.isUnspecified() ?
204  rpcUdpTimeout :
205  rpcKeyTimeout);
206  break;
207  default:
208  throw cRuntimeError("BaseRpc::sendRpcMessage(): "
209  "Unknown RpcTransportType!");
210  }
211  }
212 
213  if (rpcListener == NULL)
214  rpcListener = defaultRpcListener;
215 
216  // create state
217  RpcState state;
218  state.id = rpcId;
219  state.timeSent = simTime();
220  state.dest = dest.dup();
221  state.destKey = destKey;
222  state.srcComp = getThisCompType();
223  state.destComp = destComp;
224  state.listener = rpcListener;
225  state.timeoutMsg = new RpcTimeoutMessage();
226  state.timeoutMsg->setNonce(nonce);
227  state.retries = retries;
228  state.rto = timeout;
229  state.transportType = transportType;
230  //state.transportType = (destKey.isUnspecified() && (dest.getSourceRouteSize() == 0)
231  // ? UDP_TRANSPORT : transportType); //test
232  state.routingType = routingType;
233  state.context = context;
234 
235  if (rpcStates.count(nonce) > 0)
236  throw cRuntimeError("RPC nonce collision");
237 
238  // set message parameters
239  msg->setNonce(nonce);
240  if (transportType == ROUTE_TRANSPORT)
241  msg->setSrcNode(overlay->getThisNode());
242  else
243  msg->setSrcNode(thisNode);
244  msg->setType(RPC);
245 
246  // sign the message
247  // if (transportType != INTERNAL_TRANSPORT) cryptoModule->signMessage(msg);
248 
249  // save copy of call message in RpcState
250  state.callMsg = dynamic_cast<BaseCallMessage*>(msg->dup());
251  assert(!msg->getEncapsulatedPacket() || !msg->getEncapsulatedPacket()->getControlInfo());
252 
253  // register state
254  rpcStates[nonce] = state;
255 
256  // schedule timeout message
257  if (state.rto != 0)
258  scheduleAt(simTime() + state.rto, state.timeoutMsg);
259 
260  // TODO: cleanup code to have only one type for source routes
261  std::vector<TransportAddress> sourceRoute;
262  sourceRoute.push_back(dest);
263  if (dest.getSourceRouteSize() > 0) {
264  state.transportType = transportType = ROUTE_TRANSPORT;
265  sourceRoute.insert(sourceRoute.begin(), dest.getSourceRoute().rend(),
266  dest.getSourceRoute().rbegin());
267  // remove the original source route from the destination
268  sourceRoute.back().clearSourceRoute();
269  }
270  sendRpcMessageWithTransport(transportType, destComp, routingType,
271  sourceRoute, destKey, msg);
272 
273  return nonce;
274 }
275 
276 
277 //public
278 void BaseRpc::cancelRpcMessage(uint32_t nonce)
279 {
280  if (rpcStates.count(nonce)==0)
281  return;
282  RpcState state = rpcStates[nonce];
283  rpcStates.erase(nonce);
284  cancelAndDelete(state.callMsg);
285  cancelAndDelete(state.timeoutMsg);
286  delete state.dest;
287  state.dest = NULL;
288  delete state.context;
289  state.context = NULL;
290 }
291 
292 //protected
294 {
295  // check if this is a rpc call message
296  BaseCallMessage* rpCall = dynamic_cast<BaseCallMessage*>(msg);
297  if (rpCall != NULL) {
298  // verify the message signature
299  //cryptoModule->verifyMessage(msg);
300 
301  OverlayCtrlInfo* overlayCtrlInfo =
302  dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
303 
304  if (overlayCtrlInfo && overlayCtrlInfo->getSrcRoute().isUnspecified() &&
305  (!overlayCtrlInfo->getLastHop().isUnspecified())) {
306  overlayCtrlInfo->setSrcRoute(NodeHandle(msg->getSrcNode().getKey(),
307  overlayCtrlInfo->getLastHop()));
308  }
309 
310  bool rpcHandled = true;
311  if (!handleRpcCall(rpCall)) rpcHandled = internalHandleRpcCall(rpCall);
312  if (!rpcHandled) {
313  EV << "[BaseRpc::internalHandleRpcMessage() @ " << thisNode.getIp()
314  << " (" << thisNode.getKey().toString(16) << ")]\n"
315  << " Error: RPC '" << msg->getFullName()<< "' was not handled"
316  << endl;
317  delete msg;
318  }
319  return;
320  }
321 
322  // get nonce
323  int nonce = msg->getNonce();
324 
325  // nonce known? no -> delete message and return
326  if (rpcStates.count(nonce)==0) {
327  EV << "[BaseRpc::internalHandleRpcMessage() @ " << thisNode.getIp()
328  << " " << thisNode.getKey().toString(16) << ")]\n"
329  << " RPC: Nonce Unknown"
330  << endl;
331  delete msg;
332  return;
333  }
334 
335  // get state and remove from map
336  RpcState state = rpcStates[nonce];
337  rpcStates.erase(nonce);
338 
339  // is timeout message?
340  if (msg->isSelfMessage() &&
341  (dynamic_cast<RpcTimeoutMessage*>(msg) != NULL)) {
342  // yes-> inform listener
343 
344  // retry?
345  state.retries--;
346  if (state.retries>=0) {
347  // TODO: cleanup code to have only one type for source routes
348  std::vector<TransportAddress> sourceRoute;
349  sourceRoute.push_back(*state.dest);
350  if (state.dest->getSourceRouteSize() > 0) {
351  sourceRoute.insert(sourceRoute.begin(),
352  state.dest->getSourceRoute().rend(),
353  state.dest->getSourceRoute().rbegin());
354  // remove the original source route from the destination
355  sourceRoute.back().clearSourceRoute();
356  }
357 
359  state.routingType,
360  sourceRoute,
361  state.destKey,
362  dynamic_cast<BaseCallMessage*>
363  (state.callMsg->dup()));
364 
365  if (rpcExponentialBackoff) {
366  state.rto *= 2;
367  }
368 
369  if (state.rto!=0)
370  scheduleAt(simTime() + state.rto, msg);
371 
372  state.timeSent = simTime();
373  rpcStates[nonce] = state;
374  return;
375  }
376  EV << "[BaseRpc::internalHandleRpcMessage() @ " << thisNode.getIp()
377  << " " << thisNode.getKey().toString(16) << ")]\n"
378  << " RPC to " << state.dest->getIp()
379  << " timeout (" << state.callMsg->getName() << ")"
380  << endl;
381 
382  // inform neighborcache
383  if (state.transportType == UDP_TRANSPORT ||
384  (!state.dest->isUnspecified() && state.destKey.isUnspecified())) {
386  }
387 
388  // inform listener
389  if (state.listener != NULL)
390  state.listener->handleRpcTimeout(state);
391 
392  // inform overlay
393  internalHandleRpcTimeout(state.callMsg, *state.dest, state.context,
394  state.id, state.destKey);
395  handleRpcTimeout(state);
396 
397  } else { // no-> handle rpc response
398 
399  // verify the message signature
400  //cryptoModule->verifyMessage(msg);
401 
402  OverlayCtrlInfo* overlayCtrlInfo =
403  dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
404 
405  if (overlayCtrlInfo && overlayCtrlInfo->getSrcRoute().isUnspecified() &&
406  (!overlayCtrlInfo->getLastHop().isUnspecified())) {
407  overlayCtrlInfo->setSrcRoute(NodeHandle(msg->getSrcNode().getKey(),
408  overlayCtrlInfo->getLastHop()));
409  }
410 
411  // drop responses with wrong source key
412  if (state.destKey.isUnspecified()) {
413  const NodeHandle* stateHandle =
414  dynamic_cast<const NodeHandle*>(state.dest);
415  if (stateHandle != NULL &&
416  stateHandle->getKey() != msg->getSrcNode().getKey()) {
417 
418  EV << "[BaseRpc::internalHandleRpcMessage() @ "
419  << thisNode.getIp()
420  << " " << thisNode.getKey().toString(16) << ")]\n"
421  << " Dropping RPC: Invalid source key"
422  << endl;
423 
424  // restore state to trigger timeout message
425  rpcStates[nonce] = state;
426  delete msg;
427  return;
428  }
429  }
430 
431  // get parameters
432  simtime_t rtt = simTime() - state.timeSent;
433  BaseResponseMessage* response
434  = dynamic_cast<BaseResponseMessage*>(msg);
435 
436  //if (state.transportType == UDP_TRANSPORT)
437  // globalStatistics->recordOutVector("BaseRpc: UDP Round Trip Time",
438  // rtt);
439 
440  // neighborCache/ncs stuff
441  if (state.transportType == UDP_TRANSPORT ||
442  (state.transportType != INTERNAL_TRANSPORT &&
443  response->getCallHopCount() == 1)) {
444  unsigned int ncsArraySize = response->getNcsInfoArraySize();
445  if (ncsArraySize > 0) {
446  std::vector<double> tempCoords(ncsArraySize);
447  for (uint8_t i = 0; i < ncsArraySize; i++) {
448  tempCoords[i] = response->getNcsInfo(i);
449  }
450  AbstractNcsNodeInfo* coords =
452 
453  OverlayCtrlInfo* ctrlInfo =
454  dynamic_cast<OverlayCtrlInfo*>(response->getControlInfo());
455 
456  neighborCache->updateNode(response->getSrcNode(), rtt,
457  (ctrlInfo ?
458  ctrlInfo->getSrcRoute() :
460  coords);
461  } else {
462  neighborCache->updateNode(response->getSrcNode(), rtt);
463  }
464  }
465 
466  // inform listener
467  if (state.listener != NULL)
468  state.listener->handleRpcResponse(response, state, rtt);
469 
470  // inform overlay
471  internalHandleRpcResponse(response, state.context, state.id, rtt);
472  handleRpcResponse(response, state, rtt);
473 
474  // delete response
475  delete response->removeControlInfo();
476  delete response;
477  }
478 
479  // delete messages
480  delete state.callMsg;
481  cancelAndDelete(state.timeoutMsg);
482  delete state.dest;
483 
484  // clean up pointers
485  state.dest = NULL;
486  state.context = NULL;
487  state.callMsg = NULL;
488  state.timeoutMsg = NULL;
489 }
490 
491 //private
493 {
494  RPC_SWITCH_START( msg );
495  RPC_DELEGATE( Ping, pingRpcCall );
496  RPC_SWITCH_END( );
497 
498  return RPC_HANDLED;
499 }
500 
502  cPolymorphic* context,
503  int rpcId, simtime_t rtt)
504 {
505  // call rpc stubs
506  RPC_SWITCH_START( msg );
507  RPC_ON_RESPONSE( Ping ) {
508  pingRpcResponse(_PingResponse, context, rpcId, rtt);
509  }
510  RPC_SWITCH_END( );
511 }
512 
514  const TransportAddress& dest,
515  cPolymorphic* context,
516  int rpcId, const OverlayKey& destKey)
517 {
518  RPC_SWITCH_START( msg ) {
519  RPC_ON_CALL( Ping ) {
520  pingRpcTimeout(_PingCall, dest, context, rpcId);
521  }
522  }
523  RPC_SWITCH_END( )
524 }
525 
526 //virtual protected
528 {
529  return false;
530 }
531 
533  CompType compType,
534  const TransportAddress& dest,
535  const OverlayKey& destKey,
536  BaseCallMessage* call,
537  BaseResponseMessage* response)
538 {
539  if (call == NULL || response == NULL) {
540  throw cRuntimeError("call or response = NULL!");
541  }
542 
543  // vivaldi: set coordinates and error estimation in response
544  if (neighborCache->piggybackOwnCoords()) { //TODO only for directly sent msgs
545  std::vector<double> nodeCoord =
547 
548  response->setNcsInfoArraySize(nodeCoord.size());
549  for (uint32_t i = 0; i < nodeCoord.size(); i++) {
550  response->setNcsInfo(i, nodeCoord[i]);
551  }
552  }
553 
554  assert(transportType == INTERNAL_TRANSPORT ||
555  !dest.isUnspecified() ||
556  !destKey.isUnspecified());
557 
558  if (transportType == ROUTE_TRANSPORT)
559  response->setSrcNode(overlay->getThisNode());
560  else
561  response->setSrcNode(thisNode);
562  response->setType(RPC);
563  response->setNonce(call->getNonce());
564  response->setStatType(call->getStatType());
565 
566  RoutingType routingType = NO_OVERLAY_ROUTING;
567  OverlayCtrlInfo* overlayCtrlInfo = NULL;
568  if (dynamic_cast<OverlayCtrlInfo*>(call->getControlInfo())) {
569  overlayCtrlInfo =
570  static_cast<OverlayCtrlInfo*>(call->removeControlInfo());
571  response->setCallHopCount(overlayCtrlInfo->getHopCount());
572  } else {
573  delete call->removeControlInfo();
574  response->setCallHopCount(1); // one udp hop (?)
575  }
576 
577  // source routing
578  std::vector<TransportAddress> sourceRoute;
579  if (overlayCtrlInfo && transportType == ROUTE_TRANSPORT) {
580  routingType =
581  static_cast<RoutingType>(overlayCtrlInfo->getRoutingType());
582  for (uint32_t i = overlayCtrlInfo->getVisitedHopsArraySize(); i > 0; --i) {
583  sourceRoute.push_back(overlayCtrlInfo->getVisitedHops(i - 1));
584  }
585  }
586 
587  if (sourceRoute.size() == 0) {
588  // empty visited hops list => direct response
589  sourceRoute.push_back(dest);
590  }
591 
592  sendRpcMessageWithTransport(transportType, compType,
593  routingType, sourceRoute,
594  destKey, response);
595  delete overlayCtrlInfo;
596  delete call;
597 }
598 
599 //protected
601  BaseResponseMessage* response)
602 {
603  const TransportAddress* destNode = &(call->getSrcNode());
604  const OverlayKey* destKey = &(call->getSrcNode().getKey());
605 
606  OverlayCtrlInfo* overlayCtrlInfo =
607  dynamic_cast<OverlayCtrlInfo*>(call->getControlInfo());
608 
609  // "magic" transportType selection: internal
610  if (overlayCtrlInfo &&
611  overlayCtrlInfo->getTransportType() == INTERNAL_TRANSPORT) {
613  static_cast<CompType>(overlayCtrlInfo->getSrcComp()),
614  *destNode, *destKey, call, response);
615  } else {
616  internalSendRpcResponse(call, response);
617  }
618 }
619 
621  CompType destComp,
622  RoutingType routingType,
623  const std::vector<TransportAddress>& sourceRoute,
624  const OverlayKey& destKey,
625  BaseRpcMessage* message)
626 {
627  switch (transportType) {
628  case UDP_TRANSPORT: {
629  sendMessageToUDP(sourceRoute[0], message);
630  break;
631  }
632  case ROUTE_TRANSPORT: {
633  internalSendRouteRpc(message, destKey,
634  sourceRoute, routingType);
635  break;
636  }
637  case INTERNAL_TRANSPORT: {
638  cGate *destCompGate = overlay->getCompRpcGate(destComp);
639  if (destCompGate == NULL) {
640  throw cRuntimeError("BaseRpc::sendRpcMessageWithTransport():"
641  " INTERNAL_RPC to unknown RpcCompType!");
642  }
643  OverlayCtrlInfo *overlayCtrlInfo = new OverlayCtrlInfo();
644  overlayCtrlInfo->setSrcComp(getThisCompType());
645  overlayCtrlInfo->setDestComp(destComp);
646  overlayCtrlInfo->setTransportType(INTERNAL_TRANSPORT);
647  message->setControlInfo(overlayCtrlInfo);
648  sendDirect(message, destCompGate);
649  break;
650  }
651  default:
652  throw cRuntimeError("BaseRpc::sendRpcMessageWithTransport: "
653  "invalid transportType!");
654  break;
655  }
656 }
657 
658 // ping RPC stuff
659 void BaseRpc::pingResponse(PingResponse* response, cPolymorphic* context,
660  int rpcId, simtime_t rtt)
661 {
662 }
663 
665  cPolymorphic* context, int rpcId)
666 {
667 }
668 
670 {
671  std::string pongName(call->getName());
672  if (pongName == "PING")
673  pongName = "PONG";
674  else {
675  pongName = "PONG: [ ";
676  pongName += call->getName();
677  pongName += " ]";
678  }
679 
680  PingResponse* response = new PingResponse(pongName.c_str());
681  response->setBitLength(PINGRESPONSE_L(response));
683  response->getByteLength());
684 
685  sendRpcResponse(call, response );
686 }
687 
689  cPolymorphic* context, int rpcId, simtime_t rtt)
690 {
691  pingResponse(response, context, rpcId, rtt);
692 }
693 
695  const TransportAddress& dest,
696  cPolymorphic* context,
697  int rpcId)
698 {
699  pingTimeout(pingCall, dest, context, rpcId);
700 }
701 
702 int BaseRpc::pingNode(const TransportAddress& dest, simtime_t timeout,
703  int retries, cPolymorphic* context,
704  const char* caption, RpcListener* rpcListener,
705  int rpcId, TransportType transportType)
706 {
707  PingCall* call = new PingCall(caption);
708  call->setBitLength(PINGCALL_L(call));
709  RECORD_STATS(numPingSent++; bytesPingSent += call->getByteLength());
710 
711  if (transportType == UDP_TRANSPORT ||
712  (transportType != ROUTE_TRANSPORT &&
714  return sendUdpRpcCall(dest, call, context, timeout, retries, rpcId,
715  rpcListener);
716  } else {
717  return sendRouteRpcCall(getThisCompType(), dest, call, context,
718  DEFAULT_ROUTING, timeout, retries, rpcId,
719  rpcListener);
720  }
721 }
722 
723