24 #include <IPAddressResolver.h>
25 #include <NotificationBoard.h>
26 #include <UDPAppBase.h>
27 #include <UDPSocket.h>
43 notificationBoard = NULL;
60 CompType compType = getThisCompType();
73 debugOutput = par(
"debugOutput");
78 notificationBoard = NotificationBoardAccess().get();
81 notificationBoard->subscribe(
this, NF_OVERLAY_TRANSPORTADDRESS_CHANGED);
82 notificationBoard->subscribe(
this, NF_OVERLAY_NODE_LEAVE);
83 notificationBoard->subscribe(
this, NF_OVERLAY_NODE_GRACEFUL_LEAVE);
86 if (getParentModule()->getSubmodule(
"interfaceTable", 0) != NULL) {
87 thisNode.setIp(IPAddressResolver()
88 .addressOf(getParentModule()));
90 thisNode.setIp(IPAddressResolver()
91 .addressOf(getParentModule()->getParentModule()));
100 numOverlayReceived = 0;
101 bytesOverlaySent = 0;
102 bytesOverlayReceived = 0;
106 bytesUdpReceived = 0;
108 creationTime = simTime();
110 WATCH(numOverlaySent);
111 WATCH(numOverlayReceived);
112 WATCH(bytesOverlaySent);
113 WATCH(bytesOverlayReceived);
115 WATCH(numUdpReceived);
117 WATCH(bytesUdpReceived);
123 setTcpOut(gate(
"tcpOut"));
128 initializeApp(stage);
139 if (internalHandleMessage(msg)) {
143 if (msg->arrivedOn(
"from_lowerTier") ||
144 msg->arrivedOn(
"direct_in")) {
146 if (readyMsg != NULL) {
147 handleReadyMessage(readyMsg);
152 if (commonAPIMsg != NULL) {
153 handleCommonAPIMessage(commonAPIMsg);
154 }
else if (msg->arrivedOn(
"from_lowerTier")) {
156 cPacket* packet = check_and_cast<cPacket*>(msg);
158 bytesOverlayReceived += packet->getByteLength());
159 handleLowerMessage(msg);
162 }
else if (msg->arrivedOn(
"from_upperTier")) {
163 handleUpperMessage(msg);
164 }
else if (msg->arrivedOn(
"udpIn")) {
165 cPacket* packet = check_and_cast<cPacket*>(msg);
166 RECORD_STATS(numUdpReceived++; bytesUdpReceived += packet->getByteLength());
168 if (debugOutput && !ev.isDisabled()) {
169 UDPControlInfo* udpControlInfo =
170 check_and_cast<UDPControlInfo*>(msg->getControlInfo());
171 EV <<
"[BaseApp:handleMessage() @ " << thisNode.getIp()
172 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
173 <<
" Received " << *msg <<
" from "
174 << udpControlInfo->getSrcAddr() << endl;
176 handleUDPMessage(msg);
177 }
else if(msg->arrivedOn(
"tcpIn")) {
178 handleTCPMessage(msg);
179 }
else if (msg->arrivedOn(
"trace_in")) {
180 handleTraceMessage(msg);
188 std::string name(this->getName());
190 if (name == std::string(
"tier1")) {
192 }
else if (name == std::string(
"tier2")) {
194 }
else if (name == std::string(
"tier3")) {
198 std::string parentName(this->getParentModule()->getName());
200 if (parentName == std::string(
"tier1")) {
202 }
else if (parentName == std::string(
"tier2")) {
204 }
else if (parentName == std::string(
"tier3")) {
207 throw cRuntimeError(
"BaseApp::getThisCompType(): "
208 "Unknown module type!");
216 Enter_Method_Silent();
217 if (category == NF_OVERLAY_TRANSPORTADDRESS_CHANGED) {
218 handleTransportAddressChangedNotification();
219 }
else if (category == NF_OVERLAY_NODE_LEAVE) {
220 handleNodeLeaveNotification();
221 }
else if (category == NF_OVERLAY_NODE_GRACEFUL_LEAVE) {
222 handleNodeGracefulLeaveNotification();
242 const std::vector<TransportAddress>& sourceRoute,
249 if (!(sourceRoute.size() == 1 && sourceRoute[0].isUnspecified())) {
251 for (uint32_t i = 0; i < sourceRoute.size(); ++i) {
255 routeMsg->encapsulate(msg);
262 sendDirect(routeMsg, overlay->getCompRpcGate(
OVERLAY_COMP));
265 if (debugOutput && !ev.isDisabled()) {
266 EV <<
"[BaseApp::callRoute() @ " << thisNode.getIp()
267 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
268 <<
" Sending " << *msg
269 <<
" to destination key " << key
270 <<
" with source route ";
272 for (uint32_t i = 0; i < sourceRoute.size(); ++i) {
273 EV << sourceRoute[i] <<
" ";
280 RECORD_STATS(numOverlaySent++; bytesOverlaySent += msg->getByteLength());
303 forwardMsg->setDestKey(key);
304 forwardMsg->setNextHopNode(nextHopNode);
305 forwardMsg->setControlInfo(ctrlInfo);
306 forwardMsg->encapsulate(msg);
311 send(forwardMsg,
"to_lowerTier");
313 sendDirect(forwardMsg, overlay->getCompRpcGate(
OVERLAY_COMP));
319 EV <<
"[BaseApp::update() @ " << thisNode.getIp()
320 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
321 <<
" " << node << (joined ?
" joined " :
" left ") <<
"siblings"
334 cPacket* tempMsg = commonAPIMsg->decapsulate();
340 if (overlayCtrlInfo != NULL) {
341 tempMsg->setControlInfo(overlayCtrlInfo);
344 switch (commonAPIMsg->
getType()) {
349 NodeHandle nextHopNode = overlay->getThisNode();
352 forward(&key, &tempMsg, &nextHopNode);
354 if(tempMsg != NULL) {
358 && nextHopNode != overlay->getThisNode())) {
359 forwardResponse(key, tempMsg, nextHopNode);
363 bytesOverlayReceived += tempMsg->getByteLength());
370 EV <<
"[BaseApp:handleCommonAPIMessage() @ "
371 << thisNode.getIp() <<
" ("
372 << overlay->getThisNode().getKey().toString(16) <<
")]\n"
373 <<
" Received " << *tempMsg <<
" from "
380 if (rpcMessage!=NULL) {
381 internalHandleRpcMessage(rpcMessage);
396 forward(&key, &tempMsg, &nextHopNode);
399 if(tempMsg != NULL) {
403 forwardResponse(key, tempMsg, nextHopNode);
447 throw cRuntimeError(
"This application cannot handle trace data. "
448 "You have to overwrite handleTraceMessage() in your "
449 "application to make trace files work");
454 RECORD_STATS(numOverlaySent++; bytesOverlaySent += msg->getByteLength());
456 send(msg,
"to_lowerTier");
462 msg->
setComp(getThisCompType());
465 overlay->sendMessageToAllComp(msg, getThisCompType());
471 simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
473 string baseAppName = string(
"BaseApp (") += string(this->getName())
477 globalStatistics->addStdDev(baseAppName +
string(
"Sent Messages/s to "
479 numOverlaySent / time);
480 globalStatistics->addStdDev(baseAppName +
481 string(
"Received Messages/s from Overlay"),
482 numOverlayReceived / time);
483 globalStatistics->addStdDev(baseAppName +
string(
"Sent Bytes/s to "
485 bytesOverlaySent / time);
486 globalStatistics->addStdDev(baseAppName +
string(
"Received Bytes/s "
488 bytesOverlayReceived / time);
489 globalStatistics->addStdDev(baseAppName +
string(
"Sent Messages/s to "
492 globalStatistics->addStdDev(baseAppName +
493 string(
"Received Messages/s from UDP"),
494 numUdpReceived / time);
495 globalStatistics->addStdDev(baseAppName +
string(
"Sent Bytes/s to UDP"),
496 bytesUdpSent / time);
497 globalStatistics->addStdDev(baseAppName +
string(
"Received Bytes/s "
499 bytesUdpReceived / time);
513 EV <<
"[BaseApp::bindToPort() @ " << thisNode.getIp()
514 <<
": Binding to UDP port " << port << endl;
516 thisNode.setPort(port);
518 cMessage *msg =
new cMessage(
"UDP_C_BIND", UDP_C_BIND);
519 UDPControlInfo *ctrl =
new UDPControlInfo();
520 ctrl->setSrcPort(port);
521 ctrl->setSockId(UDPSocket::generateSocketId());
522 msg->setControlInfo(ctrl);
530 msg->removeControlInfo();
531 msg->setKind(UDP_C_DATA);
533 UDPControlInfo *ctrl =
new UDPControlInfo();
534 ctrl->setSrcPort(thisNode.getPort());
535 ctrl->setSrcAddr(thisNode.getIp());
536 ctrl->setDestAddr(destAddr.
getIp());
537 ctrl->setDestPort(destAddr.
getPort());
538 msg->setControlInfo(ctrl);
547 EV <<
"[BaseApp::sendMessageToUDP() @ " << thisNode.getIp()
548 <<
" (" << overlay->getThisNode().getKey().toString(16) <<
")]\n"
549 <<
" Sending " << *msg <<
" to " << destAddr.
getIp()
554 RECORD_STATS(numUdpSent++; bytesUdpSent += msg->getByteLength());
555 sendDelayed(msg, delay,
"udpOut");
566 cPolymorphic* context,
567 int rpcId, simtime_t rtt)
575 const std::vector<TransportAddress>&
578 callRoute(destKey, message, sourceRoute, routingType);
595 if (overlayCtrlInfo &&
610 UDPControlInfo* udpCtrlInfo =
611 check_and_cast<UDPControlInfo*>(call->getControlInfo());
613 tempNode =
TransportAddress(udpCtrlInfo->getSrcAddr(), udpCtrlInfo->getSrcPort());
614 destNode = &tempNode;
618 sendRpcResponse(transportType, compType,
619 *destNode, *destKey, call, response);