OverSim
MessageObserver.cc
Go to the documentation of this file.
1 //
2 // This program is free software; you can redistribute it and/or
3 // modify it under the terms of the GNU General Public License
4 // as published by the Free Software Foundation; either version 2
5 // of the License, or (at your option) any later version.
6 //
7 // This program is distributed in the hope that it will be useful,
8 // but WITHOUT ANY WARRANTY; without even the implied warranty of
9 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 // GNU General Public License for more details.
11 //
12 // You should have received a copy of the GNU General Public License
13 // along with this program; if not, write to the Free Software
14 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 //
16 
22 #include <assert.h>
23 #include <sstream>
24 #include <omnetpp.h>
25 #include <GlobalStatisticsAccess.h>
26 #include "MessageObserver.h"
27 #include "ALMTestTracedMessage_m.h"
28 
30 
32  gcTimer = new cMessage("garbage_collection");
33  gcInterval = 1.0;
34  cacheMaxAge = 10.0;
35  numLooped = 0;
36 }
37 
39  cancelAndDelete(gcTimer);
40 }
41 
43  WATCH_MAP(groups);
44  WATCH_MAP(joinedAt);
45  WATCH_MAP(receivedAt);
46  WATCH(numLooped);
47 
48  numLooped = 0;
49  gcInterval = par("gcInterval");
50  cacheMaxAge = par("cacheMaxAge");
51 
52  if (gcInterval > 0.0)
53  scheduleAt(OPP::simTime() + gcInterval, gcTimer);
54 
55  creationTime = OPP::simTime();
57 }
58 
60  uint64_t totalSent = 0;
61  uint64_t totalReceived = 0;
62  for (std::map<OverlayKey, MulticastGroup>::iterator i = groups.begin(); i != groups.end(); ++i) {
63  std::stringstream message;
64  message << "MessageObserver: Group " << i->first;
65  std::string name;
66 
67  name = message.str() + " Sent Messages";
68  recordScalar(name.c_str(), (double)i->second.sent);
69 
70  name = message.str() + " Received Messages";
71  recordScalar(name.c_str(), (double)i->second.received);
72 
73  name = message.str() + " Delivered Percentage";
74  recordScalar(name.c_str(), ((double)i->second.received * 100.0) / (double)i->second.sent);
75 
76  totalSent += i->second.sent;
77  totalReceived += i->second.received;
78  }
79 
80  recordScalar("MessageObserver: Total Sent Messages", (double)totalSent);
81  recordScalar("MessageObserver: Total Received Messages", (double)totalReceived);
82  recordScalar("MessageObserver: Total Delivered Percentage", ((double)totalReceived * 100.0) / (double)totalSent);
83 
85  if ( time >= GlobalStatistics::MIN_MEASURED ) {
86  globalStatistics->addStdDev("MessageObserver: Looped messages/s", (double)numLooped / time);
87  }
88 }
89 
90 void MessageObserver::handleMessage(cMessage* msg) {
91  if (msg == gcTimer) {
92  simtime_t now = OPP::simTime();
93  std::map<NodeMessagePair, simtime_t>::iterator i, iPrev;
94  i = receivedAt.begin();
95  while (i != receivedAt.end()) {
96  if (now - i->second >= cacheMaxAge) {
97  iPrev = i;
98  ++i;
99  receivedAt.erase(iPrev);
100  }
101  else {
102  ++i;
103  }
104  }
105  scheduleAt(OPP::simTime() + gcInterval, gcTimer);
106  }
107 }
108 
112 void MessageObserver::joinedGroup(int moduleId, OverlayKey groupId) {
113  groups[groupId].size += 1;
114  joinedAt[NodeGroupPair(moduleId, groupId)] = OPP::simTime();
115 }
116 
120 void MessageObserver::leftGroup(int moduleId, OverlayKey groupId) {
121  std::map<OverlayKey, MulticastGroup>::iterator iter = groups.find(groupId);
122  if (iter == groups.end()) {
123  EV << "Warning: MessageObserver asked to remove node from nonexistent group " << groupId.toString();
124  }
125  else if (iter->second.size == 0) {
126  EV << "Warning: MessageObserver asked to remove node from empty group " << groupId.toString();
127  }
128  else {
129  iter->second.size -= 1;
130  }
131  joinedAt.erase(NodeGroupPair(moduleId, groupId));
132 }
133 
139  if (msg == NULL) {
140  error("%s called with null message.", __PRETTY_FUNCTION__);
141  }
142 
143  std::map<OverlayKey, MulticastGroup>::iterator iter;
144  iter = groups.find(msg->getGroupId());
145  if (iter == groups.end()) {
146  EV << "Warning: MessageObserver notified of sent message for nonexistent group " << msg->getGroupId().toString();
147  }
148  else if (iter->second.size == 0) {
149  EV << "Warning: MessageObserver notified of sent message for empty group " << msg->getGroupId().toString();
150  }
151  else {
152  iter->second.sent += iter->second.size - 1;
153  }
154 }
155 
160  if (msg == NULL) {
161  error("%s called with null message.", __PRETTY_FUNCTION__);
162  }
163 
164  std::map<OverlayKey, MulticastGroup>::iterator iGroup;
165  iGroup = groups.find(msg->getGroupId());
166  if (iGroup == groups.end()) {
167  EV << "Warning: MessageObserver notified of received message for nonexistent group " << msg->getGroupId().toString();
168  }
169  else if (iGroup->second.size == 0) {
170  EV << "Warning: MessageObserver notified of received message for empty group " << msg->getGroupId().toString();
171  }
172  else if (msg->getSenderId() != msg->getReceiverId()) {
173 
174  // Only count if message was received after joining
175  std::map<NodeGroupPair, simtime_t>::iterator iJoinInfo;
176  iJoinInfo = joinedAt.find(NodeGroupPair(msg->getReceiverId(), msg->getGroupId()));
177 
178  if (iJoinInfo != joinedAt.end() && iJoinInfo->second < msg->getTimestamp()) {
179 
180  // Check if this message has not already been received
182  if (receivedAt.find(nmp) == receivedAt.end()) {
183  iGroup->second.received += 1;
184  receivedAt[nmp] = msg->getTimestamp();
185  }
186  }
187  }
188  else {
190  }
191 }
192 
196 void MessageObserver::nodeDead(int moduleId) {
197  // For each group, if node has joined group, decrease group member count by one
198  // and clear joined info.
199  for (std::map<OverlayKey, MulticastGroup>::iterator ig = groups.begin(); ig != groups.end(); ++ig) {
200  NodeGroupPair ngp = NodeGroupPair(moduleId, ig->first);
201  if (joinedAt.find(ngp) != joinedAt.end()) {
202  ig->second.size--;
203  joinedAt.erase(ngp);
204  }
205  }
206 }
207 
208 std::ostream& operator<< (std::ostream& os, MessageObserver::MulticastGroup const & mg) {
209  return os << "Nodes: " << mg.size << "; Messages Sent: " << mg.sent
210  << ", Received: " << mg.received << ", Dropped: " << (mg.sent - mg.received);
211 }
212 
213 std::ostream& operator<< (std::ostream& os, MessageObserver::NodeGroupPair const & ngp) {
214  cModule* module = OPP::cSimulation::getActiveSimulation()->getModule(ngp.first);
215  return os << "(" << (module != NULL ? module->getFullPath() : "Deleted node")
216  << ", " << ngp.second << ")";
217 }
218