OverSim
PubSubMMOG Class Reference

#include <PubSubMMOG.h>

Inheritance diagram for PubSubMMOG:
BaseOverlay BaseRpc BaseTcpSupport TopologyVis RpcListener

Public Member Functions

virtual ~PubSubMMOG ()
virtual void initializeOverlay (int stage)
 Initializes derived-class-attributes.
virtual void finishOverlay ()
 collects statistical data in derived class
virtual void handleUDPMessage (BaseOverlayMessage *msg)
 Processes messages from underlay.
virtual void handleTimerEvent (cMessage *msg)
virtual void handleAppMessage (cMessage *msg)
 Processes non-commonAPI messages.
virtual void receiveChangeNotification (int category, const cPolymorphic *details)
 callback-method for events at the NotificationBoard
virtual bool handleRpcCall (BaseCallMessage *msg)
 Processes Remote-Procedure-Call invocation messages.
virtual void handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 This method is called if an RPC timeout has been reached.
- Public Member Functions inherited from BaseOverlay
 BaseOverlay ()
virtual ~BaseOverlay ()
 Virtual destructor.
States getState ()
bool isMalicious ()
 Returns true, if node is malicious.
bool isInSimpleMultiOverlayHost ()
 Returns true if overlay is one in an array, inside a SimpleMultiOverlayHost.
const simtime_t & getCreationTime ()
void join (const OverlayKey &nodeID=OverlayKey::UNSPECIFIED_KEY)
 Join the overlay with a given nodeID.
virtual NodeVector * local_lookup (const OverlayKey &key, int num, bool safe)
 finds nodes closest to the given OverlayKey
virtual NodeVector * neighborSet (int num)
virtual bool isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err)
 Query if a node is among the siblings for a given key.
virtual int getMaxNumSiblings ()
 Query the maximum number of siblings (nodes close to a key) that are maintained by this overlay protocol.
virtual int getMaxNumRedundantNodes ()
 Query the maximum number of redundant next hop nodes that are returned by findNode().
void sendMessageToUDP (const TransportAddress &dest, cPacket *msg, simtime_t delay=SIMTIME_ZERO)
 Sends message to underlay.
void sendToKey (const OverlayKey &key, BaseOverlayMessage *message, int numSiblings=1, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Sends a message to an overlay node, with the generic routing algorithm.
virtual OverlayKey distance (const OverlayKey &x, const OverlayKey &y, bool useAlternative=false) const
 This method should implement the distance between two keys.
void registerComp (CompType compType, cModule *module)
cModule * getCompModule (CompType compType)
cGate * getCompRpcGate (CompType compType)
void sendMessageToAllComp (cMessage *msg, CompType srcComp)
bool providesKbr ()
virtual uint8_t getBitsPerDigit ()
bool getMeasureAuthBlock ()
BootstrapList & getBootstrapList () const
virtual OverlayKey estimateMeanDistance ()
 returns mean distance between OverlayKeys in the network
virtual uint32_t estimateOverlaySize ()
 estimates the current number of nodes online
- Public Member Functions inherited from BaseRpc
 BaseRpc ()
const NodeHandle & getThisNode ()
 Returns the NodeHandle of this node.
simtime_t getUdpTimeout ()
- Public Member Functions inherited from RpcListener
virtual ~RpcListener ()
 destructor
- Public Member Functions inherited from BaseTcpSupport
virtual void socketDataArrived (int connId, void *yourPtr, cPacket *msg, bool urgent)
virtual void socketEstablished (int connId, void *yourPtr)
virtual void socketPeerClosed (int connId, void *yourPtr)
virtual void socketFailure (int connId, void *yourPtr, int code)
virtual void socketStatusArrived (int connId, void *yourPtr, TCPStatusInfo *status)
- Public Member Functions inherited from TopologyVis
 TopologyVis ()
void showOverlayNeighborArrow (const NodeHandle &neighbor, bool flush=true, const char *displayString=NULL)
 Draws an arrow from this node to neighbor.
void deleteOverlayNeighborArrow (const NodeHandle &neighbor)
 Removes an arrow from this node to neighbor.

Protected Member Functions

void setBootstrapedIcon ()
void handleMove (GameAPIPositionMessage *posMsg)
void handleMoveMessage (PubSubMoveMessage *moveMsg)
void handleMoveListMessage (PubSubMoveListMessage *moveMsg)
void handleJoinResponse (PubSubJoinResponse *joinResp)
void handleSubscriptionCall (PubSubSubscriptionCall *subCall)
void handleSubscriptionResponse (PubSubSubscriptionResponse *subResp)
void handleResponsibleNodeResponse (PubSubResponsibleNodeResponse *subResp)
void handleTakeOver (PubSubTakeOverSubspaceCall *toCall)
void handleHelpResponse (PubSubHelpResponse *helpResp)
void handleBackupCall (PubSubBackupCall *backupCall)
void handleBackupResponse (PubSubBackupResponse *backupResp)
void handleIntermediateCall (PubSubIntermediateCall *intermediateCall)
void handleIntermediateResponse (PubSubIntermediateResponse *intermediateResp)
void handleAdoptChildCall (PubSubAdoptChildCall *adoptCall)
void handleAdoptChildResponse (PubSubAdoptChildResponse *adoptResp)
void handlePingCall (PubSubPingCall *hearbeatCall)
void handlePingResponse (PubSubPingResponse *pingResp)
void takeOverNewSubspace (PubSubSubspaceId subspaceId)
void takeOverSubspace (PubSubSubspaceResponsible &subspaceId, bool isNew)
void sendHearbeatToChildren ()
void sendPingToChildren ()
void handleParentTimeout (PubSubTimer *timer)
void handleBackupCallTimeout (PubSubBackupCall *backupCall, const TransportAddress &oldNode)
void handlePingCallTimeout (PubSubPingCall *pingCall, const TransportAddress &oldNode)
void handleSubscriptionCallTimeout (PubSubSubscriptionCall *subscriptionCall, const TransportAddress &oldNode)
void handleUnsubscriptionMessage (PubSubUnsubscriptionMessage *unsMsg)
void handleNodeLeftMessage (PubSubNodeLeftMessage *leftMsg)
void handleReplacementMessage (PubSubReplacementMessage *replaceMsg)
void handleReleaseIntermediate (PubSubReleaseIntermediateMessage *releaseMsg)
void handleIntermediateBackup (PubSubBackupIntermediateMessage *backupMsg)
void handleSubscriptionBackup (PubSubBackupSubscriptionMessage *backupMsg)
void handleUnsubscribeBackup (PubSubBackupUnsubscribeMessage *backupMsg)
void unsubscribeChild (const NodeHandle &node, PubSubSubspaceResponsible &subspace)
void sendMessageToChildren (PubSubSubspaceResponsible &subspace, BaseOverlayMessage *toIntermediates, BaseOverlayMessage *toBackup, BaseOverlayMessage *toPlayers)
void publishEvents ()
void startTimer (PubSubTimer *timer)
- Protected Member Functions inherited from BaseOverlay
int numInitStages () const
 Sets init stage.
void bindToPort (int port)
 Tells UDP we want to get all packets arriving on the given port.
virtual void route (const OverlayKey &key, CompType destComp, CompType srcComp, cPacket *msg, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Routes message through overlay.
void callDeliver (BaseOverlayMessage *msg, const OverlayKey &destKey)
 Calls deliver function in application.
void callForward (const OverlayKey &key, BaseRouteMessage *msg, const NodeHandle &nextHopNode)
 Calls forward function in application.
void callUpdate (const NodeHandle &node, bool joined)
 Informs application about state changes of nodes or newly joined nodes.
void handleMessage (cMessage *msg)
 Checks for message type and calls corresponding method.
void handleBaseOverlayMessage (BaseOverlayMessage *msg, const OverlayKey &destKey=OverlayKey::UNSPECIFIED_KEY)
 Handles a BaseOverlayMessage

virtual void handleTransportAddressChangedNotification ()
 This method is called if the node has a new TransportAddress (IP address) because it changed its access network.
virtual void handleNodeLeaveNotification ()
 This method is called **.gracefulLeaveDelay seconds before the node is killed.
virtual void handleNodeGracefulLeaveNotification ()
 This method is called **.gracefulLeaveDelay seconds before the node is killed, if this node is among the gracefulLeaveProbability nodes.
virtual void recordOverlaySentStats (BaseOverlayMessage *msg)
 Collect overlay specific sent messages statistics.
void setOverlayReady (bool ready)
 Sets the overlay ready icon and register/deregisters the node at the GlobalNodeList.
virtual AbstractLookup * createLookup (RoutingType routingType=DEFAULT_ROUTING, const BaseOverlayMessage *msg=NULL, const cPacket *findNodeExt=NULL, bool appLookup=false)
 Creates an abstract iterative lookup instance.
virtual void removeLookup (AbstractLookup *lookup)
 Removes the abstract lookup instance.
virtual NodeVector * findNode (const OverlayKey &key, int numRedundantNodes, int numSiblings, BaseOverlayMessage *msg=NULL)
 Implements the find node call.
virtual void joinOverlay ()
 Join the overlay with a given nodeID in thisNode.key.
virtual void joinForeignPartition (const NodeHandle &node)
 Join another overlay partition with the given node as bootstrap node.
virtual bool handleFailedNode (const TransportAddress &failed)
 Handles a failed node.
virtual void lookupRpc (LookupCall *call)
virtual void nextHopRpc (NextHopCall *call)
void countFindNodeCall (const FindNodeCall *call)
void countFailedNodeCall (const FailedNodeCall *call)
bool internalHandleRpcCall (BaseCallMessage *msg)
 Handles internal rpc requests.
void internalHandleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 Handles rpc responses internal in base classes

void internalHandleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 Handles rpc timeouts internal in base classes

void internalSendRouteRpc (BaseRpcMessage *message, const OverlayKey &destKey, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType)
CompType getThisCompType ()
 Return the component type of this module.
- Protected Member Functions inherited from BaseRpc
void initRpcs ()
 Initializes Remote-Procedure state.
void finishRpcs ()
 Deinitializes Remote-Procedure state.
virtual void internalHandleRpcMessage (BaseRpcMessage *msg)
 Handles incoming rpc messages and delegates them to the corresponding listeners or handlers.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message using the overlay's UDP port
This replaces ROUTE_DIRECT calls!
uint32_t sendUdpRpcCall (const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message to the underlay

uint32_t sendInternalRpcCall (CompType destComp, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends an internal Remote-Procedure-Call between two tiers

void cancelRpcMessage (uint32_t nonce)
 Cancels a Remote-Procedure-Call.
void cancelAllRpcs ()
 Cancels all RPCs.
void sendRpcResponse (TransportType transportType, CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message and deletes call message.
void sendRpcResponse (BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message to UDP and deletes call message.
int pingNode (const TransportAddress &dest, simtime_t timeout=-1, int retries=0, cPolymorphic *context=NULL, const char *caption="PING", RpcListener *rpcListener=NULL, int rpcId=-1, TransportType transportType=INVALID_TRANSPORT)
 ping a node by its TransportAddress
virtual void pingResponse (PingResponse *pingResponse, cPolymorphic *context, int rpcId, simtime_t rtt)
virtual void pingTimeout (PingCall *pingCall, const TransportAddress &dest, cPolymorphic *context, int rpcId)
bool internalHandleMessage (cMessage *msg)
- Protected Member Functions inherited from RpcListener
virtual void handleRpcResponse (BaseResponseMessage *msg, const RpcState &rpcState, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (const RpcState &rpcState)
 This method is called if an RPC timeout has been reached.
- Protected Member Functions inherited from BaseTcpSupport
void handleTCPMessage (cMessage *msg)
 Member function to handle incoming TCP messages.
void bindAndListenTcp (int port)
 Member function to bind service to the specified port and listen afterwards.
bool isAlreadyConnected (TransportAddress address)
 Member function to check if the service is already connected.
void establishTcpConnection (TransportAddress address)
 Member function to establish a connection to the specified node.
void sendTcpData (cPacket *msg, TransportAddress address)
 Member function to send TCP data to the specified node.
virtual void handleConnectionEvent (EvCode code, TransportAddress address)
 Member function to handle passive connection events.
virtual void handleDataReceived (TransportAddress address, cPacket *msg, bool urgent)
 Member function to handle incoming data.
virtual void handleIncomingConnection (TransportAddress address)
 Member function to handle newly opened connections.
void closeTcpConnection (TransportAddress address)
 Member function to close an established connection.
void setTcpOut (cGate *gate)
 Member function to set local gate towards the TCP module during init phase.
cGate * getTcpOut ()
 Member function to get local gate towards the TCP module.
- Protected Member Functions inherited from TopologyVis
void initVis (cModule *terminal)

Protected Attributes

std::list< PubSubSubspace > subscribedSubspaces
std::map< PubSubSubspaceId,
PubSubSubspaceResponsible >
responsibleSubspaces
std::map< PubSubSubspaceId,
PubSubSubspaceResponsible >
backupSubspaces
std::map< PubSubSubspaceId,
PubSubSubspaceIntermediate >
intermediateSubspaces
int subspaceSize
int AOIWidth
int numSubspaces
int parentTimeout
int maxChildren
bool allowOldMoveMessages
unsigned int currentRegionX
unsigned int currentRegionY
int movementRate
int maxMoveDelay
PubSubTimer * heartbeatTimer
PubSubTimer * childPingTimer
PubSubTimer * eventDeliveryTimer
cMessage * joinTimer
TransportAddress lobbyServer
int numEventsWrongTimeslot
int numEventsCorrectTimeslot
int numPubSubSignalingMessages
int pubSubSignalingMessagesSize
int numMoveMessages
int moveMessagesSize
int numMoveListMessages
int moveListMessagesSize
int respMoveListMessagesSize
int lostMovementLists
int receivedMovementLists
- Protected Attributes inherited from BaseOverlay
int numAppDataForwarded
 number of forwarded app data packets
int bytesAppDataForwarded
 number of forwarded app data bytes at out-gate
int numAppLookupForwarded
 number of forwarded app lookup packets
int bytesAppLookupForwarded
 number of forwarded app lookup bytes at out-gate
int numMaintenanceForwarded
 number of forwarded maintenance packets
int bytesMaintenanceForwarded
 number of forwarded maintenance bytes at out-gate
int numFindNodeSent
int bytesFindNodeSent
int numFindNodeResponseSent
int bytesFindNodeResponseSent
int numFailedNodeSent
int bytesFailedNodeSent
int numFailedNodeResponseSent
int bytesFailedNodeResponseSent
std::vector< HopDelayRecord * > singleHopDelays
simtime_t creationTime
 simtime when the node has been created
GlobalNodeList * globalNodeList
 pointer to GlobalNodeList in this node
NotificationBoard * notificationBoard
 pointer to NotificationBoard in this node
UnderlayConfigurator * underlayConfigurator
 pointer to UnderlayConfigurator in this node
BootstrapList * bootstrapList
 pointer to the BootstrapList module
GlobalParameters * globalParameters
 pointer to the GlobalParameters module
uint32_t overlayId
 identifies the overlay this node belongs to (used for multiple overlays)
bool debugOutput
 debug output ?
RoutingType defaultRoutingType
bool useCommonAPIforward
 forward messages to applications?
bool collectPerHopDelay
 collect delay for single hops
bool routeMsgAcks
 send ACK when receiving route message
uint32_t recNumRedundantNodes
 numRedundantNodes for recursive routing
bool recordRoute
 record visited hops on route
bool drawOverlayTopology
bool rejoinOnFailure
bool sendRpcResponseToLastHop
 needed by KBR protocols for NAT support
bool dropFindNodeAttack
 if node is malicious, it tries a findNode attack
bool isSiblingAttack
 if node is malicious, it tries a isSibling attack
bool invalidNodesAttack
 if node is malicious, it tries a invalidNode attack
bool dropRouteMessageAttack
 if node is malicious, it drops all received BaseRouteMessages
int localPort
 used UDP-port
int hopCountMax
 maximum hop count
bool measureAuthBlock
 if true, measure the overhead of signatures in rpc messages
bool restoreContext
 if true, a node rejoins with its old nodeId and malicious state
int numDropped
 number of dropped packets
int bytesDropped
 number of dropped bytes
cOutVector delayVector
 statistical output vector for packet-delays
cOutVector hopCountVector
 statistical output vector for hop-counts
States state
IterativeLookupConfiguration iterativeLookupConfig
RecursiveLookupConfiguration recursiveLookupConfig
LookupSet lookups
bool kbr
 set this to true, if the overlay provides KBR services
- Protected Attributes inherited from BaseRpc
NodeHandle thisNode
 NodeHandle to this node.
BaseOverlay * overlay
bool debugOutput
 debug output ?
GlobalStatistics * globalStatistics
 pointer to GlobalStatistics module in this node
CompType thisCompType
NeighborCache * neighborCache
 pointer to the neighbor cache
CryptoModule * cryptoModule
 pointer to CryptoModule
int numPingSent
int bytesPingSent
int numPingResponseSent
int bytesPingResponseSent
- Protected Attributes inherited from TopologyVis
cModule * thisTerminal
GlobalNodeList * globalNodeList
 pointer to corresponding node

Additional Inherited Members

- Public Types inherited from BaseOverlay
enum  States {
  INIT = 0, BOOTSTRAP = 1, DISCOVERY = 2, PREJOIN = 3,
  JOIN = 4, POSTJOIN = 5, READY = 6, REFRESH = 7,
  SHUTDOWN = 8, FAILED = 9, RSET = JOIN, BSET = POSTJOIN
}
- Protected Types inherited from BaseOverlay
typedef UNORDERED_SET
< AbstractLookup
*, lookupHashFcn,
lookupHashFcn >
LookupSet

Detailed Description

Definition at line 33 of file PubSubMMOG.h.

Constructor & Destructor Documentation

PubSubMMOG::~PubSubMMOG ( )
virtual

Definition at line 2006 of file PubSubMMOG.cc.

{
    // Release every move message that is still queued in one of the
    // subspaces this node is responsible for.
    typedef std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator SubspaceIter;
    typedef deque<PubSubMoveMessage*>::iterator MoveMsgIter;

    for( SubspaceIter subIt = responsibleSubspaces.begin(); subIt != responsibleSubspaces.end(); ++subIt ) {
        deque<PubSubMoveMessage*>& pending = subIt->second.waitingMoveMessages;
        for( MoveMsgIter queued = pending.begin(); queued != pending.end(); ++queued ) {
            cancelAndDelete( *queued );
        }
        // The pointers are dangling now, so empty the queue as well
        pending.clear();
    }

    // Finally cancel and free the heartbeat timer
    cancelAndDelete(heartbeatTimer);
}

Member Function Documentation

void PubSubMMOG::finishOverlay ( )
virtual

collects statistical data in derived class

Reimplemented from BaseOverlay.

Definition at line 1977 of file PubSubMMOG.cc.

{
    // Reports this node's PubSubMMOG counters to the GlobalStatistics module
    // as per-second rates over the measured lifetime.
    //
    // NOTE(review): the published listing had lost the definition of `time`
    // and the second argument of all but one addStdDev() call. They are
    // reconstructed here from the counter attributes of this class; confirm
    // against PubSubMMOG.cc.
    simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);

    // Nodes that were measured too briefly would distort the rates
    if (time < GlobalStatistics::MIN_MEASURED) return;

    globalStatistics->addStdDev("PubSubMMOG: Sent Signaling Messages/s",
                                numPubSubSignalingMessages / time);
    globalStatistics->addStdDev("PubSubMMOG: Sent Signaling bytes/s",
                                pubSubSignalingMessagesSize / time);
    globalStatistics->addStdDev("PubSubMMOG: Sent Move Messages/s",
                                numMoveMessages / time);
    globalStatistics->addStdDev("PubSubMMOG: Sent Move bytes/s",
                                moveMessagesSize / time);
    globalStatistics->addStdDev("PubSubMMOG: Sent MoveList Messages/s",
                                numMoveListMessages / time);
    globalStatistics->addStdDev("PubSubMMOG: Sent MoveList bytes/s",
                                moveListMessagesSize / time);
    globalStatistics->addStdDev("PubSubMMOG: Received Move Events (correct timeslot)/s",
                                numEventsCorrectTimeslot / time);
    globalStatistics->addStdDev("PubSubMMOG: Received Move Events (wrong timeslot)/s",
                                numEventsWrongTimeslot / time);
    globalStatistics->addStdDev("PubSubMMOG: Responsible Nodes: Send MoveList Bytes/s",
                                respMoveListMessagesSize / time);
    globalStatistics->addStdDev("PubSubMMOG: Lost or too long delayed MoveLists/s",
                                lostMovementLists / time);
    globalStatistics->addStdDev("PubSubMMOG: Received valid MoveLists/s",
                                receivedMovementLists / time);
}
void PubSubMMOG::handleAdoptChildCall ( PubSubAdoptChildCall * adoptCall)
protected

Definition at line 1026 of file PubSubMMOG.cc.

{
    // Handles a request to adopt a child in a subspace for which this node
    // acts as intermediate node, and confirms the adoption to the caller.

    // The listing compared the iterator against end() without ever assigning
    // it; look the subspace up the same way the sibling handlers do.
    std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
    it = intermediateSubspaces.find( PubSubSubspaceId(adoptCall->getSubspaceId(), numSubspaces) );
    if( it == intermediateSubspaces.end() ) {
        EV << "[PubSubMMOG::handleAdoptChildCall() @ " << thisNode.getIp()
           << " (" << thisNode.getKey().toString(16) << ")]\n"
           << " Received Adopt Child Call for unknown Subspace!\n"
           << endl;
        // No response is sent, so the call has to be freed here
        cancelAndDelete( adoptCall );
        return;
    }

    it->second.addChild( adoptCall->getChild() );

    PubSubAdoptChildResponse* adoptResp = new PubSubAdoptChildResponse("I adopted child");
    adoptResp->setSubspaceId( adoptCall->getSubspaceId() );
    adoptResp->setChild( adoptCall->getChild() );
    adoptResp->setBitLength( PUBSUB_ADOPTCHILDRESPONSE_L( adoptResp ));

    // NOTE(review): the listing only kept the tail of this statement
    // ("... getByteLength() );"); the RECORD_STATS wrapper and the message
    // counter are restored -- confirm against PubSubMMOG.cc.
    RECORD_STATS(
            ++numPubSubSignalingMessages;
            pubSubSignalingMessagesSize+= adoptResp->getByteLength()
            );

    sendRpcResponse( adoptCall, adoptResp );
}
void PubSubMMOG::handleAdoptChildResponse ( PubSubAdoptChildResponse * adoptResp)
protected

Definition at line 1051 of file PubSubMMOG.cc.

{
    // Handles the confirmation of an intermediate node that it adopted a
    // child: the child is moved from the cache of waiting children into the
    // intermediate node's children list and the backup node is informed.
    //
    // NOTE(review): several statements were missing from the published
    // listing (subspace lookup, fixTotalChildrenCount() calls, RECORD_STATS
    // wrappers). They are restored below; confirm against PubSubMMOG.cc.

    std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
    it = responsibleSubspaces.find( PubSubSubspaceId(adoptResp->getSubspaceId(), numSubspaces) );
    if( it == responsibleSubspaces.end() ) {
        EV << "[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
           << " (" << thisNode.getKey().toString(16) << ")]\n"
           << " Received AdoptChild Response for unknown Subspace!\n"
           << endl;
        return;
    }
    PubSubSubspaceResponsible& subspace = it->second;

    // FIXME: just for testing
    // (without the recount in between, this check could never fire)
    int countBefore = subspace.getTotalChildrenCount();
    subspace.fixTotalChildrenCount();
    if( countBefore != subspace.getTotalChildrenCount() ){
        opp_error("Huh?");
    }

    // Find intermediate node in subspace
    deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
    for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
        if( !iit->node.isUnspecified() && iit->node == adoptResp->getSrcNode() ){
            // if adoption was for a child intermediate node, nothing is to be done
            int intermediatePos = iit - subspace.intermediateNodes.begin();
            for( int pos = (intermediatePos+1) * maxChildren; pos < (int) subspace.intermediateNodes.size() &&
                    pos < (intermediatePos+2) * maxChildren; ++pos )
            {
                if( !subspace.intermediateNodes[pos].node.isUnspecified() &&
                        adoptResp->getChild() == subspace.intermediateNodes[pos].node ){
                    return;
                }
            }

            // child is a "real" child->remove it from cache
            if( !subspace.cachedChildren.erase( adoptResp->getChild() ) ){
                // if node got deleted in the meantime, inform parent...
                PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left Subspace");
                goneMsg->setNode( adoptResp->getChild() );
                goneMsg->setSubspaceId( subspace.getId().getId() );
                goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
                RECORD_STATS(
                        ++numPubSubSignalingMessages;
                        pubSubSignalingMessagesSize+= goneMsg->getByteLength()
                        );
                sendMessageToUDP( adoptResp->getSrcNode(), goneMsg );
                return;
            }

            // move child to intermediate node's childrenlist
            if( !iit->children.insert( adoptResp->getChild() ).second ){
                // Node was already in children list, fix children count
                subspace.fixTotalChildrenCount();
            }
            iit->waitingChildren--;

            // FIXME: just for testing
            int countAfter = subspace.getTotalChildrenCount();
            subspace.fixTotalChildrenCount();
            if( countAfter != subspace.getTotalChildrenCount() ){
                opp_error("Huh?");
            }

            // Inform Backup
            if( !subspace.getBackupNode().isUnspecified() ){
                PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: node got a new parent");
                backupMsg->setSubspaceId( adoptResp->getSubspaceId() );
                backupMsg->setChild( adoptResp->getChild() );
                backupMsg->setParent( adoptResp->getSrcNode() );
                backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
                RECORD_STATS(
                        ++numPubSubSignalingMessages;
                        pubSubSignalingMessagesSize+= backupMsg->getByteLength()
                        );
                sendMessageToUDP( subspace.getBackupNode(), backupMsg );
            }

            // The adoption has been fully processed. Returning here (rather
            // than only when a backup node exists) keeps a successful
            // adoption from falling through to the "unknown child" log below.
            return;
        }
    }

    EV << "[PubSubMMOG::handleAdoptChildResponse() @ " << thisNode.getIp()
       << " (" << thisNode.getKey().toString(16) << ")]\n"
       << " Received AdoptChild Response for unknown child!\n"
       << endl;
}
void PubSubMMOG::handleAppMessage ( cMessage *  msg)
virtual

Processes non-commonAPI messages.

Parameters
msg	non-commonAPI message

Reimplemented from BaseOverlay.

Definition at line 315 of file PubSubMMOG.cc.

{
    // Handles messages handed down by the application. Only
    // GameAPIPositionMessages are processed (and deleted) here; what happens
    // with a position update depends on the overlay state.
    if( GameAPIPositionMessage *posMsg = dynamic_cast<GameAPIPositionMessage*>(msg) ) {
        if( state == READY ) {
            handleMove( posMsg );
        } else if ( state == JOIN ) {
            // We are not connected to our responsible node, inform app
            // (named differently from the parameter to avoid shadowing `msg`)
            CompReadyMessage* notReadyMsg = new CompReadyMessage("Overlay not READY!");
            notReadyMsg->setReady(false);
            send( notReadyMsg, "appOut");
        } else if ( state == INIT ) {
            // This is only called for the first MOVE message
            // Trigger login
            PubSubJoinCall* joinMsg = new PubSubJoinCall("Login");
            joinMsg->setPosition( posMsg->getPosition() );
            // FIXME: Ressource handling not yet supported!
            joinMsg->setRessources( 4 );

            // NOTE(review): in the published listing the join call was built
            // but never sent or freed, and the state never left INIT. The
            // three statements below are restored on that basis -- confirm
            // against PubSubMMOG.cc.
            sendUdpRpcCall( lobbyServer, joinMsg );
            state = JOIN;
            setBootstrapedIcon();

            // tell app to wait until login is confirmed...
            CompReadyMessage* readyMsg = new CompReadyMessage("Overlay not READY!");
            readyMsg->setReady(false);
            readyMsg->setComp(getThisCompType());
            send( readyMsg, "appOut");

            // Remember the subspace (region) the player starts in
            currentRegionX = (unsigned int) (posMsg->getPosition().x/subspaceSize);
            currentRegionY = (unsigned int) (posMsg->getPosition().y/subspaceSize);
        }
        delete msg;
    }
}
void PubSubMMOG::handleBackupCall ( PubSubBackupCall * backupCall)
protected

Definition at line 799 of file PubSubMMOG.cc.

{
    // Handles the request of a responsible node to become its backup for a
    // subspace: starts a parent-timeout timer, stores a copy of the load
    // balancing tree carried in the call and acknowledges the request.
    int intId = backupCall->getSubspaceId();
    PubSubSubspaceId subspaceId(intId, numSubspaces);

    // Start Heartbeat Timer
    // (local renamed so it no longer shadows the `parentTimeout` member)
    PubSubTimer* parentTimer = new PubSubTimer("ParentTimeout");
    parentTimer->setType( PUBSUB_PARENT_TIMEOUT );
    parentTimer->setSubspaceId( intId );
    startTimer( parentTimer );

    // insert subspace into responsible list
    PubSubSubspaceResponsible subspace( subspaceId );
    subspace.setResponsibleNode( backupCall->getSrcNode() );
    subspace.setHeartbeatTimer( parentTimer );

    // reconstruct load balancing tree
    for( unsigned int i = 0; i < backupCall->getIntermediatesArraySize(); ++i ){
        // The listing used `iNode` without declaring it; restored.
        PubSubSubspaceResponsible::IntermediateNode iNode;
        iNode.node = backupCall->getIntermediates(i);
        subspace.intermediateNodes.push_back( iNode );
    }
    for( unsigned int i = 0; i < backupCall->getChildrenArraySize(); ++i ){
        // Position encoding: -2 = cached child, -1 = direct child,
        // otherwise the index of the intermediate node owning the child
        int pos = backupCall->getChildrenPos( i );
        if( pos == -2 ){
            subspace.cachedChildren.insert( make_pair( backupCall->getChildren(i), false ));
        } else if( pos == -1 ){
            subspace.children.insert( backupCall->getChildren(i) );
        } else {
            subspace.intermediateNodes[pos].children.insert( backupCall->getChildren(i) );
        }
    }

    backupSubspaces.insert( make_pair(subspaceId, subspace) );

    PubSubBackupResponse* backupResp = new PubSubBackupResponse("I'll be your backup");
    backupResp->setSubspaceId( intId );
    backupResp->setBitLength( PUBSUB_BACKUPRESPONSE_L( backupResp ));

    // NOTE(review): only the tail of this statement survived in the listing;
    // wrapper and message counter restored -- confirm against PubSubMMOG.cc.
    RECORD_STATS(
            ++numPubSubSignalingMessages;
            pubSubSignalingMessagesSize+= backupResp->getByteLength()
            );

    sendRpcResponse( backupCall, backupResp );
}
void PubSubMMOG::handleBackupCallTimeout ( PubSubBackupCall * backupCall,
const TransportAddress & oldNode 
)
protected

Definition at line 1288 of file PubSubMMOG.cc.

{
    // Called when a backup call to `oldNode` timed out: reports the failed
    // node to the lobby server, asks it for a new backup candidate and marks
    // the backup of the affected subspace as unspecified.

    // FIXME: cast oldNode to NodeHandle
    // Inform Lobbyserver over failed node
    PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
    failedNode->setFailedNode( oldNode );
    failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
    // NOTE(review): RECORD_STATS wrappers restored (the listing only kept the
    // statement tails) -- confirm against PubSubMMOG.cc.
    RECORD_STATS(
            ++numPubSubSignalingMessages;
            pubSubSignalingMessagesSize+= failedNode->getByteLength()
            );
    sendMessageToUDP( lobbyServer, failedNode );

    // Request new Backup
    PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
    helpCall->setHelpType( PUBSUB_BACKUP );
    helpCall->setSubspaceId( backupCall->getSubspaceId() );
    helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
    RECORD_STATS(
            ++numPubSubSignalingMessages;
            pubSubSignalingMessagesSize+= helpCall->getByteLength()
            );
    // In the listing the help call was built but never sent (and leaked);
    // the lobby server is the one answering help calls.
    sendUdpRpcCall( lobbyServer, helpCall );

    // find appropriate subspace and mark backup as failed
    PubSubSubspaceId subspaceId(backupCall->getSubspaceId(), numSubspaces);
    std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
    it = responsibleSubspaces.find( subspaceId );
    if( it == responsibleSubspaces.end() ) {
        return;
    }
    it->second.setBackupNode( NodeHandle::UNSPECIFIED_NODE );
}
void PubSubMMOG::handleBackupResponse ( PubSubBackupResponse * backupResp)
protected

Definition at line 844 of file PubSubMMOG.cc.

{
    // Intentionally empty: the backup's acknowledgement requires no action,
    // because handleHelpResponse() has already done everything important.
}
void PubSubMMOG::handleHelpResponse ( PubSubHelpResponse * helpResp)
protected

Definition at line 722 of file PubSubMMOG.cc.

{
    // lobby server answered our call for help
    // (i.e. he sends us a candidate for backup/intermediate nodes)
    //
    // NOTE(review): the published listing had lost the subspace lookup, the
    // declaration of `iNode`, a fixTotalChildrenCount() call and the
    // RECORD_STATS wrappers. They are restored below; confirm against
    // PubSubMMOG.cc.
    if( helpResp->getHelpType() == PUBSUB_BACKUP ){
        // Find the subspace in the subspace map
        std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
        it = responsibleSubspaces.find( PubSubSubspaceId(helpResp->getSubspaceId(), numSubspaces) );
        if( it == responsibleSubspaces.end() ){
            EV << "[PubSubMMOG::handleHelpResponse() @ " << thisNode.getIp()
               << " (" << thisNode.getKey().toString(16) << ")]\n"
               << " received helpResponse for unknown subspace" << helpResp->getSubspaceId() << "\n"
               << endl;
            return;
        }
        PubSubSubspaceResponsible& subspace = it->second;

        // The call is created only after the lookup succeeded, so the early
        // return above can no longer leak it.
        PubSubBackupCall* backupCall = new PubSubBackupCall("Become my backup node!");
        backupCall->setSubspaceId( helpResp->getSubspaceId() );

        // Assume the new backup will not refuse his task
        subspace.setBackupNode( helpResp->getNode() );

        // FIXME: just for testing
        int countBefore = subspace.getTotalChildrenCount();
        subspace.fixTotalChildrenCount();
        if( countBefore != subspace.getTotalChildrenCount() ){
            opp_error("Huh?");
        }

        // backup the load balancing tree
        backupCall->setChildrenArraySize( subspace.getTotalChildrenCount() );
        backupCall->setIntermediatesArraySize( subspace.intermediateNodes.size() );

        set<NodeHandle>::iterator childIt;
        map<NodeHandle, bool>::iterator childMapIt;
        unsigned int i = 0;   // running index into the call's children array

        // cached children are marked with position -2
        for( childMapIt = subspace.cachedChildren.begin(); childMapIt != subspace.cachedChildren.end(); ++childMapIt ){
            backupCall->setChildren(i, childMapIt->first);
            backupCall->setChildrenPos(i, -2);
            ++i;
        }
        // direct children are marked with position -1
        for( childIt = subspace.children.begin(); childIt != subspace.children.end(); ++childIt ){
            backupCall->setChildren(i, *childIt);
            backupCall->setChildrenPos(i, -1);
            ++i;
        }
        // children of intermediate nodes carry the index of their parent
        for( unsigned int ii = 0; ii < subspace.intermediateNodes.size(); ++ii ){
            PubSubSubspaceResponsible::IntermediateNode& iNode = subspace.intermediateNodes[ii];
            backupCall->setIntermediates(ii, iNode.node);
            for( childIt = iNode.children.begin(); childIt != iNode.children.end(); ++childIt ){
                backupCall->setChildren(i, *childIt);
                backupCall->setChildrenPos(i, ii);
                ++i;
            }
        }

        backupCall->setBitLength( PUBSUB_BACKUPCALL_L( backupCall ));
        RECORD_STATS(
                ++numPubSubSignalingMessages;
                pubSubSignalingMessagesSize+= backupCall->getByteLength()
                );
        sendUdpRpcCall( helpResp->getNode(), backupCall );

    } else if( helpResp->getHelpType() == PUBSUB_INTERMEDIATE ){
        PubSubIntermediateCall* intermediateCall = new PubSubIntermediateCall("Become my intermediate node!");
        intermediateCall->setSubspaceId( helpResp->getSubspaceId() );
        intermediateCall->setBitLength( PUBSUB_INTERMEDIATECALL_L( intermediateCall ));
        RECORD_STATS(
                ++numPubSubSignalingMessages;
                pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
                );
        sendUdpRpcCall( helpResp->getNode(), intermediateCall );
    }
}
void PubSubMMOG::handleIntermediateBackup ( PubSubBackupIntermediateMessage * backupMsg)
protected

Definition at line 1512 of file PubSubMMOG.cc.

{
    // Records, in our backup copy of a subspace, which node occupies a given
    // slot of the intermediate node list. Messages for subspaces we do not
    // back up are ignored.
    PubSubSubspaceId backedUpId(backupMsg->getSubspaceId(), numSubspaces);
    std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator entry = backupSubspaces.find( backedUpId );
    if( entry == backupSubspaces.end() ) {
        return;
    }

    deque<PubSubSubspaceResponsible::IntermediateNode>& slots = entry->second.intermediateNodes;

    // Grow the list first if the announced slot does not exist yet
    if( backupMsg->getPos() >= (int) slots.size() ){
        slots.resize( backupMsg->getPos() + 1 );
    }
    slots[ backupMsg->getPos() ].node = backupMsg->getNode();
}
void PubSubMMOG::handleIntermediateCall ( PubSubIntermediateCall intermediateCall)
protected

Definition at line 850 of file PubSubMMOG.cc.

{
// Accepts a request to become intermediate node for a subspace: registers
// the subspace locally (caller becomes its responsible node) and answers
// the RPC with a PubSubIntermediateResponse.
// insert subspace into intermediate list
PubSubSubspaceId subspaceId(intermediateCall->getSubspaceId(), numSubspaces);
PubSubSubspaceIntermediate subspace( subspaceId );
subspace.setResponsibleNode( intermediateCall->getSrcNode() );
subspace.setTimestamp(0);
// std::map::insert keeps an already existing entry for this id unchanged
intermediateSubspaces.insert( make_pair(subspaceId, subspace) );
PubSubIntermediateResponse* iResp = new PubSubIntermediateResponse("I'll be your intermediate node");
iResp->setSubspaceId( intermediateCall->getSubspaceId() );
iResp->setBitLength( PUBSUB_INTERMEDIATERESPONSE_L( iResp ));
// NOTE(review): the `);` after the next statement has no opener in this
// listing -- presumably the first line(s) of a wrapping macro call were lost
// during extraction; confirm against PubSubMMOG.cc.
pubSubSignalingMessagesSize+= iResp->getByteLength()
);
sendRpcResponse( intermediateCall, iResp );
}
void PubSubMMOG::handleIntermediateResponse ( PubSubIntermediateResponse intermediateResp)
protected

Definition at line 869 of file PubSubMMOG.cc.

{
// Handles the answer of a node that agreed to act as intermediate node for
// one of our subspaces: stores it in an unspecified (or newly appended)
// intermediate slot, informs the backup node, sends the adopt calls needed
// to connect the slot to its parent/child slots and finally hands cached
// children over to the new node.
// we found a new intermediate node for a subspace
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
// NOTE(review): `it` is compared below without being assigned in this
// listing; presumably a responsibleSubspaces.find(...) line was lost during
// extraction -- confirm against PubSubMMOG.cc.
if( it == responsibleSubspaces.end() ) {
EV << "[PubSubMMOG::handleIntermediateResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received Intermediate Response for unknown Subspace!\n"
<< endl;
return;
}
PubSubSubspaceResponsible& subspace = it->second;
// NOTE(review): `iNode` is not declared in this listing; from its use below
// it is presumably a PubSubSubspaceResponsible::IntermediateNode -- confirm.
iNode.node = intermediateResp->getSrcNode();
// if there is any broken intermediate node in list, replace it
bool newIntermediate = true;
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
if( iit->node.isUnspecified() ){
iit->node = iNode.node;
newIntermediate = false;
break;
}
}
// no reusable slot found: append one and let iit point at it
if( iit == subspace.intermediateNodes.end() ){
subspace.intermediateNodes.push_back( iNode );
iit = subspace.intermediateNodes.end() - 1;
}
// inform Backup
if( !subspace.getBackupNode().isUnspecified() ){
PubSubBackupIntermediateMessage* backupMsg = new PubSubBackupIntermediateMessage("Backup: new Intermediate");
backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
backupMsg->setNode( iNode.node );
backupMsg->setPos( iit - subspace.intermediateNodes.begin() );
backupMsg->setBitLength( PUBSUB_BACKUPINTERMEDIATE_L( backupMsg ));
// NOTE(review): here and further down, a `);` follows the size accounting
// statement without an opener -- presumably the first line(s) of a wrapping
// macro call were lost during extraction; confirm against PubSubMMOG.cc.
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
}
// if needed, send adopt to parent
int intermediatePos = iit - subspace.intermediateNodes.begin();
// slot index of the parent intermediate; negative means there is none
int parentPos = intermediatePos/maxChildren -1;
if( parentPos >= 0 && !subspace.intermediateNodes[parentPos].node.isUnspecified() ){
PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt (intermediate) Node");
adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
adoptCall->setChild( iit->node );
adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
pubSubSignalingMessagesSize+= adoptCall->getByteLength()
);
sendUdpRpcCall( subspace.intermediateNodes[parentPos].node, adoptCall );
}
if( newIntermediate ){
// move one child from iNodes's parent to cache
if( parentPos >= 0 ) {
// parent is an intermediate node
// NOTE(review): `parent` is not declared in this listing; presumably a
// reference to subspace.intermediateNodes[parentPos] -- confirm.
if( parent.children.begin() != parent.children.end() ){
// insert() reports failure when the child is already cached
bool fixNeeded = false;
if( !subspace.cachedChildren.insert( make_pair( *(parent.children.begin()), false )).second ){
fixNeeded = true;
}
if( !subspace.getBackupNode().isUnspecified() ){
PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
backupMsg->setChild( *(parent.children.begin()) );
backupMsg->setOldParent( parent.node );
backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
}
// tell the parent intermediate that this child is no longer its child
PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left: moved");
goneMsg->setNode( *(parent.children.begin()) );
goneMsg->setSubspaceId( intermediateResp->getSubspaceId() );
goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
pubSubSignalingMessagesSize+= goneMsg->getByteLength()
);
sendMessageToUDP( parent.node, goneMsg );
parent.children.erase( parent.children.begin() );
// NOTE(review): empty branch in this listing; its body (presumably a
// children-count fix-up) appears to have been lost -- confirm.
if( fixNeeded ){
}
}
} else {
// we are parent
if( subspace.children.begin() != subspace.children.end() ){
bool fixNeeded = false;
if( !subspace.cachedChildren.insert( make_pair( *(subspace.children.begin()), false )).second ){
fixNeeded = true;
}
if( !subspace.getBackupNode().isUnspecified() ){
PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
backupMsg->setSubspaceId( intermediateResp->getSubspaceId() );
backupMsg->setChild( *(subspace.children.begin()) );
backupMsg->setOldParent( thisNode );
backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
}
subspace.children.erase( *(subspace.children.begin()) );
// NOTE(review): empty branch in this listing, see the note on the
// corresponding branch above.
if( fixNeeded ){
}
}
}
} else {
// an existing slot was refilled: the replacement has to adopt the
// intermediates sitting in that slot's child positions
// send adopt for all children intermediates
for( int pos = (intermediatePos+1) * maxChildren; pos < (int) subspace.intermediateNodes.size() &&
pos < (intermediatePos+2) * maxChildren; ++pos ){
if( subspace.intermediateNodes[pos].node.isUnspecified() ) continue;
PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt (intermediate) Node");
adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
adoptCall->setChild( subspace.intermediateNodes[pos].node );
adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
pubSubSignalingMessagesSize+= adoptCall->getByteLength()
);
sendUdpRpcCall( iit->node, adoptCall );
}
}
// move as many cached children to the new node as possible
std::map<NodeHandle,bool>::iterator childIt;
for( childIt = subspace.cachedChildren.begin(); childIt != subspace.cachedChildren.end(); ++childIt ){
// flag is set below once an adopt call has been sent for the child
if( childIt->second ) continue;
PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt Node");
adoptCall->setSubspaceId( intermediateResp->getSubspaceId() );
adoptCall->setChild( childIt->first );
adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
pubSubSignalingMessagesSize+= adoptCall->getByteLength()
);
sendUdpRpcCall( intermediateResp->getSrcNode(), adoptCall );
childIt->second = true;
// stop as soon as maxChildren adoptions are pending for this slot
if( (unsigned int) maxChildren == ++(iit->waitingChildren) ) break;
}
}
void PubSubMMOG::handleJoinResponse ( PubSubJoinResponse joinResp)
protected

Definition at line 367 of file PubSubMMOG.cc.

{
// Handles the join answer: remembers the current region as a subscribed
// subspace and either subscribes at the responsible node named in the
// response or, if none was named, asks lobbyServer for one.
NodeHandle respNode = joinResp->getResponsibleNode();
// NOTE(review): `region` is not declared in this listing; presumably a
// PubSubSubspaceId built from the current region coordinates on a line that
// was lost during extraction -- confirm against PubSubMMOG.cc.
PubSubSubspace sub(region);
sub.setResponsibleNode( respNode );
subscribedSubspaces.push_back( sub );
if( respNode.isUnspecified() ) {
PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ) );
// NOTE(review): the `);` after the size accounting statements in this
// function has no opener -- presumably the first line(s) of a wrapping macro
// call were lost during extraction; confirm.
pubSubSignalingMessagesSize+= respCall->getByteLength()
);
sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
} else {
PubSubSubscriptionCall* subCall = new PubSubSubscriptionCall("JoinSubspace");
subCall->setSubspaceId( region.getId() );
subCall->setBitLength( PUBSUB_SUBSCRIPTIONCALL_L( subCall ));
pubSubSignalingMessagesSize+= subCall->getByteLength()
);
sendUdpRpcCall( respNode, subCall );
}
}
void PubSubMMOG::handleMove ( GameAPIPositionMessage posMsg)
protected

Definition at line 541 of file PubSubMMOG.cc.

{
// Handles a position update from the application:
//  1. recomputes the current region and the set of regions overlapping the
//     area of interest (position +/- AOIWidth, clamped to the grid),
//  2. unsubscribes from subscribed regions outside that range,
//  3. requests a responsible node for every expected region that is not
//     subscribed yet,
//  4. forwards the move to the responsible node of the current region, if
//     that region is subscribed and its responsible node is known.
currentRegionX = (unsigned int) (posMsg->getPosition().x/subspaceSize);
currentRegionY = (unsigned int) (posMsg->getPosition().y/subspaceSize);
set<PubSubSubspaceId> expectedRegions;
int minX = (int) ((posMsg->getPosition().x - AOIWidth)/subspaceSize);
if( minX < 0 ) minX = 0;
int maxX = (int) ((posMsg->getPosition().x + AOIWidth)/subspaceSize);
if( maxX >= numSubspaces ) maxX = numSubspaces -1;
int minY = (int) ((posMsg->getPosition().y - AOIWidth)/subspaceSize);
if( minY < 0 ) minY = 0;
int maxY = (int) ((posMsg->getPosition().y + AOIWidth)/subspaceSize);
if( maxY >= numSubspaces ) maxY = numSubspaces -1;
// FIXME: make parameter: unsubscription size
// NOTE(review): the four *Unsub* bounds are computed but not read anywhere
// in this listing; the unsubscribe test below uses minX/maxX/minY/maxY.
int minUnsubX = (int) ((posMsg->getPosition().x - 1.5*AOIWidth)/subspaceSize);
if( minUnsubX < 0 ) minUnsubX = 0;
int maxUnsubX = (int) ((posMsg->getPosition().x + 1.5*AOIWidth)/subspaceSize);
if( maxUnsubX >= numSubspaces ) maxUnsubX = numSubspaces -1;
int minUnsubY = (int) ((posMsg->getPosition().y - 1.5*AOIWidth)/subspaceSize);
if( minUnsubY < 0 ) minUnsubY = 0;
// was `1.5+AOIWidth`: operator typo, the other three bounds scale AOIWidth by 1.5
int maxUnsubY = (int) ((posMsg->getPosition().y + 1.5*AOIWidth)/subspaceSize);
if( maxUnsubY >= numSubspaces ) maxUnsubY = numSubspaces -1;
for( int x = minX; x <= maxX; ++x ){
for( int y = minY; y <= maxY; ++y ){
expectedRegions.insert( PubSubSubspaceId( x, y, numSubspaces ));
}
}
list<PubSubSubspace>::iterator subIt = subscribedSubspaces.begin();
PubSubSubspace* subspace = NULL;
while( subIt != subscribedSubspaces.end() ){
// NOTE(review): `region` is not declared in this listing; presumably a
// PubSubSubspaceId built from currentRegionX/currentRegionY on a line that
// was lost during extraction -- confirm against PubSubMMOG.cc.
if( subIt->getId() == region ){
subspace = &*subIt;
}
// already subscribed: no new subscription needed for this region
expectedRegions.erase( subIt->getId() );
// unsubscribe region if too far away
if( subIt->getId().getX() < minX || subIt->getId().getX() > maxX ||
subIt->getId().getY() < minY || subIt->getId().getY() > maxY ){
if( !subIt->getResponsibleNode().isUnspecified() ){
PubSubUnsubscriptionMessage* unsubMsg = new PubSubUnsubscriptionMessage("Unsubscribe from subspace");
unsubMsg->setSubspaceId( subIt->getId().getId() );
unsubMsg->setSrc( thisNode );
unsubMsg->setBitLength( PUBSUB_UNSUBSCRIPTION_L( unsubMsg ));
// NOTE(review): the `);` after the size accounting statements in this
// function has no opener -- presumably the first line(s) of a wrapping macro
// call were lost during extraction; left untouched here.
pubSubSignalingMessagesSize+= unsubMsg->getByteLength()
);
sendMessageToUDP( subIt->getResponsibleNode(), unsubMsg );
}
// Erase subspace from subscribedList and increase iterator
subscribedSubspaces.erase( subIt++ );
} else {
++subIt;
}
}
// if any "near" region is not yet subscribed, subscribe
for( set<PubSubSubspaceId>::iterator regionIt = expectedRegions.begin(); regionIt != expectedRegions.end(); ++regionIt ){
PubSubSubspace sub( *regionIt );
subscribedSubspaces.push_back( sub );
PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
respCall->setSubspacePos( Vector2D(regionIt->getX(), regionIt->getY()) );
respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ));
pubSubSignalingMessagesSize+= respCall->getByteLength()
);
sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
}
if( subspace && !subspace->getResponsibleNode().isUnspecified() ){
PubSubMoveMessage* moveMsg = new PubSubMoveMessage("Player move");
moveMsg->setSubspaceId( region.getId() );
moveMsg->setPlayer( thisNode );
moveMsg->setPosition( posMsg->getPosition() );
moveMsg->setTimestamp( simTime() );
moveMsg->setBitLength( PUBSUB_MOVE_L( moveMsg ));
moveMessagesSize+= moveMsg->getByteLength()
);
sendMessageToUDP( subspace->getResponsibleNode(), moveMsg );
} else {
// trying to move to not-yet subscribed region
// FIXME: change state to JOIN?
}
}
void PubSubMMOG::handleMoveListMessage ( PubSubMoveListMessage moveMsg)
protected

Definition at line 666 of file PubSubMMOG.cc.

{
// Handles a move list for a subspace: forwards it to our children when we
// are intermediate node for the subspace, and passes the contained player
// positions to the application when we are subscribed to it. In both roles
// a list is only processed if it is newer than the last one seen.
simtime_t timestamp = moveMsg->getTimestamp();
// If I'm intermediate node for this subspace, forward message to children
std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
// NOTE(review): `it` is compared below without being assigned in this
// listing; presumably an intermediateSubspaces.find(...) line was lost
// during extraction -- confirm against PubSubMMOG.cc.
if( it != intermediateSubspaces.end() ){
// Forward only if the message has not already been forwarded
if( it->second.getLastTimestamp() < moveMsg->getTimestamp() ){
set<NodeHandle>::iterator childIt;
for( childIt = it->second.children.begin(); childIt != it->second.children.end(); ++childIt ){
// every child receives its own duplicate of the message
sendMessageToUDP( *childIt, (BaseOverlayMessage*) moveMsg->dup() );
// NOTE(review): the `);` lines in this function have no opener -- presumably
// the first line(s) of wrapping macro calls were lost during extraction.
moveListMessagesSize+= moveMsg->getByteLength()
);
}
it->second.setTimestamp( timestamp );
}
}
// If I'm subscribed to the subspace, transfer a GameAPIMoveList to app
std::list<PubSubSubspace>::iterator subIt;
for( subIt = subscribedSubspaces.begin(); subIt != subscribedSubspaces.end(); ++subIt ){
if( subIt->getId().getId() == moveMsg->getSubspaceId() ){
if( subIt->getLastTimestamp() < moveMsg->getTimestamp() ){
GameAPIListMessage* moveList = new GameAPIListMessage("player position update");
for( unsigned int i = 0; i < moveMsg->getPlayerArraySize(); ++i ){
moveList->setAddNeighbor( i, moveMsg->getPlayer(i) );
moveList->setNeighborPosition( i, moveMsg->getPosition(i) );
// recorded delay: time since the list's timestamp plus the position's age
globalStatistics->addStdDev("PubSubMMOG: MoveDelay",
SIMTIME_DBL(simTime() - timestamp + moveMsg->getPositionAge(i)) );
);
}
send( moveList, "appOut" );
// NOTE(review): both branches are empty in this listing; their bodies
// appear to have been lost during extraction.
if( timestamp < simTime() - maxMoveDelay ){
} else {
}
// estimate of lists missed since the previous one, from the time gap and movementRate
if( subIt->getLastTimestamp() != 0) lostMovementLists += (int)(SIMTIME_DBL(timestamp - subIt->getLastTimestamp())*movementRate -1);
);
subIt->setTimestamp( timestamp );
}
// only the first matching subscription is considered
return;
}
}
}
void PubSubMMOG::handleMoveMessage ( PubSubMoveMessage moveMsg)
protected

Definition at line 635 of file PubSubMMOG.cc.

{
// Handles a single player move sent to us as responsible node: queues it in
// the subspace's waitingMoveMessages if it is recent enough (or old
// messages are allowed), otherwise logs and deletes it.
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
// NOTE(review): `it` is compared below without being assigned in this
// listing; presumably a responsibleSubspaces.find(...) line was lost during
// extraction -- confirm against PubSubMMOG.cc.
if( it == responsibleSubspaces.end() ){
EV << "[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " received moveMessage for unknown subspace" << moveMsg->getSubspaceId() << "\n"
<< endl;
// NOTE(review): moveMsg is not deleted on this path in this listing.
return;
}
// If message arrived in the correct timeslot, store move message until deadline
// Note: This assumes, we get no messages with future timestamps. At least in
// the simulation, this assumption will hold.
// The allowOldMoveMessages parameter allows overriding the timeslot barriers and forward all
// messages.
if( allowOldMoveMessages || moveMsg->getTimestamp() >= eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) ){
it->second.waitingMoveMessages.push_back( moveMsg );
} else {
EV << "[PubSubMMOG::handleMoveMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " received moveMesage with Timestamp: " << moveMsg->getTimestamp() << "\n"
<< " deadline was: " << eventDeliveryTimer->getArrivalTime() - 1.0/(2*movementRate) << "\n"
<< endl;
cancelAndDelete( moveMsg );
}
}
void PubSubMMOG::handleNodeLeftMessage ( PubSubNodeLeftMessage leftMsg)
protected

Definition at line 432 of file PubSubMMOG.cc.

{
// Removes the node named in the message from the children of the
// corresponding intermediate subspace; unknown subspaces are ignored.
std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
// NOTE(review): `it` is compared below without being assigned in this
// listing; presumably an intermediateSubspaces.find(...) line was lost
// during extraction -- confirm against PubSubMMOG.cc.
if( it == intermediateSubspaces.end() ) return;
it->second.removeChild( leftMsg->getNode() );
}
void PubSubMMOG::handleParentTimeout ( PubSubTimer timer)
protected

Definition at line 1260 of file PubSubMMOG.cc.

{
// Called when the heartbeat timer of a backed-up subspace fires: counts the
// failure and, once the count exceeds 1, takes the subspace over;
// otherwise the timer is started again.
// our parent timed out. we have to take over the subspace...
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
// NOTE(review): `subspaceId` is not declared in this listing; presumably it
// is derived from the timer on a line lost during extraction -- confirm
// against PubSubMMOG.cc.
it = backupSubspaces.find( subspaceId );
if( it == backupSubspaces.end() ) {
// timer belongs to a subspace we no longer back up
delete timer;
return;
}
// increase fail count; if too high, take over subspace
it->second.incHeartbeatFailCount();
if( it->second.getHeartbeatFailCount() > 1 ) { // FIXME: make it a parameter
// Delete Timer
cancelAndDelete( timer );
it->second.setHeartbeatTimer( NULL );
// Take over Subspace
takeOverSubspace( it->second );
backupSubspaces.erase( it );
} else {
startTimer( timer );
}
}
void PubSubMMOG::handlePingCall ( PubSubPingCall hearbeatCall)
protected

Definition at line 1140 of file PubSubMMOG.cc.

{
// Answers a ping RPC. For pings of type PUBSUB_PING_BACKUP the heartbeat
// state of the matching backup subspace is reset first. A
// PubSubPingResponse is sent in every case.
// NOTE(review): the rendered signature names the parameter `hearbeatCall`
// while the body uses `pingCall`; presumably declaration and definition use
// different parameter names -- confirm against PubSubMMOG.cc.
int subspaceId = pingCall->getSubspaceId();
if( pingCall->getPingType() == PUBSUB_PING_BACKUP ){
// reset heartbeat timer
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
it = backupSubspaces.find( PubSubSubspaceId(pingCall->getSubspaceId(), numSubspaces) );
if( it == backupSubspaces.end() ) {
EV << "[PubSubMMOG::handlePingCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received PingCall for unknown Subspace!\n"
<< endl;
// FIXME: Somebody thinks we are his backup. What shall we do?
} else {
it->second.resetHeartbeatFailCount();
startTimer( it->second.getHeartbeatTimer() );
}
}
PubSubPingResponse* pingResp = new PubSubPingResponse("PingResponse");
pingResp->setSubspaceId( subspaceId );
pingResp->setBitLength( PUBSUB_PINGRESPONSE_L( pingResp ));
// NOTE(review): the `);` after the next statement has no opener -- presumably
// the first line(s) of a wrapping macro call were lost during extraction.
pubSubSignalingMessagesSize+= pingResp->getByteLength()
);
sendRpcResponse( pingCall, pingResp );
}
void PubSubMMOG::handlePingCallTimeout ( PubSubPingCall pingCall,
const TransportAddress oldNode 
)
protected

Definition at line 1322 of file PubSubMMOG.cc.

{
// Handles a ping RPC that timed out: reports the failed node to
// lobbyServer and then, depending on the ping type, removes the child,
// prepares the request for a new backup node, or clears the failed
// intermediate node (its children go to the cache) and prepares the request
// for a replacement.
// Inform Lobbyserver over failed node
// reference dynamic_cast: throws std::bad_cast if oldNode is not a NodeHandle
const NodeHandle& oldNodeHandle = dynamic_cast<const NodeHandle&>(oldNode);
// FIXME: use oldNodeHandle instead of oldNode
PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
// NOTE(review): the `);` after the size accounting statements in this
// function has no opener -- presumably the first line(s) of a wrapping macro
// call were lost during extraction; confirm against PubSubMMOG.cc.
pubSubSignalingMessagesSize+= failedNode->getByteLength()
);
failedNode->setFailedNode( oldNode );
sendMessageToUDP( lobbyServer, failedNode );
// find appropriate subspace
PubSubSubspaceId subspaceId(pingCall->getSubspaceId(), numSubspaces);
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
it = responsibleSubspaces.find( subspaceId );
if( it == responsibleSubspaces.end() ) {
return;
}
PubSubSubspaceResponsible& subspace = it->second;
if( pingCall->getPingType() == PUBSUB_PING_CHILD ){
// remove child
unsubscribeChild( oldNodeHandle, subspace );
} else if( pingCall->getPingType() == PUBSUB_PING_BACKUP ){
// if we didn't already have (or asked for) a new backup
if( !subspace.getBackupNode().isUnspecified() &&
subspace.getBackupNode() == oldNodeHandle )
{
// our backup timed out. we have to request a new one...
// delete old backup entry
// Request new Backup
// NOTE(review): in this listing the old backup entry is not cleared and
// helpCall is never sent; those lines were presumably lost during
// extraction -- confirm.
PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
helpCall->setHelpType( PUBSUB_BACKUP );
helpCall->setSubspaceId( pingCall->getSubspaceId() );
helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
pubSubSignalingMessagesSize+= helpCall->getByteLength()
);
}
} else if( pingCall->getPingType() == PUBSUB_PING_INTERMEDIATE ){
// one intermediate node timed out. we have to request a new one...
// delete old intermediate entry
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
if( !iit->node.isUnspecified() && oldNode == iit->node ) break;
}
// the timed-out node is not (or no longer) one of our intermediates
if( iit == subspace.intermediateNodes.end() ) return;
// local copy; shadows the oldNode parameter for the rest of this branch
NodeHandle oldNode = iit->node;
// inform Backup
if( !subspace.getBackupNode().isUnspecified() ){
PubSubBackupIntermediateMessage* backupMsg = new PubSubBackupIntermediateMessage("Backup: Intermediate failed");
backupMsg->setSubspaceId( pingCall->getSubspaceId() );
backupMsg->setPos( iit - it->second.intermediateNodes.begin() );
backupMsg->setBitLength( PUBSUB_BACKUPINTERMEDIATE_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
}
bool fixNeeded = false;
// Take over all children until new intermediate is found.
set<NodeHandle>::iterator childIt;
for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
// insert() reports failure when the child is already cached
if( !subspace.cachedChildren.insert( make_pair(*childIt, false)).second ){
fixNeeded = true;
}
// Inform Backup
if( !subspace.getBackupNode().isUnspecified() ){
PubSubBackupSubscriptionMessage* backupMsg = new PubSubBackupSubscriptionMessage("Backup: nodes moved to cache");
backupMsg->setSubspaceId( pingCall->getSubspaceId() );
backupMsg->setChild( *childIt );
backupMsg->setOldParent( oldNodeHandle );
backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
}
}
iit->children.clear();
if( fixNeeded ) subspace.fixTotalChildrenCount();
// inform parent of intermediate node
// slot index of the parent intermediate; negative means there is none
int parentPos = (iit - subspace.intermediateNodes.begin())/maxChildren -1;
if( parentPos >= 0 ){
// NOTE(review): `parent` is not declared in this listing; presumably a
// reference to subspace.intermediateNodes[parentPos] -- confirm.
if( !parent.node.isUnspecified() ){
PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Intermediate left Subspace");
goneMsg->setNode( oldNodeHandle );
goneMsg->setSubspaceId( subspace.getId().getId() );
goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
pubSubSignalingMessagesSize+= goneMsg->getByteLength()
);
sendMessageToUDP( parent.node, goneMsg );
}
}
// Request new Intermediate
// NOTE(review): helpCall is never sent in this listing and the slot's node
// is not reset; those lines were presumably lost during extraction -- confirm.
PubSubHelpCall* helpCall = new PubSubHelpCall("I need an intermediate node");
helpCall->setSubspaceId( pingCall->getSubspaceId() );
helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
pubSubSignalingMessagesSize+= helpCall->getByteLength()
);
}
// FIXME: just for testing
// NOTE(review): as listed, this compares two consecutive results of the same
// getter; a recomputation call between them may have been lost -- confirm.
int iii = subspace.getTotalChildrenCount();
if( iii != subspace.getTotalChildrenCount() ){
opp_error("Huh?");
}
}
void PubSubMMOG::handlePingResponse ( PubSubPingResponse pingResp)
protected

Definition at line 1170 of file PubSubMMOG.cc.

{
// No action is taken on a ping response in this listing.
}
void PubSubMMOG::handleReleaseIntermediate ( PubSubReleaseIntermediateMessage releaseMsg)
protected

Definition at line 1506 of file PubSubMMOG.cc.

{
// Drops the intermediate-node entry for the subspace named in the message;
// erase() by key does nothing if no such entry exists.
PubSubSubspaceId subspaceId(releaseMsg->getSubspaceId(), numSubspaces);
intermediateSubspaces.erase( subspaceId );
}
void PubSubMMOG::handleReplacementMessage ( PubSubReplacementMessage replaceMsg)
protected

Definition at line 1484 of file PubSubMMOG.cc.

{
// Records the new responsible node of a subspace both in our intermediate
// entry for it (if any) and in the first matching subscribed subspace.
PubSubSubspaceId subspaceId(replaceMsg->getSubspaceId(), numSubspaces);
// There's a new responsible node for a subspace
// Replace the old one in the intermediateSubspaces map...
std::map<PubSubSubspaceId, PubSubSubspaceIntermediate>::iterator it;
it = intermediateSubspaces.find( subspaceId );
if( it != intermediateSubspaces.end() ) {
it->second.setResponsibleNode( replaceMsg->getNewResponsibleNode() );
}
// ... and in the subscribed subspaces list
std::list<PubSubSubspace>::iterator iit;
for( iit = subscribedSubspaces.begin(); iit != subscribedSubspaces.end(); ++iit ){
if( iit->getId() == subspaceId ) {
iit->setResponsibleNode( replaceMsg->getNewResponsibleNode() );
// stop at the first match
return;
}
}
}
void PubSubMMOG::handleResponsibleNodeResponse ( PubSubResponsibleNodeResponse subResp)
protected

Definition at line 398 of file PubSubMMOG.cc.

{
// Handles the answer to a responsible-node request: stores the node in the
// matching subscribed subspace and sends it a subscription call. Responses
// for subspaces that are not in subscribedSubspaces are ignored.
int subspaceId = subResp->getSubspaceId();
NodeHandle respNode = subResp->getResponsibleNode();
// linear search for the subscribed subspace with this id
std::list<PubSubSubspace>::iterator it = subscribedSubspaces.begin();
while( it != subscribedSubspaces.end() ) {
if( it->getId().getId() == subspaceId) break;
++it;
}
if( it != subscribedSubspaces.end() ) {
it->setResponsibleNode( respNode );
PubSubSubscriptionCall* subCall = new PubSubSubscriptionCall("JoinSubspace");
subCall->setSubspaceId( subspaceId );
subCall->setBitLength( PUBSUB_SUBSCRIPTIONCALL_L( subCall ));
// NOTE(review): the `);` after the next statement has no opener -- presumably
// the first line(s) of a wrapping macro call were lost during extraction.
pubSubSignalingMessagesSize+= subCall->getByteLength()
);
sendUdpRpcCall( respNode, subCall );
}
}
bool PubSubMMOG::handleRpcCall ( BaseCallMessage * msg)
virtual

Processes Remote-Procedure-Call invocation messages.


This method should be overloaded when the overlay provides RPC functionality.

Returns
true, if rpc has been handled

Reimplemented from BaseRpc.

Definition at line 103 of file PubSubMMOG.cc.

{
// Routes an incoming RPC call to the handler registered for its type.
// NOTE(review): the RPC_DELEGATE lines are presumably enclosed by macro lines
// that open and close the dispatch on `msg`; those are not part of this
// listing -- confirm against PubSubMMOG.cc.
// delegate messages
RPC_DELEGATE( PubSubSubscription, handleSubscriptionCall );
RPC_DELEGATE( PubSubTakeOverSubspace, handleTakeOver );
RPC_DELEGATE( PubSubBackup, handleBackupCall );
RPC_DELEGATE( PubSubIntermediate, handleIntermediateCall );
RPC_DELEGATE( PubSubAdoptChild, handleAdoptChildCall );
RPC_DELEGATE( PubSubPing, handlePingCall );
return RPC_HANDLED;
}
void PubSubMMOG::handleRpcResponse ( BaseResponseMessage * msg,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
virtual

This method is called if an RPC response has been received.

Parameters
msg: The response message.
context: Pointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code.
rpcId: The RPC id.
rtt: The round-trip time of this RPC.

Reimplemented from RpcListener.

Definition at line 119 of file PubSubMMOG.cc.

{
// Dispatches a received RPC response by type: each case first calls the
// type-specific handler and then logs the response with its rpcId and rtt.
// NOTE(review): the `break;` statements imply an enclosing construct that is
// presumably opened and closed by macro lines missing from this listing --
// confirm against PubSubMMOG.cc.
RPC_ON_RESPONSE( PubSubJoin ) {
handleJoinResponse( _PubSubJoinResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubJoin RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubJoinResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubSubscription ) {
handleSubscriptionResponse( _PubSubSubscriptionResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubSubscription RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubSubscriptionResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubResponsibleNode ) {
handleResponsibleNodeResponse( _PubSubResponsibleNodeResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubResponsibleNode RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubResponsibleNodeResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubHelp ) {
handleHelpResponse( _PubSubHelpResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubHelp RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubHelpResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubBackup ) {
handleBackupResponse( _PubSubBackupResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubBackup RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubBackupResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubIntermediate ) {
handleIntermediateResponse( _PubSubIntermediateResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubIntermediate RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubIntermediateResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubAdoptChild ) {
handleAdoptChildResponse( _PubSubAdoptChildResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubAdoptChild RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubAdoptChildResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( PubSubPing ) {
handlePingResponse( _PubSubPingResponse );
EV << "[PubSubMMOG::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a PubSubPing RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PubSubPingResponse << " rtt=" << rtt
<< endl;
break;
}
}
void PubSubMMOG::handleRpcTimeout ( BaseCallMessage * msg,
const TransportAddress & dest,
cPolymorphic *  context,
int  rpcId,
const OverlayKey & destKey 
)
virtual

This method is called if an RPC timeout has been reached.

Parameters
msg: The original RPC message.
dest: The destination node.
context: Pointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code.
rpcId: The RPC id.
destKey: The destination OverlayKey.

Reimplemented from RpcListener.

Definition at line 199 of file PubSubMMOG.cc.

{
// Dispatches a timed-out RPC call by type: each case first calls the
// type-specific timeout handler with the unreachable destination and then
// logs the event. Only backup, ping and subscription calls are covered
// (see the FIXME at the end).
// NOTE(review): the `break;` statements imply an enclosing construct that is
// presumably opened and closed by macro lines missing from this listing --
// confirm against PubSubMMOG.cc.
RPC_ON_CALL( PubSubBackup ) {
handleBackupCallTimeout( _PubSubBackupCall, dest );
EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Backup RPC Call timed out: id=" << rpcId << "\n"
<< " msg=" << *_PubSubBackupCall
<< " oldNode=" << dest
<< endl;
break;
}
RPC_ON_CALL( PubSubPing ) {
handlePingCallTimeout( _PubSubPingCall, dest );
EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Ping RPC Call timed out: id=" << rpcId << "\n"
<< " msg=" << *_PubSubPingCall
<< " oldNode=" << dest
<< endl;
break;
}
RPC_ON_CALL( PubSubSubscription ) {
handleSubscriptionCallTimeout( _PubSubSubscriptionCall, dest );
EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Subscription RPC Call timed out: id=" << rpcId << "\n"
<< " msg=" << *_PubSubSubscriptionCall
<< " oldNode=" << dest
<< endl;
break;
}
// FIXME:
// AdoptCall missing!
// IntermediateCall missing!
// (ResponsibleNodeCall missing)
// (HelpCall missing)
}
void PubSubMMOG::handleSubscriptionBackup ( PubSubBackupSubscriptionMessage backupMsg)
protected

Definition at line 1528 of file PubSubMMOG.cc.

{
// Backup-side mirror of a subscription change. Depending on which of
// oldParent/parent is set in the message, the child is moved to the cache,
// added to the cache, added as direct child, or moved from the cache to an
// intermediate node of the backed-up subspace.
// Note: this function may break the subspace's total children count.
// You have to use fixTotalChildrenCount before using the subspace
// find appropriate subspace
PubSubSubspaceId subspaceId(backupMsg->getSubspaceId(), numSubspaces);
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
it = backupSubspaces.find( subspaceId );
if( it == backupSubspaces.end() ) {
// no backup entry for this subspace: the message is ignored
return;
}
PubSubSubspaceResponsible& subspace = it->second;
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
if( !backupMsg->getOldParent().isUnspecified() ){
// oldParent set -> move child
if( backupMsg->getOldParent() == subspace.getResponsibleNode() ){
// direct child -> cache
subspace.removeChild( backupMsg->getChild() );
subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
} else {
// from intermediate -> cache
// the oldParent test is commented out, so the child is erased from every
// intermediate node's child set
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
// if( !iit->node.isUnspecified() && iit->node == backupMsg->getOldParent() ){
iit->children.erase( backupMsg->getChild() );
// }
}
subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
}
} else if( backupMsg->getParent().isUnspecified() ){
// parent not set -> new child to cache
subspace.cachedChildren.insert(make_pair( backupMsg->getChild(), false) );
} else if( backupMsg->getParent() == subspace.getResponsibleNode() ){
// new direct child
subspace.addChild( backupMsg->getChild() );
} else {
// move child from cache to intermediate
subspace.cachedChildren.erase( backupMsg->getChild() );
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
if( !iit->node.isUnspecified() && iit->node == backupMsg->getParent() ){
iit->children.insert( backupMsg->getChild() );
}
}
// FIXME: check for errors
}
}
void PubSubMMOG::handleSubscriptionCall ( PubSubSubscriptionCall subCall)
protected

Definition at line 442 of file PubSubMMOG.cc.

{
// Handles a subscription request for a subspace. If we are not responsible
// for it, a response flagged as failed is sent. Otherwise the caller is
// added as a direct child if addChild() accepts it, or offered to an
// intermediate node with free slots, or -- if there is none -- a request
// for a new intermediate node is prepared. The backup node, if any, is
// informed about the new subscription.
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
// NOTE(review): `it` is compared below without being assigned, and `resp` /
// `backupMsg` are used without a declaration in this listing; presumably the
// find(...) call and both declarations were lost during extraction --
// confirm against PubSubMMOG.cc.
if( it == responsibleSubspaces.end() ) {
resp = new PubSubSubscriptionResponse("Subscription failed");
resp->setFailed( true );
} else {
resp = new PubSubSubscriptionResponse("Subscription successful");
backupMsg = new PubSubBackupSubscriptionMessage("Backup: new subscription");
backupMsg->setSubspaceId( subCall->getSubspaceId() );
backupMsg->setChild( subCall->getSrcNode() );
if( it->second.addChild( subCall->getSrcNode() )) {
// We have still room for the child
backupMsg->setParent( thisNode );
} else {
// Child has to go to an intermediate node...
if( PubSubSubspaceResponsible::IntermediateNode* iNode = it->second.getNextFreeIntermediate() ){
// find intermediate node with free slots
PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt child");
adoptCall->setChild( subCall->getSrcNode() );
adoptCall->setSubspaceId( subCall->getSubspaceId() );
adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
sendUdpRpcCall( iNode->node, adoptCall );
// NOTE(review): the `);` after the size accounting statements in this
// function has no opener -- presumably the first line(s) of a wrapping macro
// call were lost during extraction.
pubSubSignalingMessagesSize+= adoptCall->getByteLength()
);
iNode->waitingChildren++;
} else {
// no free slots available, create new intermediate node
// FIXME: when getting two subscriptions at once, we're requesting too many intermediates
// NOTE(review): helpCall is never sent in this listing and the child is not
// put into the cache; those lines were presumably lost -- confirm.
PubSubHelpCall* helpCall = new PubSubHelpCall("I need an intermediate node");
helpCall->setSubspaceId( subCall->getSubspaceId() );
helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
pubSubSignalingMessagesSize+= helpCall->getByteLength()
);
}
}
}
resp->setBitLength( PUBSUB_SUBSCRIPTIONRESPONSE_L( resp ));
sendRpcResponse( subCall, resp );
pubSubSignalingMessagesSize+= resp->getByteLength()
);
// failure path: backupMsg was never created, so nothing below applies
if( it == responsibleSubspaces.end() ) return;
// FIXME: just for testing
PubSubSubspaceResponsible& subspace = it->second;
// NOTE(review): as listed, this compares two consecutive results of the same
// getter; a recomputation call between them may have been lost -- confirm.
int iii = subspace.getTotalChildrenCount();
if( iii != subspace.getTotalChildrenCount() ){
opp_error("Huh?");
}
if( !it->second.getBackupNode().isUnspecified() ){
backupMsg->setBitLength( PUBSUB_BACKUPSUBSCRIPTION_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( it->second.getBackupNode(), backupMsg );
} else {
// no backup node to inform: discard the prepared message
delete backupMsg;
}
}
void PubSubMMOG::handleSubscriptionCallTimeout ( PubSubSubscriptionCall *  subscriptionCall,
const TransportAddress oldNode 
)
protected

Definition at line 1458 of file PubSubMMOG.cc.

{
// Reacts to a timed-out subscription call: reports oldNode as failed to the lobby server
// and asks the lobby server for the responsible node of the affected subspace.
// NOTE(review): the dangling ");" lines after the "pubSubSignalingMessagesSize+=" statements
// suggest an enclosing call/macro was lost in this listing -- confirm.
// FIXME: cast oldNode to NodeHandle
// our subscription call timed out. This means the responsible node is dead...
// Inform the lobby server about the failed node
PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
failedNode->setFailedNode( oldNode );
failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
pubSubSignalingMessagesSize+= failedNode->getByteLength()
);
sendMessageToUDP( lobbyServer, failedNode );
// Ask for new responsible node
PubSubResponsibleNodeCall* respCall = new PubSubResponsibleNodeCall("Request Responsible NodeHandle");
// Build the subspace id from the timed-out call to obtain its X/Y coordinates
PubSubSubspaceId subspaceId( subscriptionCall->getSubspaceId(), numSubspaces);
respCall->setSubspacePos( Vector2D(subspaceId.getX(), subspaceId.getY()) );
respCall->setBitLength( PUBSUB_RESPONSIBLENODECALL_L( respCall ));
pubSubSignalingMessagesSize+= respCall->getByteLength()
);
sendUdpRpcCall( lobbyServer, respCall, NULL, 5, 5 ); // FIXME: Make it a parameter...
}
void PubSubMMOG::handleSubscriptionResponse ( PubSubSubscriptionResponse *  subResp)
protected

Definition at line 351 of file PubSubMMOG.cc.

{
// Handles the answer to a subscription call. A failed subscription is currently ignored;
// a successful one sends a CompReadyMessage to the application unless state is already READY.
if( subResp->getFailed() ) {
// TODO: get new resp node...
} else {
if( state != READY ){
CompReadyMessage* readyMsg = new CompReadyMessage("Overlay READY!");
readyMsg->setReady(true);
readyMsg->setComp(getThisCompType());
// Delay delivery by the time remaining until ceil(simTime())
sendDelayed( readyMsg, ceil(simTime()) - simTime(), "appOut" );
}
}
}
void PubSubMMOG::handleTakeOver ( PubSubTakeOverSubspaceCall *  toCall)
protected

Definition at line 519 of file PubSubMMOG.cc.

{
// Handles a request to take over a subspace by answering with a
// PubSubTakeOverSubspaceResponse that echoes the requested subspace position.
// NOTE(review): `region` is constructed but not used in the visible statements; a call
// consuming it may be missing from this listing -- confirm against PubSubMMOG.cc.
PubSubSubspaceId region((int) toCall->getSubspacePos().x, (int) toCall->getSubspacePos().y, numSubspaces);
PubSubTakeOverSubspaceResponse* toResp = new PubSubTakeOverSubspaceResponse("Accept subspace responsibility");
toResp->setSubspacePos( toCall->getSubspacePos() );
toResp->setBitLength( PUBSUB_TAKEOVERSUBSPACERESPONSE_L( toResp ));
// NOTE(review): the dangling ");" below suggests an enclosing call/macro was lost -- confirm.
pubSubSignalingMessagesSize+= toResp->getByteLength()
);
sendRpcResponse( toCall, toResp );
}
void PubSubMMOG::handleTimerEvent ( cMessage *  msg)
virtual

Reimplemented from BaseRpc.

Definition at line 275 of file PubSubMMOG.cc.

{
// Dispatches self-messages: PubSubTimer instances are handled by timer type, and the
// join timer triggers the initial messages to the application.
if( PubSubTimer* timer = dynamic_cast<PubSubTimer*>(msg) ) {
// NOTE(review): the switch below has no visible case labels and only restart/break
// statements; the labels and per-type actions appear to be missing from this listing --
// confirm against PubSubMMOG.cc.
switch( timer->getType() ) {
startTimer( timer );
break;
startTimer( timer );
break;
break;
startTimer( timer );
break;
}
} else if( msg == joinTimer ) {
// send a fake ready message to app to get initial position
// Note: This is not consistent with the paper, where the lobby server
// positions the player. But it is needed for consistency with other MMOG protocols
// (the local `msg` declared here shadows the function parameter of the same name)
CompReadyMessage* msg = new CompReadyMessage("fake READY");
msg->setReady(true);
send( msg, "appOut");
// The join timer is used only once: free it and clear the member pointer
delete joinTimer;
joinTimer = NULL;
// send initial AOI size to the application
// Note: This is not consistent with the paper.
// But it is needed for this node's movement generation within the application layer.
GameAPIResizeAOIMessage* gameMsg = new GameAPIResizeAOIMessage("RESIZE_AOI");
gameMsg->setAOIsize(AOIWidth);
send( gameMsg, "appOut");
}
}
void PubSubMMOG::handleUDPMessage ( BaseOverlayMessage *  msg)
virtual

Processes messages from underlay.

Parameters
msg – Message from UDP

Reimplemented from BaseOverlay.

Definition at line 244 of file PubSubMMOG.cc.

{
// Dispatches an incoming overlay message by its dynamic type (tested in the order below).
// A message matching none of the listed types is neither handled nor deleted here.
// NOTE(review): most branches only delete the message; the corresponding handler calls
// may be missing from this listing -- confirm against PubSubMMOG.cc.
if( PubSubMoveListMessage* moveMsg = dynamic_cast<PubSubMoveListMessage*>(msg) ){
delete moveMsg;
} else if( PubSubMoveMessage* moveMsg = dynamic_cast<PubSubMoveMessage*>(msg) ){
// Not deleted here -- presumably handleMoveMessage() takes ownership; TODO confirm
handleMoveMessage( moveMsg );
} else if( PubSubUnsubscriptionMessage* unsMsg = dynamic_cast<PubSubUnsubscriptionMessage*>(msg) ){
delete unsMsg;
} else if( PubSubNodeLeftMessage* leftMsg = dynamic_cast<PubSubNodeLeftMessage*>(msg) ){
delete leftMsg;
} else if( PubSubReplacementMessage* replaceMsg = dynamic_cast<PubSubReplacementMessage*>(msg) ){
handleReplacementMessage( replaceMsg );
delete replaceMsg;
} else if( PubSubBackupSubscriptionMessage* backupMsg = dynamic_cast<PubSubBackupSubscriptionMessage*>(msg) ){
delete backupMsg;
} else if( PubSubBackupUnsubscribeMessage* backupMsg = dynamic_cast<PubSubBackupUnsubscribeMessage*>(msg) ){
delete backupMsg;
} else if( PubSubBackupIntermediateMessage* backupMsg = dynamic_cast<PubSubBackupIntermediateMessage*>(msg) ){
delete backupMsg;
} else if( PubSubReleaseIntermediateMessage* releaseMsg = dynamic_cast<PubSubReleaseIntermediateMessage*>(msg) ){
delete releaseMsg;
}
}
void PubSubMMOG::handleUnsubscribeBackup ( PubSubBackupUnsubscribeMessage *  backupMsg)
protected

Definition at line 1579 of file PubSubMMOG.cc.

{
// Applies an unsubscription notice to the matching entry in backupSubspaces: removes the
// child and, if the message names an intermediate node, removes that intermediate and
// redistributes its children. Does nothing if the subspace is not in backupSubspaces.
// Note: this function may break the subspace's total children count
// You have to use fixTotalChildrenCount before using the subspace
// find appropriate subspace
PubSubSubspaceId subspaceId(backupMsg->getSubspaceId(), numSubspaces);
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
it = backupSubspaces.find( subspaceId );
if( it == backupSubspaces.end() ) {
return;
}
PubSubSubspaceResponsible& subspace = it->second;
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
set<NodeHandle>::iterator childIt;
// Try the direct children first, then the cached children; only if the child was in
// neither, erase it from every intermediate node's child set
if( !subspace.removeChild(backupMsg->getChild()) && !subspace.cachedChildren.erase( backupMsg->getChild()) ){
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
iit->children.erase( backupMsg->getChild() );
}
}
if( !backupMsg->getIntermediate().isUnspecified() ){
// remove intermediate
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
if( !iit->node.isUnspecified() && iit->node == backupMsg->getIntermediate() ){
for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
// FIXME: not really stable. let the resp node inform us about child moves
// remove children of last intermediate
if( subspace.getNumChildren() + subspace.getNumIntermediates() < maxChildren ){
// we have room for the child
subspace.children.insert( *childIt );
} else {
// Node has to go to some intermediate
// cache it
subspace.cachedChildren.insert( make_pair(*childIt, true) );
}
}
// Erase the matching intermediate and stop: `iit` must not be used after the erase
subspace.intermediateNodes.erase( iit );
break;
}
}
}
}
void PubSubMMOG::handleUnsubscriptionMessage ( PubSubUnsubscriptionMessage *  unsMsg)
protected

Definition at line 422 of file PubSubMMOG.cc.

{
// Unsubscribes the sender of unsMsg from a subspace found in responsibleSubspaces.
// NOTE(review): `it` is compared below without any visible assignment; the lookup in
// responsibleSubspaces appears to be missing from this listing -- confirm against
// PubSubMMOG.cc.
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
if( it != responsibleSubspaces.end() ) {
unsubscribeChild( unsMsg->getSrc(), it->second );
}
}
void PubSubMMOG::initializeOverlay ( int  stage)
virtual

Initializes derived-class-attributes.


Initializes derived-class-attributes, called by BaseOverlay::initialize(). By default this method is called once. If more stages are needed one can overload numInitStages() and add more stages.

Parameters
stage – the init stage

Reimplemented from BaseOverlay.

Definition at line 36 of file PubSubMMOG.cc.

{
// Initializes the overlay in stage MIN_STAGE_OVERLAY only: creates and schedules the join
// and event delivery timers, reads the module parameters, creates the heartbeat and
// child-ping timers (not scheduled here) and registers watches.
// because of IPAddressResolver, we need to wait until interfaces are registered,
// address auto-assignment takes place etc.
if(stage != MIN_STAGE_OVERLAY) return;
// TODO: use BootstrapList instead (but this currently prefers
// nodes from the same partition)
joinTimer = new cMessage("join timer");
// Join at ceil(now + joinDelay)
simtime_t joinTime = ceil(simTime() + (simtime_t) par("joinDelay"));
scheduleAt( joinTime, joinTimer );
movementRate = par("movementRate");
eventDeliveryTimer = new PubSubTimer("event delivery timer");
// First event delivery fires 1/(2*movementRate) after the join time
scheduleAt( joinTime + 1.0/(2*movementRate), eventDeliveryTimer );
numSubspaces = par("numSubspaces");
// Subspace edge length: areaDimension divided by numSubspaces (integer division)
subspaceSize = (int) ( (unsigned int) par("areaDimension") / numSubspaces);
maxChildren = par("maxChildren");
AOIWidth = par("AOIWidth");
maxMoveDelay = par("maxMoveDelay");
parentTimeout = par("parentTimeout");
heartbeatTimer = new PubSubTimer("HeartbeatTimer");
childPingTimer = new PubSubTimer("ChildPingTimer");
allowOldMoveMessages = par("allowOldMoveMessages");
// Register members with the WATCH macros
WATCH( numMoveMessages );
WATCH( moveMessagesSize );
WATCH_LIST( subscribedSubspaces );
WATCH_MAP( responsibleSubspaces );
WATCH_MAP( backupSubspaces );
WATCH_MAP( intermediateSubspaces );
}
void PubSubMMOG::publishEvents ( )
protected

Definition at line 1851 of file PubSubMMOG.cc.

{
// For every subspace in responsibleSubspaces: aggregates the queued move messages into
// one PubSubMoveListMessage and sends a copy to the direct children, to the cached
// children (only for small lists) and to selected intermediate nodes.
// NOTE(review): the dangling ");" lines after the "respMoveListMessagesSize+=" statements
// suggest an enclosing call/macro was lost in this listing -- confirm.
// For all (responsible) subspaces
int numRespSubspaces = responsibleSubspaces.size();
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it ){
PubSubSubspaceResponsible& subspace = it->second;
// Prepare a movement list message aggregating all stored move messages
PubSubMoveListMessage* moveList = new PubSubMoveListMessage("Movement list");
moveList->setTimestamp( simTime() );
moveList->setSubspaceId( subspace.getId().getId() );
// One player/position/age entry per waiting move message
moveList->setPlayerArraySize( subspace.waitingMoveMessages.size() );
moveList->setPositionArraySize( subspace.waitingMoveMessages.size() );
moveList->setPositionAgeArraySize( subspace.waitingMoveMessages.size() );
std::deque<PubSubMoveMessage*>::iterator msgIt;
int pos = 0;
for( msgIt = subspace.waitingMoveMessages.begin(); msgIt != subspace.waitingMoveMessages.end(); ++msgIt ){
moveList->setPlayer( pos, (*msgIt)->getPlayer() );
moveList->setPosition( pos, (*msgIt)->getPosition() );
// Age of the position: time elapsed since the move message was created
moveList->setPositionAge( pos, simTime() - (*msgIt)->getCreationTime() );
pos++;
// The move message has been copied into the list and is released here
cancelAndDelete( *msgIt );
}
subspace.waitingMoveMessages.clear();
moveList->setBitLength( PUBSUB_MOVELIST_L( moveList ));
// Send message to all direct children...
for( set<NodeHandle>::iterator childIt = subspace.children.begin();
childIt != subspace.children.end(); ++childIt )
{
moveListMessagesSize+= moveList->getByteLength();
respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
);
sendMessageToUDP( *childIt, (BaseOverlayMessage*) moveList->dup() );
}
//... all cached children (if messages are not too big) ...
if( moveList->getByteLength() < 1024 ){ // FIXME: magic number. make it a parameter, or dependent on the available bandwidth
for( map<NodeHandle, bool>::iterator childIt = subspace.cachedChildren.begin();
childIt != subspace.cachedChildren.end(); ++childIt )
{
moveListMessagesSize+= moveList->getByteLength();
respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
);
sendMessageToUDP( childIt->first, (BaseOverlayMessage*) moveList->dup() );
// ... but don't send msgs to too many cached children, as this would exhaust our bandwidth
}
}
// ... all direct intermediates and intermediates with broken parent
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit )
{
int intermediatePos = iit - subspace.intermediateNodes.begin();
// Skip entries at position >= maxChildren whose entry at index
// intermediatePos/maxChildren - 1 (the parent, going by the comment above) has a specified node
if( intermediatePos >= maxChildren &&
!subspace.intermediateNodes[intermediatePos/maxChildren -1].node.isUnspecified() ) continue;
if( !iit->node.isUnspecified() ) {
moveListMessagesSize+= moveList->getByteLength();
respMoveListMessagesSize+= (int)((double) moveList->getByteLength() / numRespSubspaces)
);
sendMessageToUDP( iit->node, (BaseOverlayMessage*) moveList->dup() );
}
}
// Only duplicates were sent; release the template message
delete moveList;
}
}
void PubSubMMOG::receiveChangeNotification ( int  category,
const cPolymorphic *  details 
)
virtual

callback-method for events at the NotificationBoard

Parameters
category – ... TODO ...
details – ... TODO ...

Reimplemented from BaseOverlay.

Definition at line 535 of file PubSubMMOG.cc.

{
// Notification callback; as listed, it performs no action.
// NOTE(review): the branch for NF_OVERLAY_NODE_GRACEFUL_LEAVE in state READY is empty in
// this listing -- confirm against PubSubMMOG.cc whether its body was lost or is intentionally empty.
if(category == NF_OVERLAY_NODE_GRACEFUL_LEAVE && state == READY) {
}
}
void PubSubMMOG::sendHearbeatToChildren ( )
protected

Definition at line 1227 of file PubSubMMOG.cc.

{
// Sends a heartbeat ping call to the intermediate nodes and the backup node of every
// subspace in responsibleSubspaces. Players receive nothing (the toPlayers argument is NULL).
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
// Template message for the backup node
PubSubPingCall* bHeartbeat = new PubSubPingCall("Heartbeat Backup");
bHeartbeat->setSubspaceId( it->second.getId().getId() );
bHeartbeat->setBitLength( PUBSUB_PINGCALL_L( bHeartbeat ));
// Template message for the intermediate nodes
PubSubPingCall* iHeartbeat = new PubSubPingCall("Heartbeat Intermediate");
iHeartbeat->setSubspaceId( it->second.getId().getId() );
iHeartbeat->setBitLength( PUBSUB_PINGCALL_L( iHeartbeat ));
sendMessageToChildren( it->second, iHeartbeat, bHeartbeat, NULL);
// sendMessageToChildren() sends duplicates only, so the templates are freed here
delete bHeartbeat;
delete iHeartbeat;
}
}
void PubSubMMOG::sendMessageToChildren ( PubSubSubspaceResponsible subspace,
BaseOverlayMessage *  toIntermediates,
BaseOverlayMessage *  toBackup,
BaseOverlayMessage *  toPlayers 
)
protected

Definition at line 1749 of file PubSubMMOG.cc.

{
// Distributes up to three template messages within a subspace: toPlayers to all direct,
// cached and intermediate-owned children, toIntermediates to all specified intermediate
// nodes, and toBackup to the backup node. A NULL template skips that group. Each
// recipient gets a dup() of the template; the templates themselves are neither sent nor
// deleted here. Templates that are BaseCallMessages go out via sendUdpRpcCall(), all
// others via sendMessageToUDP().
// NOTE(review): the dangling ");" lines after the "pubSubSignalingMessagesSize+=" statements
// suggest an enclosing call/macro was lost in this listing -- confirm.
BaseCallMessage* intermediateCall = dynamic_cast<BaseCallMessage*>(toIntermediates);
BaseCallMessage* backupCall = dynamic_cast<BaseCallMessage*>(toBackup);
BaseCallMessage* playerCall = dynamic_cast<BaseCallMessage*>(toPlayers);
std::set<NodeHandle>::iterator childIt;
if( toPlayers ) {
// Inform all children ...
for( childIt = subspace.children.begin(); childIt != subspace.children.end(); ++childIt ) {
if( playerCall ){
pubSubSignalingMessagesSize+= playerCall->getByteLength()
);
sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->dup()) );
} else {
pubSubSignalingMessagesSize+= toPlayers->getByteLength()
);
sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
}
}
// ... and all cached children ...
std::map<NodeHandle, bool>::iterator cacheChildIt;
for( cacheChildIt = subspace.cachedChildren.begin(); cacheChildIt != subspace.cachedChildren.end(); ++cacheChildIt ) {
if( playerCall ){
pubSubSignalingMessagesSize+= playerCall->getByteLength()
);
sendUdpRpcCall( cacheChildIt->first, static_cast<BaseCallMessage*>(playerCall->dup()) );
} else {
pubSubSignalingMessagesSize+= toPlayers->getByteLength()
);
sendMessageToUDP( cacheChildIt->first, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
}
}
}
deque<PubSubSubspaceResponsible::IntermediateNode>::iterator iit;
// ... all intermediate nodes ...
for( iit = subspace.intermediateNodes.begin(); iit != subspace.intermediateNodes.end(); ++iit ){
// Entries with an unspecified node get no intermediate message
if( toIntermediates && !iit->node.isUnspecified() ){
if( intermediateCall ){
pubSubSignalingMessagesSize+= intermediateCall->getByteLength()
);
sendUdpRpcCall( iit->node, static_cast<BaseCallMessage*>(intermediateCall->dup()) );
} else {
pubSubSignalingMessagesSize+= toIntermediates->getByteLength()
);
sendMessageToUDP( iit->node, static_cast<BaseOverlayMessage*>(toIntermediates->dup()) );
}
}
if( toPlayers ) {
// .. and all intermediate node's children ...
for( childIt = iit->children.begin(); childIt != iit->children.end(); ++childIt ){
if( playerCall ){
pubSubSignalingMessagesSize+= playerCall->getByteLength()
);
sendUdpRpcCall( *childIt, static_cast<BaseCallMessage*>(playerCall->dup()) );
} else {
pubSubSignalingMessagesSize+= toPlayers->getByteLength()
);
sendMessageToUDP( *childIt, static_cast<BaseOverlayMessage*>(toPlayers->dup()) );
}
}
}
}
// ... and the backup node
if( toBackup && !subspace.getBackupNode().isUnspecified() ) {
if( backupCall ){
pubSubSignalingMessagesSize+= backupCall->getByteLength()
);
sendUdpRpcCall( subspace.getBackupNode(), static_cast<BaseCallMessage*>(backupCall->dup()) );
} else {
pubSubSignalingMessagesSize+= toBackup->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), static_cast<BaseOverlayMessage*>(toBackup->dup()) );
}
}
}
void PubSubMMOG::sendPingToChildren ( )
protected

Definition at line 1247 of file PubSubMMOG.cc.

{
// Sends a ping call to the players of every subspace in responsibleSubspaces.
// Intermediate and backup nodes receive nothing (those arguments are NULL).
std::map<PubSubSubspaceId, PubSubSubspaceResponsible>::iterator it;
for( it = responsibleSubspaces.begin(); it != responsibleSubspaces.end(); ++it) {
PubSubPingCall* heartbeat = new PubSubPingCall("Ping");
heartbeat->setSubspaceId( it->second.getId().getId() );
heartbeat->setBitLength( PUBSUB_PINGCALL_L( heartbeat ));
sendMessageToChildren( it->second, NULL, NULL, heartbeat );
// sendMessageToChildren() sends duplicates only, so the template is freed here
delete heartbeat;
}
}
void PubSubMMOG::setBootstrapedIcon ( )
protected

Definition at line 1927 of file PubSubMMOG.cc.

{
// Colors the display-string icons by overlay state when running under the GUI:
// green for READY, yellow for JOIN, red for any other state. The "i2" tag is set on the
// grandparent module and the "i" tag on this module.
if(ev.isGUI()) {
if(state == READY) {
getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "green");
getDisplayString().setTagArg("i", 1, "green");
}
else if(state == JOIN) {
getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "yellow");
getDisplayString().setTagArg("i", 1, "yellow");
}
else {
getParentModule()->getParentModule()->getDisplayString().setTagArg("i2", 1, "red");
getDisplayString().setTagArg("i", 1, "red");
}
}
}
void PubSubMMOG::startTimer ( PubSubTimer *  timer)
protected

Definition at line 1945 of file PubSubMMOG.cc.

{
// (Re)schedules the given timer: logs a warning and returns for a NULL timer, cancels
// the timer if it is already scheduled, then schedules it after a duration chosen by
// timer type.
if( !timer ) {
EV << "[PubSubMMOG::startTimer() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " WARNING! Trying to start NULL timer @ " << thisNode << "\n"
<< endl;
return;
}
if( timer->isScheduled() ) {
cancelEvent( timer );
}
// Default duration of 0 is used if the switch below does not set one
simtime_t duration = 0;
// NOTE(review): the switch below has no visible case labels, so which timer type maps to
// which duration cannot be read from this listing -- confirm against PubSubMMOG.cc.
switch( timer->getType() ) {
duration = parentTimeout/2;
break;
duration = parentTimeout*10; // FIXME: make it a parameter
break;
duration = parentTimeout;
break;
duration = 1.0/movementRate;
break;
}
scheduleAt(simTime() + duration, timer );
}
void PubSubMMOG::takeOverNewSubspace ( PubSubSubspaceId  subspaceId)
protected

Definition at line 1174 of file PubSubMMOG.cc.

{
// Creates a fresh PubSubSubspaceResponsible for subspaceId and takes it over,
// passing isNew = true to takeOverSubspace().
// create a new subspace
PubSubSubspaceResponsible subspace( subspaceId );
takeOverSubspace( subspace, true );
}
void PubSubMMOG::takeOverSubspace ( PubSubSubspaceResponsible subspaceId,
bool  isNew = false 
)
protected

Definition at line 1181 of file PubSubMMOG.cc.

{
// Makes this node responsible for the given subspace: stores it in responsibleSubspaces,
// prepares a request for a backup node and, unless isNew is set, announces the
// replacement to the subspace's children and reports the old responsible node as failed.
// NOTE(review): the body uses `subspace`, while the signature listed above names the
// parameter `subspaceId` and a local `subspaceId` is declared below; the parameter name
// in this listing looks wrong -- confirm against PubSubMMOG.cc.
// NOTE(review): the dangling ");" lines after the "pubSubSignalingMessagesSize+=" statements
// suggest an enclosing call/macro was lost in this listing -- confirm.
const PubSubSubspaceId& subspaceId = subspace.getId();
int intId = subspaceId.getId();
subspace.fixTotalChildrenCount();
// Remember the previous responsible node before overwriting it
NodeHandle oldNode = subspace.getResponsibleNode();
// insert subspace into responsible list
subspace.setResponsibleNode( thisNode );
responsibleSubspaces.insert( make_pair(subspaceId, subspace) );
// request backup
// NOTE(review): no statement sending helpCall is visible in this listing -- confirm.
PubSubHelpCall* helpCall = new PubSubHelpCall("I need a backup node");
helpCall->setHelpType( PUBSUB_BACKUP );
helpCall->setSubspaceId( intId );
helpCall->setBitLength( PUBSUB_HELPCALL_L( helpCall ));
pubSubSignalingMessagesSize+= helpCall->getByteLength()
);
if( !isNew ) {
PubSubReplacementMessage* repMsg = new PubSubReplacementMessage("I replaced the responsible node");
repMsg->setSubspaceId( intId );
repMsg->setBitLength( PUBSUB_REPLACEMENT_L( repMsg ));
// Inform children and lobbyserver about takeover
// NOTE(review): sendMessageToChildren() sends duplicates only, and no delete of repMsg
// is visible here -- confirm.
sendMessageToChildren( subspace, repMsg, NULL, repMsg );
// inform lobby server about the failed node
PubSubFailedNodeMessage* failedNode = new PubSubFailedNodeMessage("Node failed");
failedNode->setFailedNode( oldNode );
failedNode->setBitLength( PUBSUB_FAILEDNODE_L( failedNode ));
pubSubSignalingMessagesSize+= failedNode->getByteLength()
);
sendMessageToUDP( lobbyServer, failedNode );
}
}
void PubSubMMOG::unsubscribeChild ( const NodeHandle node,
PubSubSubspaceResponsible subspace 
)
protected

Definition at line 1623 of file PubSubMMOG.cc.

{
// Removes `node` from `subspace`: notifies the intermediate node that handled it (if any),
// releases the last intermediate node when the subspace has too many free slots
// (re-homing that intermediate's children), and finally informs the backup node.
// NOTE(review): `iNode`, `liNode`, `parent` and `newINode` are used below without visible
// declarations or assignments; lines appear to be missing from this listing -- confirm
// against PubSubMMOG.cc.
// NOTE(review): the dangling ");" lines after the "pubSubSignalingMessagesSize+=" statements
// suggest an enclosing call/macro was lost in this listing -- confirm.
PubSubBackupUnsubscribeMessage* backupMsg = new PubSubBackupUnsubscribeMessage("Backup: node left subspace");
backupMsg->setChild( node );
backupMsg->setSubspaceId( subspace.getId().getId() );
if( iNode && !iNode->node.isUnspecified() ) {
// Node is handled by an intermediate node, inform him
PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Node left Subspace");
goneMsg->setNode( node );
goneMsg->setSubspaceId( subspace.getId().getId() );
goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
pubSubSignalingMessagesSize+= goneMsg->getByteLength()
);
sendMessageToUDP( iNode->node, goneMsg );
}
if ( subspace.getTotalChildrenCount() < ( maxChildren - 1) * subspace.getNumIntermediates()){// FIXME: parameter when to start cleanup?
// Too many "free" slots, remove one intermediate node
if( !liNode.node.isUnspecified() ){
// Inform node + lobby about release from intermediate node status
PubSubReleaseIntermediateMessage* releaseMsg = new PubSubReleaseIntermediateMessage("I don't need you anymore as intermediate");
releaseMsg->setSubspaceId( subspace.getId().getId() );
releaseMsg->setBitLength( PUBSUB_RELEASEINTERMEDIATE_L( releaseMsg ));
pubSubSignalingMessagesSize+= releaseMsg->getByteLength()
);
sendMessageToUDP( liNode.node, releaseMsg );
// NOTE(review): no statement sending helpRMsg is visible in this listing -- confirm.
PubSubHelpReleaseMessage* helpRMsg = new PubSubHelpReleaseMessage("node is not my intermediate anymore");
helpRMsg->setSubspaceId( subspace.getId().getId() );
helpRMsg->setNode( liNode.node );
helpRMsg->setBitLength( PUBSUB_HELPRELEASE_L( helpRMsg ));
pubSubSignalingMessagesSize+= helpRMsg->getByteLength()
);
// inform parent of intermediate node
// Parent index of the last intermediate: (size-1)/maxChildren - 1; negative means no
// parent intermediate
int parentPos = (subspace.intermediateNodes.size()-1)/maxChildren -1;
if( parentPos >= 0 ){
if( !parent.node.isUnspecified() ){
PubSubNodeLeftMessage* goneMsg = new PubSubNodeLeftMessage("Intermediate left Subspace");
goneMsg->setNode( liNode.node );
goneMsg->setSubspaceId( subspace.getId().getId() );
goneMsg->setBitLength( PUBSUB_NODELEFT_L( goneMsg ));
pubSubSignalingMessagesSize+= goneMsg->getByteLength()
);
sendMessageToUDP( parent.node, goneMsg );
}
}
}
// Set when an insert finds the child already present; triggers fixTotalChildrenCount() below
bool fixNeeded = false;
set<NodeHandle>::iterator childIt;
for( childIt = liNode.children.begin(); childIt != liNode.children.end(); ++childIt ){
// remove children of last intermediate
if( subspace.getNumChildren() + subspace.getNumIntermediates() < maxChildren ){
// we have room for the child
if( !subspace.children.insert( *childIt ).second ) fixNeeded = true;
//FIXME: send backup new->toMe
} else {
// Node has to go to some intermediate
// find intermediate with free capacities
newINode = subspace.getNextFreeIntermediate();
if( newINode && newINode->node != liNode.node ){
// cache it
if( !subspace.cachedChildren.insert( make_pair(*childIt, true) ).second ) fixNeeded = true;
//FIXME: send backup new->toCache
++(newINode->waitingChildren);
// let him adopt the child
PubSubAdoptChildCall* adoptCall = new PubSubAdoptChildCall("Adopt Node");
adoptCall->setSubspaceId( subspace.getId().getId() );
adoptCall->setChild( *childIt );
adoptCall->setBitLength( PUBSUB_ADOPTCHILDCALL_L( adoptCall ));
pubSubSignalingMessagesSize+= adoptCall->getByteLength()
);
sendUdpRpcCall( newINode->node, adoptCall );
} else {
// no intermediate found
// just move child to cache and wait for a new one
if( !subspace.cachedChildren.insert( make_pair(*childIt, false) ).second ) fixNeeded = true;
}
}
}
// delete node from subspace's intermediate node list
subspace.intermediateNodes.pop_back();
// inform backup about deleted intermediate
// NOTE(review): liNode is read after the pop_back() above; if it refers to the removed
// back element this access would be invalid -- confirm how liNode is declared.
backupMsg->setIntermediate( liNode.node );
if( fixNeeded ) subspace.fixTotalChildrenCount();
}
// FIXME: just for testing
// NOTE(review): as listed, this compares two consecutive getTotalChildrenCount() results;
// a call between them may be missing from this listing -- confirm.
int iii = subspace.getTotalChildrenCount();
if( iii != subspace.getTotalChildrenCount() ){
opp_error("Huh?");
}
// Forward the notification if a backup node is set, otherwise discard it
if( !subspace.getBackupNode().isUnspecified() ){
backupMsg->setBitLength( PUBSUB_BACKUPUNSUBSCRIBE_L( backupMsg ));
pubSubSignalingMessagesSize+= backupMsg->getByteLength()
);
sendMessageToUDP( subspace.getBackupNode(), backupMsg );
} else {
delete backupMsg;
}
}

Member Data Documentation

bool PubSubMMOG::allowOldMoveMessages
protected

Definition at line 111 of file PubSubMMOG.h.

int PubSubMMOG::AOIWidth
protected

Definition at line 105 of file PubSubMMOG.h.

std::map<PubSubSubspaceId, PubSubSubspaceResponsible> PubSubMMOG::backupSubspaces
protected

Definition at line 101 of file PubSubMMOG.h.

PubSubTimer* PubSubMMOG::childPingTimer
protected

Definition at line 117 of file PubSubMMOG.h.

unsigned int PubSubMMOG::currentRegionX
protected

Definition at line 113 of file PubSubMMOG.h.

unsigned int PubSubMMOG::currentRegionY
protected

Definition at line 113 of file PubSubMMOG.h.

PubSubTimer* PubSubMMOG::eventDeliveryTimer
protected

Definition at line 118 of file PubSubMMOG.h.

PubSubTimer* PubSubMMOG::heartbeatTimer
protected

Definition at line 116 of file PubSubMMOG.h.

std::map<PubSubSubspaceId, PubSubSubspaceIntermediate> PubSubMMOG::intermediateSubspaces
protected

Definition at line 102 of file PubSubMMOG.h.

cMessage* PubSubMMOG::joinTimer
protected

Definition at line 119 of file PubSubMMOG.h.

TransportAddress PubSubMMOG::lobbyServer
protected

Definition at line 120 of file PubSubMMOG.h.

int PubSubMMOG::lostMovementLists
protected

Definition at line 132 of file PubSubMMOG.h.

int PubSubMMOG::maxChildren
protected

Definition at line 109 of file PubSubMMOG.h.

int PubSubMMOG::maxMoveDelay
protected

Definition at line 115 of file PubSubMMOG.h.

int PubSubMMOG::moveListMessagesSize
protected

Definition at line 130 of file PubSubMMOG.h.

int PubSubMMOG::movementRate
protected

Definition at line 114 of file PubSubMMOG.h.

int PubSubMMOG::moveMessagesSize
protected

Definition at line 128 of file PubSubMMOG.h.

int PubSubMMOG::numEventsCorrectTimeslot
protected

Definition at line 124 of file PubSubMMOG.h.

int PubSubMMOG::numEventsWrongTimeslot
protected

Definition at line 123 of file PubSubMMOG.h.

int PubSubMMOG::numMoveListMessages
protected

Definition at line 129 of file PubSubMMOG.h.

int PubSubMMOG::numMoveMessages
protected

Definition at line 127 of file PubSubMMOG.h.

int PubSubMMOG::numPubSubSignalingMessages
protected

Definition at line 125 of file PubSubMMOG.h.

int PubSubMMOG::numSubspaces
protected

Definition at line 107 of file PubSubMMOG.h.

int PubSubMMOG::parentTimeout
protected

Definition at line 108 of file PubSubMMOG.h.

int PubSubMMOG::pubSubSignalingMessagesSize
protected

Definition at line 126 of file PubSubMMOG.h.

int PubSubMMOG::receivedMovementLists
protected

Definition at line 133 of file PubSubMMOG.h.

int PubSubMMOG::respMoveListMessagesSize
protected

Definition at line 131 of file PubSubMMOG.h.

std::map<PubSubSubspaceId, PubSubSubspaceResponsible> PubSubMMOG::responsibleSubspaces
protected

Definition at line 100 of file PubSubMMOG.h.

std::list<PubSubSubspace> PubSubMMOG::subscribedSubspaces
protected

Definition at line 99 of file PubSubMMOG.h.

int PubSubMMOG::subspaceSize
protected

Definition at line 104 of file PubSubMMOG.h.


The documentation for this class was generated from the following files: