OverSim
GlobalTraceManager.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <fcntl.h>
27 #include <errno.h>
28 #ifndef _WIN32
29 #include <sys/mman.h>
30 #endif
31 
33 #include <GlobalNodeListAccess.h>
34 #include <GlobalTraceManager_m.h>
35 #include <TraceChurn.h>
36 
37 #include "GlobalTraceManager.h"
38 
40 
42 {
43  Enter_Method_Silent();
44 
45  // Nothing to do for us if there is no traceFile
46  if (strlen(par("traceFile")) == 0)
47  return;
48 
51 
52  offset = 0;
53 
54  // Open file and get size
55  fd = open(par("traceFile"), O_RDONLY);
56 
57  if (!fd) {
58  throw cRuntimeError(("Can't open file " + par("traceFile").stdstringValue() +
59  std::string(strerror(errno))).c_str());
60  }
61 
62  struct stat filestat;
63 
64  if (fstat(fd, &filestat)) {
65  throw cRuntimeError(("Error calling stat: " + std::string(strerror(errno))).c_str());
66  }
67 
68  filesize = filestat.st_size;
69  remain = filesize;
70  EV << "[GlobalTraceManager::initialize()]\n"
71  << " Successfully opened trace file " << par("traceFile").stdstringValue()
72  << ". Size: " << filesize
73  << endl;
74 
75  nextRead = new cMessage("NextRead");
76  scheduleAt(0, nextRead);
77 
78 }
79 
81 {
 // Mark the read timer as not yet created.
 // NOTE(review): the header line of this body is absent from this listing;
 // presumably this is the constructor -- confirm.
82  nextRead = NULL;
83 }
84 
86 {
 // Hand the read timer message to cancelAndDelete() for disposal.
 // NOTE(review): the header line of this body is absent from this listing;
 // presumably this is the destructor -- confirm.
87  cancelAndDelete(nextRead);
88 }
89 
91 {
 // Maps the next chunk of the trace file into memory, parses it line by line
 // ("<time> <nodeID> <command>\n") and schedules one event per line via
 // scheduleNextEvent(). Afterwards the read timer is re-armed at the time of
 // the last parsed line. Throws cRuntimeError on mapping or parse errors, and
 // unconditionally on WIN32 builds.
92 #ifdef _WIN32
93  // TODO: port for MINGW
94  throw cRuntimeError("GlobalTraceManager::readNextBlock():"
95  "Not available on WIN32 yet!");
96 #else
 // time stays at -1 when nothing is parsed in this call
97  double time = -1;
98  int nodeID;
99  char* bufend;
 // NOTE(review): line is a local that starts at 1 on every call, so the line
 // numbers handed to scheduleNextEvent() restart with each call -- confirm
 // this is intended.
100  int line = 1;
101 
102  if (remain > 0) {
103  // If rest of the file is bigger than maximal chunk size, set chunksize
104  // to max; the last mapped page is used as margin.
105  // Else map the whole remainder of the file at a time, no margin is needed
106  // in this case
107  chunksize = remain < getpagesize()*readPages ? remain : getpagesize()*readPages;
108  marginsize = remain == chunksize ? 0 : getpagesize();
109 
 // Map the chunk starting at the file position already consumed
 // (filesize - remain). The mapping is private and writable because the
 // parser below overwrites each newline with '\0'.
110  start = (char*) mmap(0, chunksize, PROT_READ|PROT_WRITE, MAP_PRIVATE, fd, filesize - remain);
111 
112  if (start == MAP_FAILED) {
113  throw cRuntimeError(("Error mapping file to memory:" +
114  std::string(strerror(errno))).c_str());
115  }
116 
 // Resume at the offset computed at the end of the previous call
117  buf = start + offset;
118  // While the read pointer has not reached the margin, continue parsing
119  while( buf < start + chunksize - marginsize) { // FIXME: Assuming max line length of getpagesize()
120 
 // First field: event time; strtod leaves bufend == buf if nothing was converted
121  time = strtod(buf, &bufend);
122  if (bufend==buf) {
123  throw cRuntimeError("Error parsing file: Expected time as double");
124  }
125  buf = bufend;
126 
 // Second field: node ID; base 0 lets strtol detect the radix from the prefix
127  nodeID = strtol(buf, &bufend, 0);
128  if (bufend==buf) {
129  throw cRuntimeError("Error parsing file: Expected ID as long int");
130  }
131  buf = bufend;
132 
 // Skip the whitespace separating the ID from the command
133  while( isspace(buf[0]) ) buf++;
134 
 // The command is the rest of the line
 // NOTE(review): strchr and the isspace loops do not check against the end
 // of the mapping; they rely on a terminator being found in mapped memory --
 // confirm this holds for the last chunk.
135  bufend = strchr(buf, '\n');
136  if (!bufend) {
137  throw cRuntimeError("Error parsing file: Missing command or no newline at end of line!");
138  }
 // Terminate the command in place so it can be passed as a C string
139  bufend[0]='\0';
140  scheduleNextEvent(time, nodeID, buf, line++);
 // Advance past the command and the terminator written above
141  buf += strlen(buf)+1;
142 
 // Skip blank space before the next record
143  while( isspace(buf[0]) ) buf++;
144  }
145 
146  // Compute offset for the next read
 // i.e. how far the parser ran beyond the non-margin part of this chunk
147  offset = (buf - start) - (chunksize - marginsize);
 // NOTE(review): no statement updating remain is visible in this listing (the
 // embedded numbering skips a line here) -- confirm against the repository.
149  munmap( start, chunksize );
150  }
151 
 // Re-arm the read timer at the time of the last parsed line, if any
152  if (time > 0) {
153  scheduleAt(time, nextRead);
154  } else {
155  // TODO: Schedule simulation end?
156  }
157 #endif
158 }
159 
// Schedules one trace event as a self-message.
// time:   simulation time passed to scheduleAt()
// nodeId: stored in the message as internal node id
// buf:    not referenced in the visible lines; presumably the command text
//         used to create the message -- confirm
// line:   stored in the message as line number
160 void GlobalTraceManager::scheduleNextEvent(double time, int nodeId, char* buf, int line)
161 {
 // NOTE(review): the statement that creates msg is not visible in this
 // listing (the line below is empty) -- confirm against the repository.
163 
164  msg->setInternalNodeId(nodeId);
165  msg->setLineNumber(line);
166  scheduleAt(time, msg);
167 }
168 
170 {
 // Forwards nodeId to createNode() of another object.
 // NOTE(review): the header line and the receiver expression of this call
 // are missing from this listing -- confirm against the repository.
172  ->createNode(nodeId);
173 }
174 
176 {
 // Forwards nodeId to deleteNode() of another object.
 // NOTE(review): the header line and the receiver expression of this call
 // are missing from this listing -- confirm against the repository.
177  // TODO: calculate proper deletion time with *.underlayConfigurator.gracefulLeaveDelay
179  ->deleteNode(nodeId);
180 }
181 
183 {
184  if (!msg->isSelfMessage()) {
185  delete msg;
186  return;
187  } else if ( msg == nextRead ) {
188  readNextBlock();
189  return;
190  }
191 
192  GlobalTraceManagerMessage* traceMsg = check_and_cast<GlobalTraceManagerMessage*>(msg);
193 
194  if (strstr(traceMsg->getName(), "JOIN") == traceMsg->getName()) {
195  createNode(traceMsg->getInternalNodeId());
196  } else if (strstr(traceMsg->getName(), "LEAVE") == traceMsg->getName()) {
197  deleteNode(traceMsg->getInternalNodeId());
198  } else if (strstr(traceMsg->getName(), "CONNECT_NODETYPES") == traceMsg->getName()) {
199  std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
200 
201  if (strVec.size() != 3) {
202  throw cRuntimeError("GlobalTraceManager::"
203  "handleMessage(): Invalid command");
204  }
205 
206  int firstNodeType = atoi(strVec[1].c_str());
207  int secondNodeType = atoi(strVec[2].c_str());
208 
209  globalNodeList->connectNodeTypes(firstNodeType, secondNodeType);
210  } else if (strstr(traceMsg->getName(), "DISCONNECT_NODETYPES") == traceMsg->getName()) {
211  std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
212 
213  if (strVec.size() != 3) {
214  throw cRuntimeError("GlobalTraceManager::"
215  "handleMessage(): Invalid command");
216  }
217 
218  int firstNodeType = atoi(strVec[1].c_str());
219  int secondNodeType = atoi(strVec[2].c_str());
220 
221  globalNodeList->disconnectNodeTypes(firstNodeType, secondNodeType);
222  } else if (strstr(traceMsg->getName(), "MERGE_BOOTSTRAPNODES") == traceMsg->getName()) {
223  std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
224 
225  if (strVec.size() != 4) {
226  throw cRuntimeError("GlobalTraceManager::"
227  "handleMessage(): Invalid command");
228  }
229 
230  int toPartition = atoi(strVec[1].c_str());
231  int fromPartition = atoi(strVec[2].c_str());
232  int numNodes = atoi(strVec[3].c_str());
233 
234  globalNodeList->mergeBootstrapNodes(toPartition, fromPartition, numNodes);
235  } else {
236  sendDirect(msg, getAppGateById(traceMsg->getInternalNodeId()));
237  return; // don't delete Message
238  }
239 
240  delete msg;
241 }
242 
 // Casts churn generator 0 of underlayConfigurator to TraceChurn and returns
 // the result of its getAppGateById(nodeId).
 // NOTE(review): the header line and opening brace of this function are
 // missing from this listing -- confirm against the repository.
244  return check_and_cast<TraceChurn*>(underlayConfigurator->getChurnGenerator(0))
245  ->getAppGateById(nodeId);
246 }
247