24 #include <platdep/sockets.h>
25 #if not defined _WIN32
26 #include <arpa/inet.h>
29 #if not defined _WIN32 && not defined __APPLE__
30 #include <netinet/ip6.h>
40 using namespace XmlRpc;
55 (
dynamic_cast<XmlRpcInterface*
>(_server))->p2pnsRegister(params, result);
60 return std::string(
"Register a name with P2PNS");
75 (
dynamic_cast<XmlRpcInterface*
>(_server))->p2pnsResolve(params, result);
80 return std::string(
"Resolve a name with P2PNS");
100 return std::string(
"Lookup key in local "
121 return std::string(
"Lookup key with KBR layer");
141 return std::string(
"Store a value in the DHT");
161 return std::string(
"Get a value from the DHT");
181 return std::string(
"Dump all local records from the DHT");
196 (
dynamic_cast<XmlRpcInterface*
>(_server))->joinOverlay(params, result);
201 return std::string(
"Join the overlay with a specific nodeID");
207 if ((params.
size() != 5) ||
213 throw XmlRpcException(
"register(base64 name, int kind, int id, base64 address, int ttl): "
214 "Invalid argument type");
216 if (overlay->getCompModule(
TIER2_COMP) == NULL)
217 throw XmlRpcException(
"register(base64 name, int kind, int id, base64 address, int ttl): "
220 if (!isPrivileged()) {
221 throw XmlRpcException(
"register(base64 name, int kind, base64 address, "
222 "int ttl): Not allowed");
225 state[curAppFd]._connectionState = EXECUTE_REQUEST;
229 registerCall->
setKind((
int)params[1]);
230 registerCall->
setId((
int)params[2]);
232 registerCall->
setTtl(params[4]);
234 sendInternalRpcWithTimeout(
TIER2_COMP, registerCall);
239 if ((params.
size() != 2) ||
242 throw XmlRpcException(
"resolve(base64 name, int kind): Invalid argument type");
244 if (overlay->getCompModule(
TIER2_COMP) == NULL)
245 throw XmlRpcException(
"resolve(base64 name, int kind): No P2PNS service");
247 state[curAppFd]._connectionState = EXECUTE_REQUEST;
252 resolveCall->
setKind((
int)params[1]);
253 resolveCall->
setId(0);
255 sendInternalRpcWithTimeout(
TIER2_COMP, resolveCall);
261 if ((params.
size() != 3)
266 "boolean safe): Invalid argument type");
272 if (keyString.size() > 0) {
274 params[1], params[2]);
276 nextHops = overlay->local_lookup(overlay->getThisNode().getKey(),
277 params[1], params[2]);
280 for (uint32_t i=0; i < nextHops->size(); i++) {
281 result[i][0] = (*nextHops)[i].getIp().str();
282 result[i][1] = (*nextHops)[i].getPort();
283 result[i][2] = (*nextHops)[i].getKey().toString(16);
291 if (((params.
size() != 2)
294 && ((params.
size() != 3)
299 "int RoutingType)): Invalid argument type");
301 if ((
int)params[1] > overlay->getMaxNumSiblings())
303 "int RoutingType)): numSibling to big");
305 if (params.
size() == 3) {
314 "int RoutingType)): invalid routingType");
318 state[curAppFd]._connectionState = EXECUTE_REQUEST;
326 if (params.
size() == 3) {
335 if ((params.
size() != 1)
339 if (!isPrivileged()) {
352 if ((params.
size() != 4)
358 ", string application): Invalid argument type");
360 if (!isPrivileged()) {
362 ", string application): Not allowed");
365 if (overlay->getCompModule(
TIER1_COMP) == NULL)
367 ", string application): No DHT service");
369 state[curAppFd]._connectionState = EXECUTE_REQUEST;
377 dhtPutMsg->
setTtl(params[2]);
380 sendInternalRpcWithTimeout(
TIER1_COMP, dhtPutMsg);
385 if ((params.
size() != 4)
391 ", string application): Invalid argument type");
393 if (overlay->getCompModule(
TIER1_COMP) == NULL)
395 ", string application): No DHT service");
397 state[curAppFd]._connectionState = EXECUTE_REQUEST;
404 sendInternalRpcWithTimeout(
TIER1_COMP, dhtGetMsg);
409 if (params.
size() != 1) {
413 if (!isPrivileged()) {
417 if (overlay->getCompModule(
TIER1_COMP) == NULL)
420 state[curAppFd]._connectionState = EXECUTE_REQUEST;
431 return state[curAppFd].localhost;
443 packetNotification =
new cMessage(
"packetNotification");
445 limitAccess = par(
"limitAccess");
451 appTunFd = scheduler->getAppTunFd();
458 _lookup =
new Lookup(
this);
461 _put =
new Put(
this);
462 _get =
new Get(
this);
466 enableIntrospection(
true);
468 curAppFd = INVALID_SOCKET;
484 _removeContact = NULL;
487 _searchContact = NULL;
490 _pull_notification = NULL;
492 packetNotification = NULL;
506 delete _removeContact;
509 delete _searchContact;
512 delete _pull_notification;
514 cancelAndDelete(packetNotification);
519 if (state.count(curAppFd) && state[curAppFd].pendingRpc) {
520 cancelRpcMessage(state[curAppFd].pendingRpc);
523 state[curAppFd].appFd = INVALID_SOCKET;
524 state[curAppFd].localhost =
false;
525 state[curAppFd]._header =
"";
526 state[curAppFd]._request =
"";
527 state[curAppFd]._response =
"";
528 state[curAppFd]._connectionState = READ_HEADER;
529 state[curAppFd]._keepAlive =
true;
530 state[curAppFd].pendingRpc = 0;
535 scheduler->closeAppSocket(curAppFd);
536 resetConnectionState();
542 state[curAppFd].pendingRpc = sendInternalRpcCall(destComp, call, NULL,
550 if (msg==packetNotification) {
551 EV <<
"[XmlRpcInterface::handleMessage() @ " << overlay->getThisNode().getIp()
552 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
553 <<
" Message from application. Queue length = " << packetBuffer.size()
555 while (packetBuffer.size() > 0) {
558 *(packetBuffer.begin());
559 packetBuffer.pop_front();
560 curAppFd = packet.
fd;
562 switch (packet.
func) {
564 handleAppTunPacket(packet.
data, packet.
length);
568 if (state.count(curAppFd) == 0) {
569 throw cRuntimeError(
"XmlRpcInterface::handleMessage(): "
571 "from unknown socket!");
574 handleRealworldPacket(packet.
data, packet.
length);
578 if (state.count(curAppFd)) {
579 throw cRuntimeError(
"XmlRpcInterface::handleMessage(): "
580 "Connection state table corrupt!");
583 resetConnectionState();
585 if (packet.
addr != NULL) {
586 if (((sockaddr_in*)packet.
addr)->sin_addr.s_addr
587 == inet_addr(
"127.0.0.1")) {
588 state[curAppFd].localhost =
true;
596 if (state.count(curAppFd) == 0) {
597 throw cRuntimeError(
"XmlRpcInterface::handleMessage(): "
598 "Trying to close unknown "
602 resetConnectionState();
603 state.erase(curAppFd);
607 delete[] packet.
data;
610 }
else if (msg->isSelfMessage()) {
613 if (rpcMessage!=NULL) {
614 internalHandleRpcMessage(rpcMessage);
622 if (rpcMessage!=NULL) {
623 internalHandleRpcMessage(rpcMessage);
628 if (commonAPIMsg != NULL)
629 handleCommonAPIPacket(commonAPIMsg);
632 if (readyMsg != NULL)
633 handleReadyMessage(readyMsg);
641 cPolymorphic* context,
int rpcId,
646 if (state.count(curAppFd) == 0)
649 std::cout <<
"XmlRpcInterface(): XML-RPC failed!" << endl;
650 state[curAppFd]._response = generateFaultResponse(
"XML-RPC timeout", 22);
651 state[curAppFd]._connectionState = WRITE_RESPONSE;
652 if (!writeResponse() ) {
663 if (appTunFd != INVALID_SOCKET) {
666 const OverlayKey& key = overlay->getThisNode().getKey();
669 throw cRuntimeError(
"XmlRpcInterface::handleReadyMessage(): "
670 "P2PNS needs at least 100 bit nodeIds!");
673 std::stringstream addr;
675 for (
int i = 0; i < 100/4; i++) {
676 if (((i + 3) % 4) == 0) {
683 std::string cmd =
"/sbin/ip addr add " + addr.str() +
"/28 dev tun0";
685 EV <<
"XmlRpcInterface::handleOverlayReady(): "
686 "Setting TUN interface address " << addr.str() << endl;
688 if (system(cmd.c_str()) != 0) {
689 EV <<
"XmlRpcInterface::handleOverlayReady(): "
690 "Failed to set TUN interface address!" << endl;
693 if (system(
"/sbin/ip link set tun0 up") != 0) {
694 EV <<
"XmlRpcInterface::handleOverlayReady(): "
695 "Failed to set TUN interface up!" << endl;
698 p2pns->registerId(addr.str());
704 #if not defined _WIN32 && not defined __APPLE__
705 EV <<
"XmlRpcInterface::handleAppTunPacket(): packet of "
706 <<
"length " << length << endl;
709 throw cRuntimeError(
"XmlRpcInterface::handleAppTunPacket(): "
710 "P2PNS module missing on tier2!");
714 throw cRuntimeError(
"XmlRpcInterface::handleAppTunPacket(): "
715 "P2PNS needs at least 100 bit nodeIds!");
719 EV <<
"XmlRpcInterface::handleAppTunPacket(): packet too "
720 <<
"short - discarding packet!" << endl;
724 ip6_hdr* ip_buf = (ip6_hdr*) buf;
725 if (((ip_buf->ip6_vfc & 0xf0) >> 4) != 6) {
726 EV <<
"XmlRpcInterface::handleAppTunPacket(): received packet "
727 "is no IPv6 - discarding packet!" << endl;
733 for (
int i = 1; i < 4; i++) {
734 destKey = (destKey << 32) +
OverlayKey(ntohl(ip_buf->ip6_dst.s6_addr32[i]));
744 #if not defined _WIN32 && not defined __APPLE__
745 Enter_Method_Silent();
747 if (payload.size() == 0) {
751 int curBytesWritten = scheduler->sendBytes(&payload[0],
753 0, 0,
true, appTunFd);
755 if (curBytesWritten <= 0) {
756 throw cRuntimeError(
"XmlRpcServerConnection::deliverTunneledMessage(): "
757 "Error writing to application TUN device.");
764 if (state[curAppFd]._connectionState == READ_HEADER) {
765 if (!readHeader(buf, length)) {
767 state[curAppFd]._header =
"";
768 state[curAppFd]._request =
"";
769 state[curAppFd]._response =
"";
770 state[curAppFd]._connectionState = READ_HEADER;
775 if (state[curAppFd]._connectionState == READ_REQUEST)
776 if (!readRequest(buf, length))
779 if (state[curAppFd]._connectionState == WRITE_RESPONSE)
780 if (!writeResponse() ) {
789 cPolymorphic* context,
795 if (state.count(curAppFd) == 0) {
801 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
break;
804 resultValue.
setSize(_LookupResponse->getSiblingsArraySize());
806 if (_LookupResponse->getIsValid() ==
true) {
807 for (uint32_t i=0; i < _LookupResponse->getSiblingsArraySize();
811 _LookupResponse->getSiblings(i).getIp().str();
813 _LookupResponse->getSiblings(i).getPort();
815 _LookupResponse->getSiblings(i).getKey().toString(16);
817 state[curAppFd]._response = generateResponse(resultValue.
toXml());
819 std::cout <<
"XmlRpcInterface(): lookup() failed!" << endl;
820 state[curAppFd]._response = generateFaultResponse(
"lookup() failed", 22);
823 state[curAppFd]._connectionState = WRITE_RESPONSE;
824 if (!writeResponse()) {
830 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
835 if (_P2pnsRegisterResponse->getIsSuccess() ==
true) {
837 state[curAppFd]._response = generateResponse(resultValue.
toXml());
839 std::cout <<
"XmlRpcInterface(): register() failed!" << endl;
840 state[curAppFd]._response = generateFaultResponse(
"register() failed", 22);
843 state[curAppFd]._connectionState = WRITE_RESPONSE;
844 if (!writeResponse() ) {
850 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
854 resultValue.
setSize(_P2pnsResolveResponse->getAddressArraySize());
856 if (_P2pnsResolveResponse->getIsSuccess() ==
true) {
857 for (uint i=0; i < _P2pnsResolveResponse->getAddressArraySize(); i++) {
859 BinaryValue& addr = _P2pnsResolveResponse->getAddress(i);
860 resultValue[i][0] =
XmlRpcValue(&addr[0], addr.size());
861 resultValue[i][1] = (int)_P2pnsResolveResponse->getKind(i);
862 resultValue[i][2] = (int)_P2pnsResolveResponse->getId(i);
864 state[curAppFd]._response = generateResponse(resultValue.
toXml());
866 std::cout <<
"XmlRpcInterface(): resolve() failed!" << endl;
867 state[curAppFd]._response = generateFaultResponse(
"resolve() failed: Name not found", 9);
870 state[curAppFd]._connectionState = WRITE_RESPONSE;
871 if (!writeResponse() ) {
877 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
882 if (_DHTputCAPIResponse->getIsSuccess() ==
true) {
884 state[curAppFd]._response = generateResponse(resultValue.
toXml());
886 std::cout <<
"XmlRpcInterface(): put() failed!" << endl;
887 state[curAppFd]._response = generateFaultResponse(
"put() failed", 22);
890 state[curAppFd]._connectionState = WRITE_RESPONSE;
891 if (!writeResponse() ) {
897 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
902 resultValue[0].
setSize(_DHTgetCAPIResponse->getResultArraySize());
904 if (_DHTgetCAPIResponse->getIsSuccess() ==
true) {
905 for (uint i=0; i < _DHTgetCAPIResponse->getResultArraySize(); i++) {
907 DhtDumpEntry& entry = _DHTgetCAPIResponse->getResult(i);
911 resultValue[1] = std::string();
913 state[curAppFd]._response = generateResponse(resultValue.
toXml());
915 std::cout <<
"XmlRpcInterface(): get() failed!" << endl;
916 state[curAppFd]._response = generateFaultResponse(
"get() failed", 22);
919 state[curAppFd]._connectionState = WRITE_RESPONSE;
920 if (!writeResponse() ) {
926 if (state[curAppFd]._connectionState != EXECUTE_REQUEST)
930 resultValue.
setSize(_DHTdumpResponse->getRecordArraySize());
932 for (uint32_t i=0; i < _DHTdumpResponse->getRecordArraySize();
936 _DHTdumpResponse->getRecord(i).getKey().toString(16);
938 &(*(_DHTdumpResponse->getRecord(i).getValue().begin())),
939 _DHTdumpResponse->getRecord(i).getValue().size());
941 _DHTdumpResponse->getRecord(i).getTtl();
944 state[curAppFd]._response = generateResponse(resultValue.
toXml());
946 state[curAppFd]._connectionState = WRITE_RESPONSE;
947 if (!writeResponse()) {
958 error(
"DHTXMLRealworldApp::handleCommonAPIPacket(): Unknown Packet!");
966 state[curAppFd]._header.append(std::string(buf, length));
970 if (state[curAppFd]._header.length() > 0)
972 "while reading header.");
976 XmlRpcUtil::log(4,
"XmlRpcServerConnection::readHeader: read %d bytes.",
977 state[curAppFd]._header.length());
978 char *hp = (
char*)state[curAppFd]._header.c_str();
979 char *ep = hp + state[curAppFd]._header.length();
984 for (
char *cp = hp; (bp == 0) && (cp < ep); ++cp) {
985 if ((ep - cp > 16) && (strncasecmp(cp,
"Content-length: ", 16) == 0))
987 else if ((ep - cp > 12) && (strncasecmp(cp,
"Connection: ", 12) == 0))
989 else if ((ep - cp > 4) && (strncmp(cp,
"\r\n\r\n", 4) == 0))
991 else if ((ep - cp > 2) && (strncmp(cp,
"\n\n", 2) == 0))
1000 if (state[curAppFd]._header.length() > 0)
1001 XmlRpcUtil::error(
"XmlRpcServerConnection::readHeader: EOF while reading header");
1010 XmlRpcUtil::error(
"XmlRpcServerConnection::readHeader: No Content-length specified");
1014 state[curAppFd]._contentLength = atoi(lp);
1015 if (state[curAppFd]._contentLength <= 0) {
1017 "XmlRpcServerConnection::readHeader: Invalid Content-length specified (%d).",
1018 state[curAppFd]._contentLength);
1024 "XmlRpcServerConnection::readHeader: specified content length is %d.",
1025 state[curAppFd]._contentLength);
1028 state[curAppFd]._request = bp;
1031 state[curAppFd]._keepAlive =
true;
1032 if (state[curAppFd]._header.find(
"HTTP/1.0") != std::string::npos) {
1033 if (kp == 0 || strncasecmp(kp,
"keep-alive", 10) != 0)
1034 state[curAppFd]._keepAlive =
false;
1036 if (kp != 0 && strncasecmp(kp,
"close", 5) == 0)
1037 state[curAppFd]._keepAlive =
false;
1041 state[curAppFd]._header =
"";
1042 state[curAppFd]._connectionState = READ_REQUEST;
1049 if (
int(state[curAppFd]._request.length()) < state[curAppFd]._contentLength) {
1052 state[curAppFd]._request.append(std::string(buf, length));
1060 if (
int(state[curAppFd]._request.length()) < state[curAppFd]._contentLength) {
1062 XmlRpcUtil::error(
"XmlRpcServerConnection::readRequest: EOF while reading request");
1070 XmlRpcUtil::log(3,
"XmlRpcServerConnection::readRequest read %d bytes.",
1071 state[curAppFd]._request.length());
1074 state[curAppFd]._connectionState = WRITE_RESPONSE;
1081 if (state[curAppFd]._response.length() == 0) {
1082 state[curAppFd]._response = executeRequest(state[curAppFd]._request);
1083 state[curAppFd]._bytesWritten = 0;
1085 if (state[curAppFd]._connectionState == EXECUTE_REQUEST)
1088 if (state[curAppFd]._response.length() == 0) {
1095 int curBytesWritten = scheduler->sendBytes(state[curAppFd]._response.c_str(),
1096 state[curAppFd]._response.length(),
1097 0, 0,
true, curAppFd);
1099 if (curBytesWritten <= 0) {
1103 state[curAppFd]._bytesWritten += curBytesWritten;
1107 "XmlRpcServerConnection::writeResponse: wrote %d of %d bytes.",
1108 state[curAppFd]._bytesWritten, state[curAppFd]._response.length());
1111 if (state[curAppFd]._bytesWritten ==
int(state[curAppFd]._response.length())) {
1112 state[curAppFd]._header =
"";
1113 state[curAppFd]._request =
"";
1114 state[curAppFd]._response =
"";
1115 state[curAppFd]._connectionState = READ_HEADER;
1118 return state[curAppFd]._keepAlive;