OverSim
realtimescheduler.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include "realtimescheduler.h"
25 
// Per-run configuration keys for external application support; both are
// registered as integer (CFG_INT) options.
// "externalapp-connection-limit" is read once during scheduler start-up
// (fallback value 0) and stored in appConnectionLimit.
26 Register_PerRunConfigOption(CFGID_EXTERNALAPP_CONNECTION_LIMIT, "externalapp-connection-limit", CFG_INT, NULL, "TODO some documentation");
// NOTE(review): "externalapp-app-port" is not read anywhere in this listing;
// presumably it is consumed by a derived scheduler - confirm.
27 Register_PerRunConfigOption(CFGID_EXTERNALAPP_APP_PORT, "externalapp-app-port", CFG_INT, NULL, "TODO some documentation");
28 
// Streams a timeval as "<seconds>s<microseconds>us". The seconds field is
// printed as an unsigned long; the microseconds field is printed as-is
// (no zero padding).
inline std::ostream& operator<<(std::ostream& os, const timeval& tv)
{
    const unsigned long wholeSeconds = (unsigned long)tv.tv_sec;
    os << wholeSeconds;
    os << "s";
    os << tv.tv_usec;
    os << "us";
    return os;
}
33 
// NOTE(review): the signature line is missing from this listing; the body
// looks like the RealtimeScheduler constructor - confirm against the header.
// Empties the descriptor set that is later handed to select(), resets the
// highest-descriptor marker and flags the network, additional and
// application-TUN descriptors as not opened.
35 {
36  FD_ZERO(&all_fds);
37  maxfd = 0;
38  netw_fd = INVALID_SOCKET;
39  additional_fd = INVALID_SOCKET;
40  apptun_fd = INVALID_SOCKET;
41 }
42 
// NOTE(review): signature line missing from this listing; empty body,
// presumably the destructor - confirm. Nothing is released here.
44 { }
45 
// NOTE(review): signature line missing from this listing; presumably the
// run start-up hook (startRun) - confirm.
// Initializes the socket library, records the wall-clock start time, clears
// the registered interface/application modules, reads the external-app
// connection limit and finally opens the network via initializeNetwork().
47 {
    // A non-zero result from the socket library initialisation is fatal.
48  if (initsocketlibonce()!=0)
49  throw cRuntimeError("RealtimeScheduler: Cannot initialize socket library");
50 
    // baseTime is the wall-clock instant that corresponds to simulation time 0.
51  gettimeofday(&baseTime, NULL);
52 
    // No module has registered yet; setInterfaceModule() fills these in.
53  appModule = NULL;
54  appNotificationMsg = NULL;
55  module = NULL;
56  notificationMsg = NULL;
57 
    // Falls back to 0 when the option is not configured.
58  appConnectionLimit = ev.getConfig()->getAsInt(CFGID_EXTERNALAPP_CONNECTION_LIMIT, 0);
59 
    // initializeNetwork() reports failure with a non-zero return value.
60  if (initializeNetwork()) {
61  opp_error("realtimeScheduler error: initializeNetwork failed\n");
62  }
63 }
64 
// NOTE(review): signature line missing from this listing; empty body,
// presumably the end-of-run hook (endRun) - confirm. No cleanup is done here.
66 {}
67 
69 {
70  gettimeofday(&baseTime, NULL);
71  baseTime = timeval_substract(baseTime, SIMTIME_DBL(simTime()));
72 }
73 
74 void RealtimeScheduler::setInterfaceModule(cModule *mod, cMessage *notifMsg,
75  PacketBuffer* buffer, int mtu,
76  bool isApp)
77 {
78  if (!mod || !notifMsg || !buffer) {
79  throw cRuntimeError("RealtimeScheduler: setInterfaceModule(): "
80  "arguments must be non-NULL");
81  }
82 
83  if (!isApp) {
84  if (module) {
85  throw cRuntimeError("RealtimeScheduler: setInterfaceModule() "
86  "already called");
87  }
88  module = mod;
89  notificationMsg = notifMsg;
90  packetBuffer = buffer;
91  buffersize = mtu;
92  } else {
93  if (appModule) {
94  throw cRuntimeError("RealtimeScheduler: setInterfaceModule() "
95  "already called");
96  }
97  appModule = mod;
98  appNotificationMsg = notifMsg;
99  appPacketBuffer = buffer;
100  appBuffersize = mtu;
101  }
102 }
103 
104 void RealtimeScheduler::registerSocket(SOCKET fd, cModule *mod,
105  cMessage *notifMsg, PacketBuffer* buffer,
106  int mtu)
107 {
108  if (socketContextMap.find(fd) != socketContextMap.end()) {
109  throw cRuntimeError("RealtimeScheduler::registerSocket(): Socket"
110  "already registered!");
111  }
112 
113  socketContextMap[fd] = SocketContext(mod, notifMsg, buffer, mtu);
114 
115  FD_SET(fd, &all_fds);
116  if (fd > maxfd) {
117  maxfd = fd;
118  }
119 }
120 
// NOTE(review): the signature line is missing from this listing; the log
// strings below name this function receiveWithTimeout() and it is called
// with a microsecond count (`usec`) - confirm the exact signature.
// Waits up to `usec` microseconds for any watched descriptor to become
// readable, reads from every ready descriptor and returns true if at least
// one of them produced an event for the simulation.
// NOTE(review): several interior statements (listing lines 178, 208, 211,
// 288, 290) are absent from this listing; comments below only describe what
// is visible.
122 {
123  bool newEvent = false;
124  // prepare sets for select()
125  fd_set readFD;
126  readFD = all_fds;
127 
    // Only tv_usec is set, so `usec` is expected to stay below one second;
    // the visible caller passes at most 200 ms.
128  timeval timeout;
129  timeout.tv_sec = 0;
130  timeout.tv_usec = usec;
131 
    // NOTE(review): select() is given FD_SETSIZE rather than maxfd + 1.
132  if (select(FD_SETSIZE, &readFD, NULL, NULL, &timeout) > 0) {
133  // Read on all sockets with data
134  for (SOCKET fd = 0; fd <= maxfd; fd++) {
135  if (FD_ISSET(fd, &readFD)) {
136  // Incoming data on netw_fd
137  if (fd == netw_fd) {
138  char* buf = new char[buffersize];
139  int nBytes;
140 
141  // FIXME: Ugly. But we want to support IPv4 and IPv6 here, so we
142  // reserve enough space for the "bigger" address.
    // NOTE(review): despite the comment above, only a sockaddr_in is
    // allocated, and it is later deleted through a sockaddr* (line 149).
143  sockaddr* from = (sockaddr*) new sockaddr_in;
144  socklen_t addrlen = sizeof(sockaddr_in);
145 
146  // FIXME: Ugly...
    // NOTE(review): sa_family (an address family) is compared against
    // SOCK_DGRAM (a socket type); the getsockname() result is not checked.
147  getsockname(netw_fd, from, &addrlen);
148  if ( from->sa_family != SOCK_DGRAM ) {
149  delete from;
150  from = 0;
151  addrlen = 0;
152  // use read() for TUN device
153  nBytes = read(netw_fd, buf, buffersize);
154  } else {
155  addrlen = sizeof(sockaddr_in);
156  nBytes = recvfrom(netw_fd, buf, buffersize, 0, from, &addrlen);
157  }
158 
    // NOTE(review): in the two failure branches below only buf is released;
    // a non-NULL `from` is not freed in the visible code.
159  if (nBytes < 0) {
160  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
161  << " Error reading from network: " << strerror(sock_errno())
162  << endl;
163  delete[] buf;
164  buf = NULL;
165  opp_error("Read from network device returned an error");
166  } else if (nBytes == 0) {
167  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
168  << " Received 0 byte long UDP packet!" << endl;
169  delete[] buf;
170  buf = NULL;
171  } else {
172  // write data to buffer
173  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
174  << " Received " << nBytes << " bytes"
175  << endl;
    // buf and from are handed over to the buffer entry here.
176  packetBuffer->push_back(PacketBufferEntry(buf, nBytes, from, addrlen));
177  // schedule notificationMsg for the interface module
    // NOTE(review): listing line 178 (presumably the notification call) is missing.
179  newEvent = true;
180  }
181  } else if ( fd == apptun_fd ) {
182  // Data on application TUN FD
183  char* buf = new char[appBuffersize];
184  // use read() for TUN device
185  int nBytes = read(fd, buf, appBuffersize);
186 
187  if (nBytes < 0) {
188  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
189  << " Error reading from application TUN socket: "
190  << strerror(sock_errno())
191  << endl;
192  delete[] buf;
193  buf = NULL;
194  opp_error("Read from application TUN socket returned "
195  "an error");
196  } else if (nBytes == 0) {
197  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
198  << " Received 0 byte long UDP packet!" << endl;
199  delete[] buf;
200  buf = NULL;
201  } else {
202  // write data to buffer
203  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
204  << " Received " << nBytes << " bytes"
205  << endl;
206 
    // NOTE(review): the statement below is cut off - listing line 208 is
    // missing, as is line 211 after the following comment.
207  appPacketBuffer->push_back(PacketBufferEntry(buf,
209 
210  // schedule notificationMsg for the interface module
212  newEvent = true;
213  }
214  } else if ( fd == additional_fd ) {
215  // Data on additional FD
    // Handling is delegated entirely to additionalFD(); an event is always reported.
216  additionalFD();
217  newEvent = true;
218  } else if (socketContextMap.find(fd) != socketContextMap.end()){
219  // Data on socket in socketContextMap
220  SocketContext& fdContext = socketContextMap.at(fd);
221  char* buf = new char[fdContext.mtu];
222 
    // NOTE(review): the SO_ERROR value fetched into `error` is never
    // examined; only a getsockopt() failure itself is reported.
223  int error;
224  socklen_t size = sizeof(error);
225  if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &size) < 0) {
226  perror("getsockopt()");
227  }
228 
229  int nBytes = recv(fd, buf, fdContext.mtu, 0);
    // NOTE(review): the error/EOF handling below is compiled out (#if 0), so
    // a negative or zero nBytes is pushed into the buffer like normal data.
230 #if 0
231  if (nBytes < 0) {
232  delete[] buf;
233  buf = NULL;
234  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
235  << " Read error from socket in socketContextMap: "
236  << strerror(sock_errno()) << endl;
237  // opp_error("Read from network device returned an error (App)");
238  } else if (nBytes == 0) {
239  // Application closed Socket
240  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
241  << " Application closed socket"
242  << endl;
243  delete[] buf;
244  buf = NULL;
245  closeAppSocket(fd);
246  newEvent = true;
247  } else {
248 #endif
249  // write data to buffer
250  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
251  << " Received " << nBytes << " bytes"
252  << endl;
253 
254  fdContext.buffer->push_back(PacketBufferEntry(
255  buf, nBytes, PacketBufferEntry::PACKET_DATA, fd));
256 
257  // schedule notificationMsg for the interface module
258  sendNotificationMsg(fdContext.notifMsg, fdContext.mod);
259  newEvent = true;
260 #if 0
261  }
262 #endif
263  } else {
264  // Data on app FD
265  char* buf = new char[appBuffersize];
266  int nBytes = recv(fd, buf, appBuffersize, 0);
    // A read error is only logged here (the opp_error call is commented
    // out); a zero-length read is treated as the peer closing the socket.
267  if (nBytes < 0) {
268  delete[] buf;
269  buf = NULL;
270  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
271  << " Read error from application socket: "
272  << strerror(sock_errno()) << endl;
273  // opp_error("Read from network device returned an error (App)");
274  } else if (nBytes == 0) {
275  // Application closed Socket
276  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
277  << " Application closed socket"
278  << endl;
279  delete[] buf;
280  buf = NULL;
281  closeAppSocket(fd);
282  newEvent = true;
283  } else {
284  // write data to buffer
285  ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
286  << " Received " << nBytes << " bytes"
287  << endl;
    // NOTE(review): listing lines 288 and 290 are missing; presumably they
    // store buf in the application buffer and schedule the notification.
289  // schedule notificationMsg for the interface module
291  newEvent = true;
292  }
293  }
294  }
295  }
296  }
297  return newEvent;
298 }
299 
300 int RealtimeScheduler::receiveUntil(const timeval& targetTime)
301 {
302  // if there's more than 200ms to wait, wait in 100ms chunks
303  // in order to keep UI responsiveness by invoking ev.idle()
304  timeval curTime;
305  gettimeofday(&curTime, NULL);
306  while (targetTime.tv_sec-curTime.tv_sec >=2 ||
307  timeval_diff_usec(targetTime, curTime) >= 200000) {
308  if (receiveWithTimeout(100000)) { // 100ms
309  if (ev.idle()) return -1;
310  return 1;
311  }
312  if (ev.idle()) return -1;
313  gettimeofday(&curTime, NULL);
314  }
315 
316  // difference is now at most 100ms, do it at once
317  long usec = timeval_diff_usec(targetTime, curTime);
318  if (usec>0)
319  if (receiveWithTimeout(usec)) {
320  if (ev.idle()) return -1;
321  return 1;
322  }
323  if (ev.idle()) return -1;
324  return 0;
325 }
326 
// NOTE(review): the signature line is missing from this listing; the body
// returns a cMessage pointer and looks like the scheduler's next-event
// hook (getNextEvent) - confirm.
// Determines the wall-clock time of the next queued event, receives external
// data until then, and returns the message at the head of the queue.
// Returns NULL when the wait was interrupted by the user.
328 {
329  // assert that we've been configured
330  if (!module)
331  throw cRuntimeError("RealtimeScheduler: setInterfaceModule() not called: it must be called from a module's initialize() function");
332  // FIXME: reimplement sanity check
333 // if (app_fd >= 0 && !appModule)
334 // throw cRuntimeError("RealtimeScheduler: setInterfaceModule() not called from application: it must be called from a module's initialize() function");
335 
336  // calculate target time
337  timeval targetTime;
338  cMessage *msg = sim->msgQueue.peekFirst();
339  if (!msg) {
340  // if there are no events, wait until something comes from outside
341  // TBD: obey simtimelimit, cpu-time-limit
    // LONG_MAX seconds acts as "no deadline".
342  targetTime.tv_sec = LONG_MAX;
343  targetTime.tv_usec = 0;
344  } else {
345  // use time of next event
    // Simulation time is mapped to wall-clock time by offsetting from baseTime.
346  simtime_t eventSimtime = msg->getArrivalTime();
347  targetTime = timeval_add(baseTime, SIMTIME_DBL(eventSimtime));
348  }
349 
350  // if needed, wait until that time arrives
351  timeval curTime;
352  gettimeofday(&curTime, NULL);
353  if (timeval_greater(targetTime, curTime)) {
354  int status = receiveUntil(targetTime);
355  if (status == -1) {
356  printf("WARNING: receiveUntil returned -1 (user interrupt)\n");
357  return NULL; // interrupted by user
358  } else if (status == 1) {
    // Incoming data may have queued an earlier event, so look at the head again.
359  msg = sim->msgQueue.peekFirst(); // received something
360  }
361  } else {
362  // printf("WARNING: Lagging behind realtime!\n");
363  // we're behind -- customized versions of this class may
364  // alert if we're too much behind, whatever that means
365  }
366  // ok, return the message
367  return msg;
368 }
369 
// NOTE(review): the signature line is missing from this listing; the body is
// invoked elsewhere as closeAppSocket(fd) - confirm the exact signature.
// Closes the descriptor, stops watching it and, if it was registered in
// socketContextMap, notifies the owning module and drops the registration.
// NOTE(review): listing lines 382 and 386-387 are missing, so the
// push_back() call below is cut off and the else branch appears empty.
// NOTE(review): maxfd is not lowered here when the highest descriptor closes.
371 {
372 #ifdef _WIN32
373  closesocket(fd);
374 #else
375  close(fd);
376 #endif
377  FD_CLR(fd, &all_fds);
378 
379  std::map<SOCKET, SocketContext>::iterator it = socketContextMap.find(fd);
380  if (it != socketContextMap.end()) {
381  it->second.buffer->push_back(PacketBufferEntry(
383  sendNotificationMsg(it->second.notifMsg, it->second.mod);
384  socketContextMap.erase(fd);
385  } else {
388  }
389 }
390 
391 void RealtimeScheduler::sendNotificationMsg(cMessage* msg, cModule* mod)
392 {
393  if (msg->isScheduled()) return; // Notification already scheduled
394  timeval curTime;
395  gettimeofday(&curTime, NULL);
396  curTime = timeval_substract(curTime, baseTime);
397  simtime_t t = curTime.tv_sec + curTime.tv_usec*1e-6;
398 
399  // if t < simTime, clock would go backwards. this would be bad...
400  // (this could happen as timeval has a lower number of digits that simtime_t)
401  if (t < simTime()) t = simTime();
402 
403  msg->setSentFrom(mod, -1, simTime());
404  msg->setArrival(mod,-1,t);
405  simulation.msgQueue.insert(msg);
406 }
407 
408 ssize_t RealtimeScheduler::sendBytes(const char *buf,
409  size_t numBytes,
410  sockaddr* addr,
411  socklen_t addrlen,
412  bool isApp,
413  SOCKET fd)
414 {
415  if (!buf) {
416  ev << "[RealtimeScheduler::sendBytes()]\n"
417  << " Error sending packet: buf = NULL"
418  << endl;
419  return -1;
420  }
421  if (!isApp) {
422  if( numBytes > buffersize ) {
423  ev << "[RealtimeScheduler::sendBytes()]\n"
424  << " Trying to send oversized packet: size " << numBytes << " mtu " << buffersize
425  << endl;
426  opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
427  }
428 
429  if ( netw_fd == INVALID_SOCKET ) {
430  ev << "[RealtimeScheduler::sendBytes()]\n"
431  << " Can't send packet to network: no tun/udp socket"
432  << endl;
433  return 0;
434  }
435  int nBytes;
436  if (addr) {
437  nBytes = sendto(netw_fd, buf, numBytes, 0, addr, addrlen);
438  } else {
439  // TUN
440  nBytes = write(netw_fd, buf, numBytes);
441  }
442  if (nBytes < 0) {
443  ev << "[RealtimeScheduler::sendBytes()]\n"
444  << " Error sending data to network: " << strerror(sock_errno()) << "\n"
445  << " FD = " << netw_fd << ", numBytes = " << numBytes << ", addrlen = " << addrlen
446  << endl;
447  }
448  return nBytes;
449 
450  } else if (socketContextMap.find(fd) != socketContextMap.end()){
451  // Data on socket in socketContextMap
452  SocketContext& fdContext = socketContextMap.at(fd);
453  if (numBytes > fdContext.mtu) {
454  ev << "[RealtimeScheduler::sendBytes()]\n"
455  << " Trying to send oversized packet: size " << numBytes << "\n"
456  << " mtu " << fdContext.mtu
457  << endl;
458  opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
459  }
460  return send(fd, buf, numBytes, 0 /*MSG_NOSIGNAL*/);
461  } else {
462  if (numBytes > appBuffersize) {
463  ev << "[RealtimeScheduler::sendBytes()]\n"
464  << " Trying to send oversized packet: size " << numBytes << "\n"
465  << " mtu " << appBuffersize
466  << endl;
467  opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
468  }
469  // If no fd is given, select a "random" one
470  if (fd == INVALID_SOCKET) {
471  for (fd = 0; fd <= maxfd; fd++) {
472  if (fd == netw_fd) continue;
473  if (fd == additional_fd) continue;
474  if (FD_ISSET(fd, &all_fds)) break;
475  }
476  if (fd > maxfd) {
477  throw cRuntimeError("Can't send packet to Application: no socket");
478  }
479  }
480  if (fd == apptun_fd) {
481  // Application TUN FD
482  return write(fd, buf, numBytes);
483  } else {
484  return send(fd, buf, numBytes, 0);
485  }
486  }
487  // TBD check for errors
488  return -1;
489 }
490