OverSim
XmlRpcInterface.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <platdep/sockets.h>
25 #if not defined _WIN32
26 #include <arpa/inet.h>
27 #endif
28 
29 #if not defined _WIN32 && not defined __APPLE__
30 #include <netinet/ip6.h>
31 #endif
32 
33 #include <sstream>
34 
35 #include <NodeVector.h>
36 #include <P2pns.h>
37 
38 #include "XmlRpcInterface.h"
39 
40 using namespace XmlRpc;
41 
43 
44 // Register a name with P2PNS
46 {
47 public:
49  XmlRpcServerMethod("register", s)
50  {
51  }
52 
53  void execute(XmlRpcValue& params, XmlRpcValue& result)
54  {
55  (dynamic_cast<XmlRpcInterface*>(_server))->p2pnsRegister(params, result);
56  }
57 
58  std::string help()
59  {
60  return std::string("Register a name with P2PNS");
61  }
62 };
63 
64 // Register a name with P2PNS
66 {
67 public:
69  XmlRpcServerMethod("resolve", s)
70  {
71  }
72 
73  void execute(XmlRpcValue& params, XmlRpcValue& result)
74  {
75  (dynamic_cast<XmlRpcInterface*>(_server))->p2pnsResolve(params, result);
76  }
77 
78  std::string help()
79  {
80  return std::string("Resolve a name with P2PNS");
81  }
82 };
83 
84 // Common API: local_lookup()
86 {
87 public:
89  XmlRpcServerMethod("local_lookup", s)
90  {
91  }
92 
93  void execute(XmlRpcValue& params, XmlRpcValue& result)
94  {
95  (dynamic_cast<XmlRpcInterface*>(_server))->localLookup(params, result);
96  }
97 
98  std::string help()
99  {
100  return std::string("Lookup key in local "
101  "routing table");
102  }
103 };
104 
105 // Common API: lookup()
107 {
108 public:
110  XmlRpcServerMethod("lookup", s)
111  {
112  }
113 
114  void execute(XmlRpcValue& params, XmlRpcValue& result)
115  {
116  (dynamic_cast<XmlRpcInterface*>(_server))->lookup(params, result);
117  }
118 
119  std::string help()
120  {
121  return std::string("Lookup key with KBR layer");
122  }
123 };
124 
125 // Store a value in the DHT
126 class Put : public XmlRpcServerMethod
127 {
128 public:
130  XmlRpcServerMethod("put", s)
131  {
132  }
133 
134  void execute(XmlRpcValue& params, XmlRpcValue& result)
135  {
136  (dynamic_cast<XmlRpcInterface*>(_server))->put(params, result);
137  }
138 
139  std::string help()
140  {
141  return std::string("Store a value in the DHT");
142  }
143 };
144 
145 // Get a value from the DHT
146 class Get : public XmlRpcServerMethod
147 {
148 public:
150  XmlRpcServerMethod("get", s)
151  {
152  }
153 
154  void execute(XmlRpcValue& params, XmlRpcValue& result)
155  {
156  (dynamic_cast<XmlRpcInterface*>(_server))->get(params, result);
157  }
158 
159  std::string help()
160  {
161  return std::string("Get a value from the DHT");
162  }
163 };
164 
165 // Dump all local records from the DHT
167 {
168 public:
170  XmlRpcServerMethod("dump_dht", s)
171  {
172  }
173 
174  void execute(XmlRpcValue& params, XmlRpcValue& result)
175  {
176  (dynamic_cast<XmlRpcInterface*>(_server))->dumpDht(params, result);
177  }
178 
179  std::string help()
180  {
181  return std::string("Dump all local records from the DHT");
182  }
183 };
184 
185 // Join the overlay with a specific nodeID
187 {
188 public:
190  XmlRpcServerMethod("join", s)
191  {
192  }
193 
194  void execute(XmlRpcValue& params, XmlRpcValue& result)
195  {
196  (dynamic_cast<XmlRpcInterface*>(_server))->joinOverlay(params, result);
197  }
198 
199  std::string help()
200  {
201  return std::string("Join the overlay with a specific nodeID");
202  }
203 };
204 
206 {
207  if ((params.size() != 5) ||
208  (params[0].getType() != XmlRpcValue::TypeBase64) ||
209  (params[1].getType() != XmlRpcValue::TypeInt) ||
210  (params[2].getType() != XmlRpcValue::TypeInt) ||
211  (params[3].getType() != XmlRpcValue::TypeBase64) ||
212  (params[4].getType() != XmlRpcValue::TypeInt))
213  throw XmlRpcException("register(base64 name, int kind, int id, base64 address, int ttl): "
214  "Invalid argument type");
215 
216  if (overlay->getCompModule(TIER2_COMP) == NULL)
217  throw XmlRpcException("register(base64 name, int kind, int id, base64 address, int ttl): "
218  "No P2PNS service");
219 
220  if (!isPrivileged()) {
221  throw XmlRpcException("register(base64 name, int kind, base64 address, "
222  "int ttl): Not allowed");
223  }
224 
225  state[curAppFd]._connectionState = EXECUTE_REQUEST;
226 
227  P2pnsRegisterCall* registerCall = new P2pnsRegisterCall();
228  registerCall->setP2pName(((const XmlRpcValue::BinaryData&)params[0]));
229  registerCall->setKind((int)params[1]);
230  registerCall->setId((int)params[2]);
231  registerCall->setAddress(((const XmlRpcValue::BinaryData&)params[3]));
232  registerCall->setTtl(params[4]);
233 
234  sendInternalRpcWithTimeout(TIER2_COMP, registerCall);
235 }
236 
238 {
239  if ((params.size() != 2) ||
240  (params[0].getType() != XmlRpcValue::TypeBase64) ||
241  (params[1].getType() != XmlRpcValue::TypeInt))
242  throw XmlRpcException("resolve(base64 name, int kind): Invalid argument type");
243 
244  if (overlay->getCompModule(TIER2_COMP) == NULL)
245  throw XmlRpcException("resolve(base64 name, int kind): No P2PNS service");
246 
247  state[curAppFd]._connectionState = EXECUTE_REQUEST;
248 
249  P2pnsResolveCall* resolveCall = new P2pnsResolveCall();
250 
251  resolveCall->setP2pName(((const XmlRpcValue::BinaryData&)params[0]));
252  resolveCall->setKind((int)params[1]);
253  resolveCall->setId(0);
254 
255  sendInternalRpcWithTimeout(TIER2_COMP, resolveCall);
256 }
257 
258 
260 {
261  if ((params.size() != 3)
262  || (params[0].getType() != XmlRpcValue::TypeBase64)
263  || (params[1].getType() != XmlRpcValue::TypeInt)
264  || (params[2].getType() != XmlRpcValue::TypeBoolean))
265  throw XmlRpcException("local_lookup(base64 key, int num, "
266  "boolean safe): Invalid argument type");
267 
268  BinaryValue keyString = (const XmlRpcValue::BinaryData&)params[0];
269 
270  NodeVector* nextHops = NULL;
271 
272  if (keyString.size() > 0) {
273  nextHops = overlay->local_lookup(OverlayKey::sha1(keyString),
274  params[1], params[2]);
275  } else {
276  nextHops = overlay->local_lookup(overlay->getThisNode().getKey(),
277  params[1], params[2]);
278  }
279 
280  for (uint32_t i=0; i < nextHops->size(); i++) {
281  result[i][0] = (*nextHops)[i].getIp().str();
282  result[i][1] = (*nextHops)[i].getPort();
283  result[i][2] = (*nextHops)[i].getKey().toString(16);
284  }
285 
286  delete nextHops;
287 }
288 
290 {
291  if (((params.size() != 2)
292  || (params[0].getType() != XmlRpcValue::TypeBase64)
293  || (params[1].getType() != XmlRpcValue::TypeInt))
294  && ((params.size() != 3)
295  || (params[0].getType() != XmlRpcValue::TypeBase64)
296  || (params[1].getType() != XmlRpcValue::TypeInt)
297  || (params[2].getType() != XmlRpcValue::TypeInt)))
298  throw XmlRpcException("lookup(base64 key, int numSiblings(, "
299  "int RoutingType)): Invalid argument type");
300 
301  if ((int)params[1] > overlay->getMaxNumSiblings())
302  throw XmlRpcException("lookup(base64 key, int numSiblings(, "
303  "int RoutingType)): numSibling to big");
304 
305  if (params.size() == 3) {
306  if (((int)params[2] != DEFAULT_ROUTING) &&
307  ((int)params[2] != ITERATIVE_ROUTING) &&
308  ((int)params[2] != EXHAUSTIVE_ITERATIVE_ROUTING) &&
309  ((int)params[2] != SEMI_RECURSIVE_ROUTING) &&
310  ((int)params[2] != FULL_RECURSIVE_ROUTING) &&
311  ((int)params[2] != RECURSIVE_SOURCE_ROUTING)) {
312 
313  throw XmlRpcException("lookup(base64 key, int numSiblings(, "
314  "int RoutingType)): invalid routingType");
315  }
316  }
317 
318  state[curAppFd]._connectionState = EXECUTE_REQUEST;
319 
320  LookupCall* lookupCall = new LookupCall();
321 
322  BinaryValue keyString = (const XmlRpcValue::BinaryData&)params[0];
323  lookupCall->setKey(OverlayKey::sha1(keyString));
324  lookupCall->setNumSiblings(params[1]);
325 
326  if (params.size() == 3) {
327  lookupCall->setRoutingType(params[2]);
328  }
329 
330  sendInternalRpcWithTimeout(OVERLAY_COMP, lookupCall);
331 }
332 
334 {
335  if ((params.size() != 1)
336  || (params[0].getType() != XmlRpcValue::TypeBase64))
337  throw XmlRpcException("join(base64 nodeID): Invalid argument type");
338 
339  if (!isPrivileged()) {
340  throw XmlRpcException("join(base64 nodeID): Not allowed");
341  }
342 
343  BinaryValue nodeID = (const XmlRpcValue::BinaryData&)params[0];
344 
345  overlay->join(OverlayKey::sha1(nodeID));
346 
347  result[0] = 0;
348 }
349 
351 {
352  if ((params.size() != 4)
353  || (params[0].getType() != XmlRpcValue::TypeBase64)
354  || (params[1].getType() != XmlRpcValue::TypeBase64)
355  || (params[2].getType() != XmlRpcValue::TypeInt)
356  || (params[3].getType() != XmlRpcValue::TypeString))
357  throw XmlRpcException("put(base64 key, base64 value, int ttl "
358  ", string application): Invalid argument type");
359 
360  if (!isPrivileged()) {
361  throw XmlRpcException("put(base64 key, base64 value, int ttl "
362  ", string application): Not allowed");
363  }
364 
365  if (overlay->getCompModule(TIER1_COMP) == NULL)
366  throw XmlRpcException("put(base64 key, base64 value, int ttl "
367  ", string application): No DHT service");
368 
369  state[curAppFd]._connectionState = EXECUTE_REQUEST;
370 
371  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
372 
373  BinaryValue keyString = (const XmlRpcValue::BinaryData&)params[0];
374 
375  dhtPutMsg->setKey(OverlayKey::sha1(keyString));
376  dhtPutMsg->setValue(((const XmlRpcValue::BinaryData&)params[1]));
377  dhtPutMsg->setTtl(params[2]);
378  dhtPutMsg->setIsModifiable(true);
379 
380  sendInternalRpcWithTimeout(TIER1_COMP, dhtPutMsg);
381 }
382 
384 {
385  if ((params.size() != 4)
386  || (params[0].getType() != XmlRpcValue::TypeBase64)
387  || (params[1].getType() != XmlRpcValue::TypeInt)
388  || (params[2].getType() != XmlRpcValue::TypeBase64)
389  || (params[3].getType() != XmlRpcValue::TypeString))
390  throw XmlRpcException("get(base64 key, int num, base64 placemark "
391  ", string application): Invalid argument type");
392 
393  if (overlay->getCompModule(TIER1_COMP) == NULL)
394  throw XmlRpcException("get(base64 key, int num, base64 placemark "
395  ", string application): No DHT service");
396 
397  state[curAppFd]._connectionState = EXECUTE_REQUEST;
398 
399  DHTgetCAPICall* dhtGetMsg = new DHTgetCAPICall();
400 
401  BinaryValue keyString = (const XmlRpcValue::BinaryData&)params[0];
402  dhtGetMsg->setKey(OverlayKey::sha1(keyString));
403 
404  sendInternalRpcWithTimeout(TIER1_COMP, dhtGetMsg);
405 }
406 
408 {
409  if (params.size() != 1) {
410  throw XmlRpcException("dump_dht(int dummy): Invalid argument type");
411  }
412 
413  if (!isPrivileged()) {
414  throw XmlRpcException("dump_dht(int dummy): Not allowed");
415  }
416 
417  if (overlay->getCompModule(TIER1_COMP) == NULL)
418  throw XmlRpcException("dump_dht(): No DHT service");
419 
420  state[curAppFd]._connectionState = EXECUTE_REQUEST;
421 
422  DHTdumpCall* call = new DHTdumpCall();
423 
424  sendInternalRpcWithTimeout(TIER1_COMP, call);
425 }
426 
427 
429 {
430  if (limitAccess) {
431  return state[curAppFd].localhost;
432  } else {
433  return true;
434  }
435 }
436 
438 {
439  // all initialization is done in the first stage
440  if (stage != MAX_STAGE_APP)
441  return;
442 
443  packetNotification = new cMessage("packetNotification");
444  mtu = par("mtu");
445  limitAccess = par("limitAccess");
446 
447  scheduler = check_and_cast<RealtimeScheduler *>(simulation.getScheduler());
448  scheduler->setInterfaceModule(this, packetNotification, &packetBuffer, mtu,
449  true);
450 
451  appTunFd = scheduler->getAppTunFd();
452 
453  p2pns = dynamic_cast<P2pns*>(overlay->getCompModule(TIER2_COMP));
454 
456 
457  _localLookup = new LocalLookup(this);
458  _lookup = new Lookup(this);
459  _register = new P2pnsRegister(this);
460  _resolve = new P2pnsResolve(this);
461  _put = new Put(this);
462  _get = new Get(this);
463  _dumpDht = new DumpDht(this);
464  _joinOverlay = new JoinOverlay(this);
465 
466  enableIntrospection(true);
467 
468  curAppFd = INVALID_SOCKET;
469 }
470 
472 {
473  p2pns = NULL;
474 
475  _localLookup = NULL;
476  _lookup = NULL;
477  _register = NULL;
478  _resolve = NULL;
479  _put = NULL;
480  _get = NULL;
481  _dumpDht = NULL;
482  _joinOverlay = NULL;
483  _addContact = NULL;
484  _removeContact = NULL;
485  _getContacts = NULL;
486  _getUserId = NULL;
487  _searchContact = NULL;
488  _subscribe = NULL;
489  _publish = NULL;
490  _pull_notification = NULL;
491 
492  packetNotification = NULL;
493 }
494 
496 {
497  delete _localLookup;
498  delete _lookup;
499  delete _register;
500  delete _resolve;
501  delete _put;
502  delete _get;
503  delete _dumpDht;
504  delete _joinOverlay;
505  delete _addContact;
506  delete _removeContact;
507  delete _getContacts;
508  delete _getUserId;
509  delete _searchContact;
510  delete _publish;
511  delete _subscribe;
512  delete _pull_notification;
513 
514  cancelAndDelete(packetNotification);
515 }
516 
518 {
519  if (state.count(curAppFd) && state[curAppFd].pendingRpc) {
520  cancelRpcMessage(state[curAppFd].pendingRpc);
521  }
522 
523  state[curAppFd].appFd = INVALID_SOCKET;
524  state[curAppFd].localhost = false;
525  state[curAppFd]._header = "";
526  state[curAppFd]._request = "";
527  state[curAppFd]._response = "";
528  state[curAppFd]._connectionState = READ_HEADER;
529  state[curAppFd]._keepAlive = true;
530  state[curAppFd].pendingRpc = 0;
531 }
532 
534 {
535  scheduler->closeAppSocket(curAppFd);
536  resetConnectionState();
537 }
538 
540  BaseCallMessage *call)
541 {
542  state[curAppFd].pendingRpc = sendInternalRpcCall(destComp, call, NULL,
543  XMLRPC_TIMEOUT, 0,
544  curAppFd);
545 }
546 
548 {
549  // Packet from the application...
550  if (msg==packetNotification) {
551  EV << "[XmlRpcInterface::handleMessage() @ " << overlay->getThisNode().getIp()
552  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
553  << " Message from application. Queue length = " << packetBuffer.size()
554  << endl;
555  while (packetBuffer.size() > 0) {
556  // get packet from buffer and parse it
557  PacketBufferEntry packet =
558  *(packetBuffer.begin());
559  packetBuffer.pop_front();
560  curAppFd = packet.fd;
561 
562  switch (packet.func) {
564  handleAppTunPacket(packet.data, packet.length);
565  break;
566 
568  if (state.count(curAppFd) == 0) {
569  throw cRuntimeError("XmlRpcInterface::handleMessage(): "
570  "Received packet "
571  "from unknown socket!");
572  }
573 
574  handleRealworldPacket(packet.data, packet.length);
575  break;
576 
578  if (state.count(curAppFd)) {
579  throw cRuntimeError("XmlRpcInterface::handleMessage(): "
580  "Connection state table corrupt!");
581  }
582 
583  resetConnectionState();
584 
585  if (packet.addr != NULL) {
586  if (((sockaddr_in*)packet.addr)->sin_addr.s_addr
587  == inet_addr("127.0.0.1")) {
588  state[curAppFd].localhost = true;
589  }
590  delete packet.addr;
591  packet.addr = NULL;
592  }
593  break;
594 
596  if (state.count(curAppFd) == 0) {
597  throw cRuntimeError("XmlRpcInterface::handleMessage(): "
598  "Trying to close unknown "
599  "connection!");
600  }
601 
602  resetConnectionState();
603  state.erase(curAppFd);
604  }
605 
606  if (packet.data) {
607  delete[] packet.data;
608  }
609  }
610  } else if (msg->isSelfMessage()) {
611  // process rpc self-messages
612  BaseRpcMessage* rpcMessage = dynamic_cast<BaseRpcMessage*>(msg);
613  if (rpcMessage!=NULL) {
614  internalHandleRpcMessage(rpcMessage);
615  return;
616  }
617 
618  delete msg;
619  } else {
620  // RPCs
621  BaseRpcMessage* rpcMessage = dynamic_cast<BaseRpcMessage*>(msg);
622  if (rpcMessage!=NULL) {
623  internalHandleRpcMessage(rpcMessage);
624  return;
625  }
626  // common API
627  CommonAPIMessage* commonAPIMsg = dynamic_cast<CommonAPIMessage*>(msg);
628  if (commonAPIMsg != NULL)
629  handleCommonAPIPacket(commonAPIMsg);
630 
631  CompReadyMessage* readyMsg = dynamic_cast<CompReadyMessage*>(msg);
632  if (readyMsg != NULL)
633  handleReadyMessage(readyMsg);
634 
635  delete msg;
636  }
637 }
638 
640  const TransportAddress& dest,
641  cPolymorphic* context, int rpcId,
642  const OverlayKey&)
643 {
644  curAppFd = rpcId;
645 
646  if (state.count(curAppFd) == 0)
647  return;
648 
649  std::cout << "XmlRpcInterface(): XML-RPC failed!" << endl;
650  state[curAppFd]._response = generateFaultResponse("XML-RPC timeout", 22);
651  state[curAppFd]._connectionState = WRITE_RESPONSE;
652  if (!writeResponse() ) {
653  closeConnection();
654  }
655 }
656 
658 {
659  if ((msg->getReady() == false) || (msg->getComp() != OVERLAY_COMP)) {
660  return;
661  }
662 
663  if (appTunFd != INVALID_SOCKET) {
664  // set TUN interface address using the current NodeId
665  // TODO: this is ugly
666  const OverlayKey& key = overlay->getThisNode().getKey();
667 
668  if (OverlayKey::getLength() < 100) {
669  throw cRuntimeError("XmlRpcInterface::handleReadyMessage(): "
670  "P2PNS needs at least 100 bit nodeIds!");
671  }
672 
673  std::stringstream addr;
674  addr << "2001:001";
675  for (int i = 0; i < 100/4; i++) {
676  if (((i + 3) % 4) == 0) {
677  addr << ":";
678  }
679  addr << std::hex << key.getBitRange(OverlayKey::getLength() -
680  4 * (i + 1), 4);
681  }
682 
683  std::string cmd = "/sbin/ip addr add " + addr.str() + "/28 dev tun0";
684 
685  EV << "XmlRpcInterface::handleOverlayReady(): "
686  "Setting TUN interface address " << addr.str() << endl;
687 
688  if (system(cmd.c_str()) != 0) {
689  EV << "XmlRpcInterface::handleOverlayReady(): "
690  "Failed to set TUN interface address!" << endl;
691  }
692 
693  if (system("/sbin/ip link set tun0 up") != 0) {
694  EV << "XmlRpcInterface::handleOverlayReady(): "
695  "Failed to set TUN interface up!" << endl;
696  }
697 
698  p2pns->registerId(addr.str());
699  }
700 }
701 
702 void XmlRpcInterface::handleAppTunPacket(char *buf, uint32_t length)
703 {
704 #if not defined _WIN32 && not defined __APPLE__
705  EV << "XmlRpcInterface::handleAppTunPacket(): packet of "
706  << "length " << length << endl;
707 
708  if (!p2pns) {
709  throw cRuntimeError("XmlRpcInterface::handleAppTunPacket(): "
710  "P2PNS module missing on tier2!");
711  }
712 
713  if (OverlayKey::getLength() < 100) {
714  throw cRuntimeError("XmlRpcInterface::handleAppTunPacket(): "
715  "P2PNS needs at least 100 bit nodeIds!");
716  }
717 
718  if (length < 40) {
719  EV << "XmlRpcInterface::handleAppTunPacket(): packet too "
720  << "short - discarding packet!" << endl;
721  return;
722  }
723 
724  ip6_hdr* ip_buf = (ip6_hdr*) buf;
725  if (((ip_buf->ip6_vfc & 0xf0) >> 4) != 6) {
726  EV << "XmlRpcInterface::handleAppTunPacket(): received packet "
727  "is no IPv6 - discarding packet!" << endl;
728  return;
729  }
730 
731  OverlayKey destKey = OverlayKey(ntohl(ip_buf->ip6_dst.s6_addr32[0]));
732 
733  for (int i = 1; i < 4; i++) {
734  destKey = (destKey << 32) + OverlayKey(ntohl(ip_buf->ip6_dst.s6_addr32[i]));
735  }
736  destKey = destKey << (OverlayKey::getLength() - 100);
737 
738  p2pns->tunnel(destKey, BinaryValue(buf, length));
739 #endif
740 }
741 
743 {
744 #if not defined _WIN32 && not defined __APPLE__
745  Enter_Method_Silent();
746 
747  if (payload.size() == 0) {
748  return;
749  }
750 
751  int curBytesWritten = scheduler->sendBytes(&payload[0],
752  payload.size(),
753  0, 0, true, appTunFd);
754 
755  if (curBytesWritten <= 0) {
756  throw cRuntimeError("XmlRpcServerConnection::deliverTunneledMessage(): "
757  "Error writing to application TUN device.");
758  }
759 #endif
760 }
761 
762 void XmlRpcInterface::handleRealworldPacket(char *buf, uint32_t length)
763 {
764  if (state[curAppFd]._connectionState == READ_HEADER) {
765  if (!readHeader(buf, length)) {
766  // discard data, if the header is invalid
767  state[curAppFd]._header = "";
768  state[curAppFd]._request = "";
769  state[curAppFd]._response = "";
770  state[curAppFd]._connectionState = READ_HEADER;
771  return;
772  }
773  }
774 
775  if (state[curAppFd]._connectionState == READ_REQUEST)
776  if (!readRequest(buf, length))
777  return;
778 
779  if (state[curAppFd]._connectionState == WRITE_RESPONSE)
780  if (!writeResponse() ) {
781  closeConnection();
782  return;
783  }
784 
785  return;
786 }
787 
789  cPolymorphic* context,
790  int rpcId,
791  simtime_t rtt)
792 {
793  curAppFd = rpcId;
794 
795  if (state.count(curAppFd) == 0) {
796  return;
797  }
798 
799  RPC_SWITCH_START(msg)
801  if (state[curAppFd]._connectionState != EXECUTE_REQUEST) break;
802 
803  XmlRpcValue resultValue;
804  resultValue.setSize(_LookupResponse->getSiblingsArraySize());
805 
806  if (_LookupResponse->getIsValid() == true) {
807  for (uint32_t i=0; i < _LookupResponse->getSiblingsArraySize();
808  i++) {
809  resultValue[i].setSize(3);
810  resultValue[i][0] =
811  _LookupResponse->getSiblings(i).getIp().str();
812  resultValue[i][1] =
813  _LookupResponse->getSiblings(i).getPort();
814  resultValue[i][2] =
815  _LookupResponse->getSiblings(i).getKey().toString(16);
816  }
817  state[curAppFd]._response = generateResponse(resultValue.toXml());
818  } else {
819  std::cout << "XmlRpcInterface(): lookup() failed!" << endl;
820  state[curAppFd]._response = generateFaultResponse("lookup() failed", 22);
821  }
822 
823  state[curAppFd]._connectionState = WRITE_RESPONSE;
824  if (!writeResponse()) {
825  closeConnection();
826  }
827  break;
828  }
830  if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
831  break;
832 
833  XmlRpcValue resultValue;
834 
835  if (_P2pnsRegisterResponse->getIsSuccess() == true) {
836  resultValue = 0;
837  state[curAppFd]._response = generateResponse(resultValue.toXml());
838  } else {
839  std::cout << "XmlRpcInterface(): register() failed!" << endl;
840  state[curAppFd]._response = generateFaultResponse("register() failed", 22);
841  }
842 
843  state[curAppFd]._connectionState = WRITE_RESPONSE;
844  if (!writeResponse() ) {
845  closeConnection();
846  }
847  break;
848  }
850  if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
851  break;
852 
853  XmlRpcValue resultValue;
854  resultValue.setSize(_P2pnsResolveResponse->getAddressArraySize());
855 
856  if (_P2pnsResolveResponse->getIsSuccess() == true) {
857  for (uint i=0; i < _P2pnsResolveResponse->getAddressArraySize(); i++) {
858  resultValue[i].setSize(3);
859  BinaryValue& addr = _P2pnsResolveResponse->getAddress(i);
860  resultValue[i][0] = XmlRpcValue(&addr[0], addr.size());
861  resultValue[i][1] = (int)_P2pnsResolveResponse->getKind(i);
862  resultValue[i][2] = (int)_P2pnsResolveResponse->getId(i);
863  }
864  state[curAppFd]._response = generateResponse(resultValue.toXml());
865  } else {
866  std::cout << "XmlRpcInterface(): resolve() failed!" << endl;
867  state[curAppFd]._response = generateFaultResponse("resolve() failed: Name not found", 9);
868  }
869 
870  state[curAppFd]._connectionState = WRITE_RESPONSE;
871  if (!writeResponse() ) {
872  closeConnection();
873  }
874  break;
875  }
876  RPC_ON_RESPONSE(DHTputCAPI) {
877  if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
878  break;
879 
880  XmlRpcValue resultValue;
881 
882  if (_DHTputCAPIResponse->getIsSuccess() == true) {
883  resultValue = 0;
884  state[curAppFd]._response = generateResponse(resultValue.toXml());
885  } else {
886  std::cout << "XmlRpcInterface(): put() failed!" << endl;
887  state[curAppFd]._response = generateFaultResponse("put() failed", 22);
888  }
889 
890  state[curAppFd]._connectionState = WRITE_RESPONSE;
891  if (!writeResponse() ) {
892  closeConnection();
893  }
894  break;
895  }
896  RPC_ON_RESPONSE(DHTgetCAPI) {
897  if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
898  break;
899 
900  XmlRpcValue resultValue;
901  resultValue.setSize(2);
902  resultValue[0].setSize(_DHTgetCAPIResponse->getResultArraySize());
903 
904  if (_DHTgetCAPIResponse->getIsSuccess() == true) {
905  for (uint i=0; i < _DHTgetCAPIResponse->getResultArraySize(); i++) {
906  resultValue[i].setSize(2);
907  DhtDumpEntry& entry = _DHTgetCAPIResponse->getResult(i);
908  resultValue[0][i] = XmlRpcValue(&(*(entry.getValue().begin())),
909  entry.getValue().size());
910  }
911  resultValue[1] = std::string();
912 
913  state[curAppFd]._response = generateResponse(resultValue.toXml());
914  } else {
915  std::cout << "XmlRpcInterface(): get() failed!" << endl;
916  state[curAppFd]._response = generateFaultResponse("get() failed", 22);
917  }
918 
919  state[curAppFd]._connectionState = WRITE_RESPONSE;
920  if (!writeResponse() ) {
921  closeConnection();
922  }
923  break;
924  }
925  RPC_ON_RESPONSE(DHTdump) {
926  if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
927  break;
928 
929  XmlRpcValue resultValue;
930  resultValue.setSize(_DHTdumpResponse->getRecordArraySize());
931 
932  for (uint32_t i=0; i < _DHTdumpResponse->getRecordArraySize();
933  i++) {
934  resultValue[i].setSize(3);
935  resultValue[i][0] =
936  _DHTdumpResponse->getRecord(i).getKey().toString(16);
937  resultValue[i][1] = XmlRpcValue(
938  &(*(_DHTdumpResponse->getRecord(i).getValue().begin())),
939  _DHTdumpResponse->getRecord(i).getValue().size());
940  resultValue[i][2] =
941  _DHTdumpResponse->getRecord(i).getTtl();
942  }
943 
944  state[curAppFd]._response = generateResponse(resultValue.toXml());
945 
946  state[curAppFd]._connectionState = WRITE_RESPONSE;
947  if (!writeResponse()) {
948  closeConnection();
949  }
950  break;
951 
952  }
953  RPC_SWITCH_END( )
954 }
955 
957 {
958  error("DHTXMLRealworldApp::handleCommonAPIPacket(): Unknown Packet!");
959 }
960 
961 bool XmlRpcInterface::readHeader(char* buf, uint32_t length)
962 {
963  // Read available data
964  bool eof = false;
965 
966  state[curAppFd]._header.append(std::string(buf, length));
967 
968  if (length <= 0) {
969  // Its only an error if we already have read some data
970  if (state[curAppFd]._header.length() > 0)
971  XmlRpcUtil::error("XmlRpcServerConnection::readHeader: error "
972  "while reading header.");
973  return false;
974  }
975 
976  XmlRpcUtil::log(4, "XmlRpcServerConnection::readHeader: read %d bytes.",
977  state[curAppFd]._header.length());
978  char *hp = (char*)state[curAppFd]._header.c_str(); // Start of header
979  char *ep = hp + state[curAppFd]._header.length(); // End of string
980  char *bp = 0; // Start of body
981  char *lp = 0; // Start of content-length value
982  char *kp = 0; // Start of connection value
983 
984  for (char *cp = hp; (bp == 0) && (cp < ep); ++cp) {
985  if ((ep - cp > 16) && (strncasecmp(cp, "Content-length: ", 16) == 0))
986  lp = cp + 16;
987  else if ((ep - cp > 12) && (strncasecmp(cp, "Connection: ", 12) == 0))
988  kp = cp + 12;
989  else if ((ep - cp > 4) && (strncmp(cp, "\r\n\r\n", 4) == 0))
990  bp = cp + 4;
991  else if ((ep - cp > 2) && (strncmp(cp, "\n\n", 2) == 0))
992  bp = cp + 2;
993  }
994 
995  // If we haven't gotten the entire header yet, return (keep reading)
996  if (bp == 0) {
997  // EOF in the middle of a request is an error, otherwise its ok
998  if (eof) {
999  XmlRpcUtil::log(4, "XmlRpcServerConnection::readHeader: EOF");
1000  if (state[curAppFd]._header.length() > 0)
1001  XmlRpcUtil::error("XmlRpcServerConnection::readHeader: EOF while reading header");
1002  return false; // Either way we close the connection
1003  }
1004 
1005  return true; // Keep reading
1006  }
1007 
1008  // Decode content length
1009  if (lp == 0) {
1010  XmlRpcUtil::error("XmlRpcServerConnection::readHeader: No Content-length specified");
1011  return false; // We could try to figure it out by parsing as we read, but for now...
1012  }
1013 
1014  state[curAppFd]._contentLength = atoi(lp);
1015  if (state[curAppFd]._contentLength <= 0) {
1017  "XmlRpcServerConnection::readHeader: Invalid Content-length specified (%d).",
1018  state[curAppFd]._contentLength);
1019  return false;
1020  }
1021 
1023  3,
1024  "XmlRpcServerConnection::readHeader: specified content length is %d.",
1025  state[curAppFd]._contentLength);
1026 
1027  // Otherwise copy non-header data to request buffer and set state to read request.
1028  state[curAppFd]._request = bp;
1029 
1030  // Parse out any interesting bits from the header (HTTP version, connection)
1031  state[curAppFd]._keepAlive = true;
1032  if (state[curAppFd]._header.find("HTTP/1.0") != std::string::npos) {
1033  if (kp == 0 || strncasecmp(kp, "keep-alive", 10) != 0)
1034  state[curAppFd]._keepAlive = false; // Default for HTTP 1.0 is to close the connection
1035  } else {
1036  if (kp != 0 && strncasecmp(kp, "close", 5) == 0)
1037  state[curAppFd]._keepAlive = false;
1038  }
1039  XmlRpcUtil::log(3, "KeepAlive: %d", state[curAppFd]._keepAlive);
1040 
1041  state[curAppFd]._header = "";
1042  state[curAppFd]._connectionState = READ_REQUEST;
1043  return true; // Continue monitoring this source
1044 }
1045 
1046 bool XmlRpcInterface::readRequest(char *buf, uint32_t length)
1047 {
1048  // If we dont have the entire request yet, read available data
1049  if (int(state[curAppFd]._request.length()) < state[curAppFd]._contentLength) {
1050  bool eof = false;
1051 
1052  state[curAppFd]._request.append(std::string(buf, length));
1053 
1054  if (length <= 0) {
1055  XmlRpcUtil::error("XmlRpcServerConnection::readRequest: read error.");
1056  return false;
1057  }
1058 
1059  // If we haven't gotten the entire request yet, return (keep reading)
1060  if (int(state[curAppFd]._request.length()) < state[curAppFd]._contentLength) {
1061  if (eof) {
1062  XmlRpcUtil::error("XmlRpcServerConnection::readRequest: EOF while reading request");
1063  return false; // Either way we close the connection
1064  }
1065  return true;
1066  }
1067  }
1068 
1069  // Otherwise, parse and dispatch the request
1070  XmlRpcUtil::log(3, "XmlRpcServerConnection::readRequest read %d bytes.",
1071  state[curAppFd]._request.length());
1072  //XmlRpcUtil::log(5, "XmlRpcServerConnection::readRequest:\n%s\n", state[curAppFd]._request.c_str());
1073 
1074  state[curAppFd]._connectionState = WRITE_RESPONSE;
1075 
1076  return true; // Continue monitoring this source
1077 }
1078 
1080 {
1081  if (state[curAppFd]._response.length() == 0) {
1082  state[curAppFd]._response = executeRequest(state[curAppFd]._request);
1083  state[curAppFd]._bytesWritten = 0;
1084 
1085  if (state[curAppFd]._connectionState == EXECUTE_REQUEST)
1086  return true;
1087 
1088  if (state[curAppFd]._response.length() == 0) {
1089  XmlRpcUtil::error("XmlRpcServerConnection::writeResponse: empty response.");
1090  return false;
1091  }
1092  }
1093 
1094  // Try to write the response
1095  int curBytesWritten = scheduler->sendBytes(state[curAppFd]._response.c_str(),
1096  state[curAppFd]._response.length(),
1097  0, 0, true, curAppFd);
1098 
1099  if (curBytesWritten <= 0) {
1100  XmlRpcUtil::error("XmlRpcServerConnection::writeResponse: write error.");
1101  return false;
1102  } else {
1103  state[curAppFd]._bytesWritten += curBytesWritten;
1104  }
1105 
1106  XmlRpcUtil::log(3,
1107  "XmlRpcServerConnection::writeResponse: wrote %d of %d bytes.",
1108  state[curAppFd]._bytesWritten, state[curAppFd]._response.length());
1109 
1110  // Prepare to read the next request
1111  if (state[curAppFd]._bytesWritten == int(state[curAppFd]._response.length())) {
1112  state[curAppFd]._header = "";
1113  state[curAppFd]._request = "";
1114  state[curAppFd]._response = "";
1115  state[curAppFd]._connectionState = READ_HEADER;
1116  }
1117 
1118  return state[curAppFd]._keepAlive; // Continue monitoring this source if true
1119 }