OverSim
RealtimeScheduler Class Reference

This class implements a event scheduler for OMNeT++ It makes the simulation run in realtime (i.e. More...

#include <realtimescheduler.h>

Inheritance diagram for RealtimeScheduler:
AppTunOutScheduler TunOutScheduler UdpOutScheduler

Classes

class  SocketContext

Public Member Functions

 RealtimeScheduler ()
 Constructor.
virtual ~RealtimeScheduler ()
 Destructor.
virtual void startRun ()
 Called at the beginning of a simulation run.
virtual void endRun ()
 Called at the end of a simulation run.
virtual void executionResumed ()
 Recalculates "base time" from current wall clock time.
virtual void setInterfaceModule (cModule *module, cMessage *notificationMsg, PacketBuffer *buffer, int mtu, bool isApp=false)
 To be called from the module which wishes to receive data from the tun device.
void registerSocket (SOCKET fd, cModule *mod, cMessage *notifMsg, PacketBuffer *buffer, int mtu)
virtual cMessage * getNextEvent ()
 Scheduler function – it comes from cScheduler interface.
void sendNotificationMsg (cMessage *msg, cModule *mod)
 send notification msg to module
virtual ssize_t sendBytes (const char *buf, size_t numBytes, sockaddr *addr=0, socklen_t addrlen=0, bool isApp=false, SOCKET fd=INVALID_SOCKET)
 Send data to network.
void closeAppSocket (SOCKET fd)
 Close the application TCP socket.
virtual SOCKET getAppTunFd ()
 Returns the FD for the application TUN socket.

Protected Member Functions

virtual int initializeNetwork ()=0
 Initialize the network.
virtual void additionalFD ()
 This function is called from main loop if data is accessible from "additional_fd".
virtual bool receiveWithTimeout (long usec)
 Waits for incoming data on the tun device.
virtual int receiveUntil (const timeval &targetTime)
 Tries to read data until the given time is up.

Protected Attributes

std::map< SOCKET, SocketContextsocketContextMap
fd_set all_fds
SOCKET maxfd
SOCKET netw_fd
SOCKET apptun_fd
cModule * module
cMessage * notificationMsg
PacketBufferpacketBuffer
size_t buffersize
cModule * appModule
cMessage * appNotificationMsg
PacketBufferappPacketBuffer
size_t appBuffersize
int appConnectionLimit
SOCKET additional_fd
timeval baseTime

Detailed Description

This class implements a event scheduler for OMNeT++ It makes the simulation run in realtime (i.e.

1 simsec == 1 sec) It must be subclassed; its subclasses must handle network traffic from/to the simulation

Definition at line 65 of file realtimescheduler.h.

Constructor & Destructor Documentation

RealtimeScheduler::RealtimeScheduler ( )

Constructor.

Definition at line 34 of file realtimescheduler.cc.

: cScheduler()
{
FD_ZERO(&all_fds);
maxfd = 0;
netw_fd = INVALID_SOCKET;
additional_fd = INVALID_SOCKET;
apptun_fd = INVALID_SOCKET;
}
RealtimeScheduler::~RealtimeScheduler ( )
virtual

Destructor.

Definition at line 43 of file realtimescheduler.cc.

{ }

Member Function Documentation

virtual void RealtimeScheduler::additionalFD ( )
inlineprotectedvirtual

This function is called from main loop if data is accessible from "additional_fd".

This FD can be set in initializeNetwork by concrete implementations.

Reimplemented in AppTunOutScheduler, TunOutScheduler, and UdpOutScheduler.

Definition at line 117 of file realtimescheduler.h.

Referenced by receiveWithTimeout().

{};
void RealtimeScheduler::closeAppSocket ( SOCKET  fd)

Close the application TCP socket.

Definition at line 370 of file realtimescheduler.cc.

Referenced by receiveWithTimeout().

{
#ifdef _WIN32
closesocket(fd);
#else
close(fd);
#endif
FD_CLR(fd, &all_fds);
std::map<SOCKET, SocketContext>::iterator it = socketContextMap.find(fd);
if (it != socketContextMap.end()) {
it->second.buffer->push_back(PacketBufferEntry(
sendNotificationMsg(it->second.notifMsg, it->second.mod);
socketContextMap.erase(fd);
} else {
}
}
void RealtimeScheduler::endRun ( )
virtual

Called at the end of a simulation run.

Definition at line 65 of file realtimescheduler.cc.

{}
void RealtimeScheduler::executionResumed ( )
virtual

Recalculates "base time" from current wall clock time.

Definition at line 68 of file realtimescheduler.cc.

{
gettimeofday(&baseTime, NULL);
baseTime = timeval_substract(baseTime, SIMTIME_DBL(simTime()));
}
virtual SOCKET RealtimeScheduler::getAppTunFd ( )
inlinevirtual

Returns the FD for the application TUN socket.

Returns
the application TUN socket FD

Definition at line 225 of file realtimescheduler.h.

{ return apptun_fd; };
cMessage * RealtimeScheduler::getNextEvent ( )
virtual

Scheduler function – it comes from cScheduler interface.

Definition at line 327 of file realtimescheduler.cc.

{
// assert that we've been configured
if (!module)
throw cRuntimeError("RealtimeScheduler: setInterfaceModule() not called: it must be called from a module's initialize() function");
// FIXME: reimplement sanity check
// if (app_fd >= 0 && !appModule)
// throw cRuntimeError("RealtimeScheduler: setInterfaceModule() not called from application: it must be called from a module's initialize() function");
// calculate target time
timeval targetTime;
cMessage *msg = sim->msgQueue.peekFirst();
if (!msg) {
// if there are no events, wait until something comes from outside
// TBD: obey simtimelimit, cpu-time-limit
targetTime.tv_sec = LONG_MAX;
targetTime.tv_usec = 0;
} else {
// use time of next event
simtime_t eventSimtime = msg->getArrivalTime();
targetTime = timeval_add(baseTime, SIMTIME_DBL(eventSimtime));
}
// if needed, wait until that time arrives
timeval curTime;
gettimeofday(&curTime, NULL);
if (timeval_greater(targetTime, curTime)) {
int status = receiveUntil(targetTime);
if (status == -1) {
printf("WARNING: receiveUntil returned -1 (user interrupt)\n");
return NULL; // interrupted by user
} else if (status == 1) {
msg = sim->msgQueue.peekFirst(); // received something
}
} else {
// printf("WARNING: Lagging behind realtime!\n");
// we're behind -- customized versions of this class may
// alert if we're too much behind, whatever that means
}
// ok, return the message
return msg;
}
virtual int RealtimeScheduler::initializeNetwork ( )
protectedpure virtual

Initialize the network.

Implemented in AppTunOutScheduler, TunOutScheduler, and UdpOutScheduler.

Referenced by startRun().

int RealtimeScheduler::receiveUntil ( const timeval &  targetTime)
protectedvirtual

Tries to read data until the given time is up.

Parameters
targetTimestop waiting after this time is up
Returns
1 if data is read, -1 if there is an error, 0 if no data is read

Definition at line 300 of file realtimescheduler.cc.

Referenced by getNextEvent().

{
// if there's more than 200ms to wait, wait in 100ms chunks
// in order to keep UI responsiveness by invoking ev.idle()
timeval curTime;
gettimeofday(&curTime, NULL);
while (targetTime.tv_sec-curTime.tv_sec >=2 ||
timeval_diff_usec(targetTime, curTime) >= 200000) {
if (receiveWithTimeout(100000)) { // 100ms
if (ev.idle()) return -1;
return 1;
}
if (ev.idle()) return -1;
gettimeofday(&curTime, NULL);
}
// difference is now at most 100ms, do it at once
long usec = timeval_diff_usec(targetTime, curTime);
if (usec>0)
if (receiveWithTimeout(usec)) {
if (ev.idle()) return -1;
return 1;
}
if (ev.idle()) return -1;
return 0;
}
bool RealtimeScheduler::receiveWithTimeout ( long  usec)
protectedvirtual

Waits for incoming data on the tun device.

Parameters
usecTimeout after which to quit waiting (in µsec)
Returns
true if data was read, false otherwise

Definition at line 121 of file realtimescheduler.cc.

Referenced by receiveUntil().

{
bool newEvent = false;
// prepare sets for select()
fd_set readFD;
readFD = all_fds;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = usec;
if (select(FD_SETSIZE, &readFD, NULL, NULL, &timeout) > 0) {
// Read on all sockets with data
for (SOCKET fd = 0; fd <= maxfd; fd++) {
if (FD_ISSET(fd, &readFD)) {
// Incoming data on netw_fd
if (fd == netw_fd) {
char* buf = new char[buffersize];
int nBytes;
// FIXME: Ugly. But we want to support IPv4 and IPv6 here, so we
// reserve enough space for the "bigger" address.
sockaddr* from = (sockaddr*) new sockaddr_in;
socklen_t addrlen = sizeof(sockaddr_in);
// FIXME: Ugly...
getsockname(netw_fd, from, &addrlen);
if ( from->sa_family != SOCK_DGRAM ) {
delete from;
from = 0;
addrlen = 0;
// use read() for TUN device
nBytes = read(netw_fd, buf, buffersize);
} else {
addrlen = sizeof(sockaddr_in);
nBytes = recvfrom(netw_fd, buf, buffersize, 0, from, &addrlen);
}
if (nBytes < 0) {
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Error reading from network: " << strerror(sock_errno())
<< endl;
delete[] buf;
buf = NULL;
opp_error("Read from network device returned an error");
} else if (nBytes == 0) {
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received 0 byte long UDP packet!" << endl;
delete[] buf;
buf = NULL;
} else {
// write data to buffer
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received " << nBytes << " bytes"
<< endl;
packetBuffer->push_back(PacketBufferEntry(buf, nBytes, from, addrlen));
// schedule notificationMsg for the interface module
newEvent = true;
}
} else if ( fd == apptun_fd ) {
// Data on application TUN FD
char* buf = new char[appBuffersize];
// use read() for TUN device
int nBytes = read(fd, buf, appBuffersize);
if (nBytes < 0) {
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Error reading from application TUN socket: "
<< strerror(sock_errno())
<< endl;
delete[] buf;
buf = NULL;
opp_error("Read from application TUN socket returned "
"an error");
} else if (nBytes == 0) {
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received 0 byte long UDP packet!" << endl;
delete[] buf;
buf = NULL;
} else {
// write data to buffer
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received " << nBytes << " bytes"
<< endl;
// schedule notificationMsg for the interface module
newEvent = true;
}
} else if ( fd == additional_fd ) {
// Data on additional FD
newEvent = true;
} else if (socketContextMap.find(fd) != socketContextMap.end()){
// Data on socket in socketContextMap
SocketContext& fdContext = socketContextMap.at(fd);
char* buf = new char[fdContext.mtu];
int error;
socklen_t size = sizeof(error);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &size) < 0) {
perror("getsockopt()");
}
int nBytes = recv(fd, buf, fdContext.mtu, 0);
#if 0
if (nBytes < 0) {
delete[] buf;
buf = NULL;
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Read error from socket in socketContextMap: "
<< strerror(sock_errno()) << endl;
// opp_error("Read from network device returned an error (App)");
} else if (nBytes == 0) {
// Application closed Socket
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Application closed socket"
<< endl;
delete[] buf;
buf = NULL;
newEvent = true;
} else {
#endif
// write data to buffer
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received " << nBytes << " bytes"
<< endl;
fdContext.buffer->push_back(PacketBufferEntry(
buf, nBytes, PacketBufferEntry::PACKET_DATA, fd));
// schedule notificationMsg for the interface module
sendNotificationMsg(fdContext.notifMsg, fdContext.mod);
newEvent = true;
#if 0
}
#endif
} else {
// Data on app FD
char* buf = new char[appBuffersize];
int nBytes = recv(fd, buf, appBuffersize, 0);
if (nBytes < 0) {
delete[] buf;
buf = NULL;
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Read error from application socket: "
<< strerror(sock_errno()) << endl;
// opp_error("Read from network device returned an error (App)");
} else if (nBytes == 0) {
// Application closed Socket
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Application closed socket"
<< endl;
delete[] buf;
buf = NULL;
newEvent = true;
} else {
// write data to buffer
ev << "[RealtimeScheduler::receiveWithTimeout()]\n"
<< " Received " << nBytes << " bytes"
<< endl;
// schedule notificationMsg for the interface module
newEvent = true;
}
}
}
}
}
return newEvent;
}
void RealtimeScheduler::registerSocket ( SOCKET  fd,
cModule *  mod,
cMessage *  notifMsg,
PacketBuffer buffer,
int  mtu 
)

Definition at line 104 of file realtimescheduler.cc.

{
if (socketContextMap.find(fd) != socketContextMap.end()) {
throw cRuntimeError("RealtimeScheduler::registerSocket(): Socket"
"already registered!");
}
socketContextMap[fd] = SocketContext(mod, notifMsg, buffer, mtu);
FD_SET(fd, &all_fds);
if (fd > maxfd) {
maxfd = fd;
}
}
ssize_t RealtimeScheduler::sendBytes ( const char *  buf,
size_t  numBytes,
sockaddr *  addr = 0,
socklen_t  addrlen = 0,
bool  isApp = false,
SOCKET  fd = INVALID_SOCKET 
)
virtual

Send data to network.

Parameters
bufA pointer to the data to be send
numBytesthe length of the data
isAppset to "true" if called from a realworldApp
addrIf needed, the destination address
addrlenThe length of the address
fdIf connected to more than one external app, set to the corresponding FD. If left to default and multiple apps are connected, the data will be send to one arbitrarily chosen app.
Returns
The number of bytes written, -1 on error

Definition at line 408 of file realtimescheduler.cc.

Referenced by SimpleGameClient::handleLowerMessage(), SimpleGameClient::handleRealworldPacket(), SimpleGameClient::handleTimerEvent(), RealworldConnector::transmitToNetwork(), and SimpleGameClient::updateNeighbors().

{
if (!buf) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Error sending packet: buf = NULL"
<< endl;
return -1;
}
if (!isApp) {
if( numBytes > buffersize ) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Trying to send oversized packet: size " << numBytes << " mtu " << buffersize
<< endl;
opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
}
if ( netw_fd == INVALID_SOCKET ) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Can't send packet to network: no tun/udp socket"
<< endl;
return 0;
}
int nBytes;
if (addr) {
nBytes = sendto(netw_fd, buf, numBytes, 0, addr, addrlen);
} else {
// TUN
nBytes = write(netw_fd, buf, numBytes);
}
if (nBytes < 0) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Error sending data to network: " << strerror(sock_errno()) << "\n"
<< " FD = " << netw_fd << ", numBytes = " << numBytes << ", addrlen = " << addrlen
<< endl;
}
return nBytes;
} else if (socketContextMap.find(fd) != socketContextMap.end()){
// Data on socket in socketContextMap
SocketContext& fdContext = socketContextMap.at(fd);
if (numBytes > fdContext.mtu) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Trying to send oversized packet: size " << numBytes << "\n"
<< " mtu " << fdContext.mtu
<< endl;
opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
}
return send(fd, buf, numBytes, 0 /*MSG_NOSIGNAL*/);
} else {
if (numBytes > appBuffersize) {
ev << "[RealtimeScheduler::sendBytes()]\n"
<< " Trying to send oversized packet: size " << numBytes << "\n"
<< " mtu " << appBuffersize
<< endl;
opp_error("Can't send packet: too large"); //FIXME: Throw exception instead
}
// If no fd is given, select a "random" one
if (fd == INVALID_SOCKET) {
for (fd = 0; fd <= maxfd; fd++) {
if (fd == netw_fd) continue;
if (fd == additional_fd) continue;
if (FD_ISSET(fd, &all_fds)) break;
}
if (fd > maxfd) {
throw cRuntimeError("Can't send packet to Application: no socket");
}
}
if (fd == apptun_fd) {
// Application TUN FD
return write(fd, buf, numBytes);
} else {
return send(fd, buf, numBytes, 0);
}
}
// TBD check for errors
return -1;
}
void RealtimeScheduler::sendNotificationMsg ( cMessage *  msg,
cModule *  mod 
)

send notification msg to module

Parameters
msgThe notification Message
modThe destination

Definition at line 391 of file realtimescheduler.cc.

Referenced by UdpOutScheduler::additionalFD(), TunOutScheduler::additionalFD(), AppTunOutScheduler::additionalFD(), closeAppSocket(), and receiveWithTimeout().

{
if (msg->isScheduled()) return; // Notification already scheduled
timeval curTime;
gettimeofday(&curTime, NULL);
curTime = timeval_substract(curTime, baseTime);
simtime_t t = curTime.tv_sec + curTime.tv_usec*1e-6;
// if t < simTime, clock would go backwards. this would be bad...
// (this could happen as timeval has a lower number of digits that simtime_t)
if (t < simTime()) t = simTime();
msg->setSentFrom(mod, -1, simTime());
msg->setArrival(mod,-1,t);
simulation.msgQueue.insert(msg);
}
void RealtimeScheduler::setInterfaceModule ( cModule *  module,
cMessage *  notificationMsg,
PacketBuffer buffer,
int  mtu,
bool  isApp = false 
)
virtual

To be called from the module which wishes to receive data from the tun device.

The method must be called from the module's initialize() function.

Parameters
modulePointer to the module that wants to receive the data
notificationMsgA pointer to a message that will be scheduled if there is data to read
bufferA pointer to the buffer the data will be written into
mtuMax allowed packet size
isAppset to "true" if called from a realworldApp

Definition at line 74 of file realtimescheduler.cc.

Referenced by RealworldConnector::initialize(), SimpleGameClient::initializeApp(), and XmlRpcInterface::initializeApp().

{
if (!mod || !notifMsg || !buffer) {
throw cRuntimeError("RealtimeScheduler: setInterfaceModule(): "
"arguments must be non-NULL");
}
if (!isApp) {
if (module) {
throw cRuntimeError("RealtimeScheduler: setInterfaceModule() "
"already called");
}
module = mod;
notificationMsg = notifMsg;
packetBuffer = buffer;
buffersize = mtu;
} else {
if (appModule) {
throw cRuntimeError("RealtimeScheduler: setInterfaceModule() "
"already called");
}
appModule = mod;
appNotificationMsg = notifMsg;
appPacketBuffer = buffer;
}
}
void RealtimeScheduler::startRun ( )
virtual

Called at the beginning of a simulation run.

Definition at line 46 of file realtimescheduler.cc.

{
if (initsocketlibonce()!=0)
throw cRuntimeError("RealtimeScheduler: Cannot initialize socket library");
gettimeofday(&baseTime, NULL);
appModule = NULL;
module = NULL;
appConnectionLimit = ev.getConfig()->getAsInt(CFGID_EXTERNALAPP_CONNECTION_LIMIT, 0);
opp_error("realtimeScheduler error: initializeNetwork failed\n");
}
}

Member Data Documentation

size_t RealtimeScheduler::appBuffersize
protected

Definition at line 101 of file realtimescheduler.h.

Referenced by receiveWithTimeout(), sendBytes(), and setInterfaceModule().

int RealtimeScheduler::appConnectionLimit
protected
SOCKET RealtimeScheduler::apptun_fd
protected
timeval RealtimeScheduler::baseTime
protected

Definition at line 107 of file realtimescheduler.h.

Referenced by executionResumed(), getNextEvent(), sendNotificationMsg(), and startRun().

size_t RealtimeScheduler::buffersize
protected

Definition at line 95 of file realtimescheduler.h.

Referenced by receiveWithTimeout(), sendBytes(), and setInterfaceModule().

cModule* RealtimeScheduler::module
protected

Definition at line 92 of file realtimescheduler.h.

Referenced by getNextEvent(), receiveWithTimeout(), setInterfaceModule(), and startRun().

cMessage* RealtimeScheduler::notificationMsg
protected

Definition at line 93 of file realtimescheduler.h.

Referenced by receiveWithTimeout(), setInterfaceModule(), and startRun().

PacketBuffer* RealtimeScheduler::packetBuffer
protected

Definition at line 94 of file realtimescheduler.h.

Referenced by receiveWithTimeout(), and setInterfaceModule().

std::map<SOCKET, SocketContext> RealtimeScheduler::socketContextMap
protected

Definition at line 83 of file realtimescheduler.h.

Referenced by closeAppSocket(), receiveWithTimeout(), registerSocket(), and sendBytes().


The documentation for this class was generated from the following files: