OverSim
PubSubLobby Class Reference

#include <PubSubLobby.h>

Inheritance diagram for PubSubLobby:
BaseOverlay BaseRpc BaseTcpSupport TopologyVis RpcListener

Classes

class  ChildEntry

Public Member Functions

virtual ~PubSubLobby ()
virtual void initializeOverlay (int stage)
 Initializes derived-class-attributes.
virtual void finishOverlay ()
 collects statistical data in derived class
virtual void handleUDPMessage (BaseOverlayMessage *msg)
 Processes messages from underlay.
virtual void handleTimerEvent (cMessage *msg)
virtual bool handleRpcCall (BaseCallMessage *msg)
 Processes Remote-Procedure-Call invocation messages.
virtual void handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 This method is called if an RPC timeout has been reached.
- Public Member Functions inherited from BaseOverlay
 BaseOverlay ()
virtual ~BaseOverlay ()
 Virtual destructor.
States getState ()
bool isMalicious ()
 Returns true, if node is malicious.
bool isInSimpleMultiOverlayHost ()
 Returns true if overlay is one in an array, inside a SimpleMultiOverlayHost.
const simtime_t & getCreationTime ()
void join (const OverlayKey &nodeID=OverlayKey::UNSPECIFIED_KEY)
 Join the overlay with a given nodeID.
virtual NodeVector * local_lookup (const OverlayKey &key, int num, bool safe)
 finds nodes closest to the given OverlayKey
virtual NodeVector * neighborSet (int num)
virtual bool isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err)
 Query if a node is among the siblings for a given key.
virtual int getMaxNumSiblings ()
 Query the maximum number of siblings (nodes close to a key) that are maintained by this overlay protocol.
virtual int getMaxNumRedundantNodes ()
 Query the maximum number of redundant next hop nodes that are returned by findNode().
void sendMessageToUDP (const TransportAddress &dest, cPacket *msg, simtime_t delay=SIMTIME_ZERO)
 Sends message to underlay.
void sendToKey (const OverlayKey &key, BaseOverlayMessage *message, int numSiblings=1, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Sends a message to an overlay node, with the generic routing algorithm.
virtual OverlayKey distance (const OverlayKey &x, const OverlayKey &y, bool useAlternative=false) const
 This method should implement the distance between two keys.
void registerComp (CompType compType, cModule *module)
cModule * getCompModule (CompType compType)
cGate * getCompRpcGate (CompType compType)
void sendMessageToAllComp (cMessage *msg, CompType srcComp)
bool providesKbr ()
virtual uint8_t getBitsPerDigit ()
bool getMeasureAuthBlock ()
BootstrapList & getBootstrapList () const
virtual OverlayKey estimateMeanDistance ()
 returns mean distance between OverlayKeys in the network
virtual uint32_t estimateOverlaySize ()
 estimates the current number of nodes online
- Public Member Functions inherited from BaseRpc
 BaseRpc ()
const NodeHandle & getThisNode ()
 Returns the NodeHandle of this node.
simtime_t getUdpTimeout ()
- Public Member Functions inherited from RpcListener
virtual ~RpcListener ()
 destructor
- Public Member Functions inherited from BaseTcpSupport
virtual void socketDataArrived (int connId, void *yourPtr, cPacket *msg, bool urgent)
virtual void socketEstablished (int connId, void *yourPtr)
virtual void socketPeerClosed (int connId, void *yourPtr)
virtual void socketFailure (int connId, void *yourPtr, int code)
virtual void socketStatusArrived (int connId, void *yourPtr, TCPStatusInfo *status)
- Public Member Functions inherited from TopologyVis
 TopologyVis ()
void showOverlayNeighborArrow (const NodeHandle &neighbor, bool flush=true, const char *displayString=NULL)
 Draws an arrow from this node to neighbor.
void deleteOverlayNeighborArrow (const NodeHandle &neighbor)
 Removes an arrow from this node to neighbor.

Protected Types

typedef std::map< TransportAddress, ChildEntry > PlayerMap
typedef std::multimap< int, ChildEntry *, std::greater< int > > PlayerRessourceMap
- Protected Types inherited from BaseOverlay
typedef UNORDERED_SET< AbstractLookup *, lookupHashFcn, lookupHashFcn > LookupSet

Protected Member Functions

virtual void handleJoin (PubSubJoinCall *joinMsg)
virtual void handleHelpCall (PubSubHelpCall *helpMsg)
virtual void handleRespCall (PubSubResponsibleNodeCall *respCall)
virtual void handleTakeOverResponse (PubSubTakeOverSubspaceResponse *takeOverResp)
virtual void handleTakeOverTimeout (PubSubTakeOverSubspaceCall *takeOverCall, const TransportAddress &oldNode)
void handleHelpReleaseMessage (PubSubHelpReleaseMessage *helpRMsg)
void replaceResponsibleNode (int subspaceId, NodeHandle respNode)
void replaceResponsibleNode (PubSubSubspaceId subspaceId, NodeHandle respNode)
void failedNode (const TransportAddress &node)
- Protected Member Functions inherited from BaseOverlay
int numInitStages () const
 Sets init stage.
void bindToPort (int port)
 Tells UDP we want to get all packets arriving on the given port.
virtual void route (const OverlayKey &key, CompType destComp, CompType srcComp, cPacket *msg, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Routes message through overlay.
void callDeliver (BaseOverlayMessage *msg, const OverlayKey &destKey)
 Calls deliver function in application.
void callForward (const OverlayKey &key, BaseRouteMessage *msg, const NodeHandle &nextHopNode)
 Calls forward function in application.
void callUpdate (const NodeHandle &node, bool joined)
 Informs application about state changes of nodes or newly joined nodes.
void handleMessage (cMessage *msg)
 Checks for message type and calls corresponding method.
void handleBaseOverlayMessage (BaseOverlayMessage *msg, const OverlayKey &destKey=OverlayKey::UNSPECIFIED_KEY)
 Handles a BaseOverlayMessage

virtual void handleAppMessage (cMessage *msg)
 Processes "timer" self-messages.
virtual void receiveChangeNotification (int category, const cPolymorphic *details)
 callback-method for events at the NotificationBoard
virtual void handleTransportAddressChangedNotification ()
 This method is called when the node has a new TransportAddress (IP address) because it changed its access network.
virtual void handleNodeLeaveNotification ()
 This method is called **.gracefulLeaveDelay seconds before the node is killed.
virtual void handleNodeGracefulLeaveNotification ()
 This method is called **.gracefulLeaveDelay seconds before the node is killed, if this node is among the gracefulLeaveProbability nodes.
virtual void recordOverlaySentStats (BaseOverlayMessage *msg)
 Collect overlay specific sent messages statistics.
void setOverlayReady (bool ready)
 Sets the overlay ready icon and register/deregisters the node at the GlobalNodeList.
virtual AbstractLookup * createLookup (RoutingType routingType=DEFAULT_ROUTING, const BaseOverlayMessage *msg=NULL, const cPacket *findNodeExt=NULL, bool appLookup=false)
 Creates an abstract iterative lookup instance.
virtual void removeLookup (AbstractLookup *lookup)
 Removes the abstract lookup instance.
virtual NodeVector * findNode (const OverlayKey &key, int numRedundantNodes, int numSiblings, BaseOverlayMessage *msg=NULL)
 Implements the find node call.
virtual void joinOverlay ()
 Join the overlay with a given nodeID in thisNode.key.
virtual void joinForeignPartition (const NodeHandle &node)
 Join another overlay partition with the given node as bootstrap node.
virtual bool handleFailedNode (const TransportAddress &failed)
 Handles a failed node.
virtual void lookupRpc (LookupCall *call)
virtual void nextHopRpc (NextHopCall *call)
void countFindNodeCall (const FindNodeCall *call)
void countFailedNodeCall (const FailedNodeCall *call)
bool internalHandleRpcCall (BaseCallMessage *msg)
 Handles internal rpc requests.
void internalHandleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 Handles rpc responses internal in base classes

void internalHandleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 Handles rpc timeouts internal in base classes

void internalSendRouteRpc (BaseRpcMessage *message, const OverlayKey &destKey, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType)
CompType getThisCompType ()
 Return the component type of this module.
- Protected Member Functions inherited from BaseRpc
void initRpcs ()
 Initializes Remote-Procedure state.
void finishRpcs ()
 Deinitializes Remote-Procedure state.
virtual void internalHandleRpcMessage (BaseRpcMessage *msg)
 Handles incoming rpc messages and delegates them to the corresponding listeners or handlers.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message using the overlay's UDP port
This replaces ROUTE_DIRECT calls!
uint32_t sendUdpRpcCall (const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message to the underlay

uint32_t sendInternalRpcCall (CompType destComp, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends an internal Remote-Procedure-Call between two tiers

void cancelRpcMessage (uint32_t nonce)
 Cancels a Remote-Procedure-Call.
void cancelAllRpcs ()
 Cancels all RPCs.
void sendRpcResponse (TransportType transportType, CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message and deletes call message.
void sendRpcResponse (BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message to UDP and deletes call message.
int pingNode (const TransportAddress &dest, simtime_t timeout=-1, int retries=0, cPolymorphic *context=NULL, const char *caption="PING", RpcListener *rpcListener=NULL, int rpcId=-1, TransportType transportType=INVALID_TRANSPORT)
 ping a node by its TransportAddress
virtual void pingResponse (PingResponse *pingResponse, cPolymorphic *context, int rpcId, simtime_t rtt)
virtual void pingTimeout (PingCall *pingCall, const TransportAddress &dest, cPolymorphic *context, int rpcId)
bool internalHandleMessage (cMessage *msg)
- Protected Member Functions inherited from RpcListener
virtual void handleRpcResponse (BaseResponseMessage *msg, const RpcState &rpcState, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (const RpcState &rpcState)
 This method is called if an RPC timeout has been reached.
- Protected Member Functions inherited from BaseTcpSupport
void handleTCPMessage (cMessage *msg)
 Member function to handle incoming TCP messages.
void bindAndListenTcp (int port)
 Member function to bind service to the specified port and listen afterwards.
bool isAlreadyConnected (TransportAddress address)
 Member function to check if the service is already connected.
void establishTcpConnection (TransportAddress address)
 Member function to establish a connection to the specified node.
void sendTcpData (cPacket *msg, TransportAddress address)
 Member function to send TCP data to the specified node.
virtual void handleConnectionEvent (EvCode code, TransportAddress address)
 Member function to handle passive connection events.
virtual void handleDataReceived (TransportAddress address, cPacket *msg, bool urgent)
 Member function to handle incoming data.
virtual void handleIncomingConnection (TransportAddress address)
 Member function to handle newly opened connections.
void closeTcpConnection (TransportAddress address)
 Member function to close an established connection.
void setTcpOut (cGate *gate)
 Member function to set local gate towards the TCP module during init phase.
cGate * getTcpOut ()
 Member function to get local gate towards the TCP module.
- Protected Member Functions inherited from TopologyVis
void initVis (cModule *terminal)

Protected Attributes

int subspaceSize
int numSubspaces
std::vector< std::vector< PubSubSubspaceLobby > > subspaces
std::list< PubSubHelpCall * > waitingForHelp
PlayerMap playerMap
PlayerRessourceMap playerRessourceMap
int numPubSubSignalingMessages
int pubSubSignalingMessagesSize
- Protected Attributes inherited from BaseOverlay
int numAppDataForwarded
 number of forwarded app data packets
int bytesAppDataForwarded
 number of forwarded app data bytes at out-gate
int numAppLookupForwarded
 number of forwarded app lookup packets
int bytesAppLookupForwarded
 number of forwarded app lookup bytes at out-gate
int numMaintenanceForwarded
 number of forwarded maintenance packets
int bytesMaintenanceForwarded
 number of forwarded maintenance bytes at out-gate
int numFindNodeSent
int bytesFindNodeSent
int numFindNodeResponseSent
int bytesFindNodeResponseSent
int numFailedNodeSent
int bytesFailedNodeSent
int numFailedNodeResponseSent
int bytesFailedNodeResponseSent
std::vector< HopDelayRecord * > singleHopDelays
simtime_t creationTime
 simtime when the node has been created
GlobalNodeList * globalNodeList
 pointer to GlobalNodeList in this node
NotificationBoard * notificationBoard
 pointer to NotificationBoard in this node
UnderlayConfigurator * underlayConfigurator
 pointer to UnderlayConfigurator in this node
BootstrapList * bootstrapList
 pointer to the BootstrapList module
GlobalParameters * globalParameters
 pointer to the GlobalParameters module
uint32_t overlayId
 identifies the overlay this node belongs to (used for multiple overlays)
bool debugOutput
 debug output ?
RoutingType defaultRoutingType
bool useCommonAPIforward
 forward messages to applications?
bool collectPerHopDelay
 collect delay for single hops
bool routeMsgAcks
 send ACK when receiving route message
uint32_t recNumRedundantNodes
 numRedundantNodes for recursive routing
bool recordRoute
 record visited hops on route
bool drawOverlayTopology
bool rejoinOnFailure
bool sendRpcResponseToLastHop
 needed by KBR protocols for NAT support
bool dropFindNodeAttack
 if node is malicious, it tries a findNode attack
bool isSiblingAttack
 if node is malicious, it tries a isSibling attack
bool invalidNodesAttack
 if node is malicious, it tries a invalidNode attack
bool dropRouteMessageAttack
 if node is malicious, it drops all received BaseRouteMessages
int localPort
 used UDP-port
int hopCountMax
 maximum hop count
bool measureAuthBlock
 if true, measure the overhead of signatures in rpc messages
bool restoreContext
 if true, a node rejoins with its old nodeId and malicious state
int numDropped
 number of dropped packets
int bytesDropped
 number of dropped bytes
cOutVector delayVector
 statistical output vector for packet-delays
cOutVector hopCountVector
 statistical output vector for hop-counts
States state
IterativeLookupConfiguration iterativeLookupConfig
RecursiveLookupConfiguration recursiveLookupConfig
LookupSet lookups
bool kbr
 set this to true, if the overlay provides KBR services
- Protected Attributes inherited from BaseRpc
NodeHandle thisNode
 NodeHandle to this node.
BaseOverlay * overlay
bool debugOutput
 debug output ?
GlobalStatistics * globalStatistics
 pointer to GlobalStatistics module in this node
CompType thisCompType
NeighborCache * neighborCache
 pointer to the neighbor cache
CryptoModule * cryptoModule
 pointer to CryptoModule
int numPingSent
int bytesPingSent
int numPingResponseSent
int bytesPingResponseSent
- Protected Attributes inherited from TopologyVis
cModule * thisTerminal
GlobalNodeList * globalNodeList
 pointer to corresponding node

Friends

std::ostream & operator<< (std::ostream &o, const ChildEntry &entry)

Additional Inherited Members

- Public Types inherited from BaseOverlay
enum  States {
  INIT = 0, BOOTSTRAP = 1, DISCOVERY = 2, PREJOIN = 3,
  JOIN = 4, POSTJOIN = 5, READY = 6, REFRESH = 7,
  SHUTDOWN = 8, FAILED = 9, RSET = JOIN, BSET = POSTJOIN
}

Detailed Description

Definition at line 36 of file PubSubLobby.h.

Member Typedef Documentation

typedef std::map<TransportAddress, ChildEntry> PubSubLobby::PlayerMap
protected

Definition at line 73 of file PubSubLobby.h.

typedef std::multimap<int, ChildEntry*, std::greater<int> > PubSubLobby::PlayerRessourceMap
protected

Definition at line 75 of file PubSubLobby.h.

Constructor & Destructor Documentation

PubSubLobby::~PubSubLobby ( )
virtual

Definition at line 474 of file PubSubLobby.cc.

{
// Empty body: no explicit cleanup is performed in this destructor.
}

Member Function Documentation

void PubSubLobby::failedNode ( const TransportAddress & node )
protected

Definition at line 414 of file PubSubLobby.cc.

{
// Removes a failed node from the lobby's bookkeeping (playerMap and
// playerRessourceMap). For every subspace in the node's duty set for which it
// is the registered responsible node, a grace period is started during which
// the lobby waits for another node to claim the subspace.
// Unspecified addresses are ignored.
if( node.isUnspecified() ) return;
// Find node in playerMap
PlayerMap::iterator playerIt = playerMap.find( node );
if( playerIt == playerMap.end() ){
// Player was already deleted
return;
}
ChildEntry* respNodeEntry = &(playerIt->second);
// FIXME: only for debugging
// NOTE(review): the closing brace after the opp_error() call has no visible
// opening statement; the condition guarding this call appears to be missing
// from this listing -- confirm against PubSubLobby.cc.
opp_error("Trying to delete node that's still there...");
}
// check if node was responsible for a subspace
set<int>::iterator dutyIt;
for( dutyIt = respNodeEntry->dutySet.begin(); dutyIt != respNodeEntry->dutySet.end(); ++dutyIt ){
PubSubSubspaceId subspaceId( *dutyIt, numSubspaces );
PubSubSubspaceLobby& subspace = subspaces[subspaceId.getX()][subspaceId.getY()];
if( !subspace.getResponsibleNode().isUnspecified() && node == subspace.getResponsibleNode() ){
// remove old responsible node
// NOTE(review): no statement that actually clears the responsible node is
// visible in this listing -- confirm against PubSubLobby.cc.
// wait for the backup node to claim subspace; if timer expires, waiting-flag will be reset
subspace.waitingForRespNode = true;
// Self-message carrying the subspace id, scheduled 5 simulated seconds from now.
PubSubTimer* graceTimer = new PubSubTimer("Grace timer for claiming subspace");
graceTimer->setSubspaceId( subspace.getId().getId() );
scheduleAt( simTime() + 5, graceTimer ); //FIXME: make it a parameter
}
}
// delete node from backupList
// playerRessourceMap is keyed by ressource count, so only the records sharing
// this node's count are scanned for the one pointing at its ChildEntry.
pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
PlayerRessourceMap::iterator resIt;
resRange = playerRessourceMap.equal_range( respNodeEntry->ressources );
for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
if( resIt->second == respNodeEntry ){
playerRessourceMap.erase( resIt );
break;
}
}
// Erased last because respNodeEntry points into this map element.
playerMap.erase( playerIt );
}
void PubSubLobby::finishOverlay ( )
virtual

collects statistical data in derived class

Reimplemented from BaseOverlay.

Definition at line 463 of file PubSubLobby.cc.

{
// Reports signaling statistics to globalStatistics unless the measured time is
// below GlobalStatistics::MIN_MEASURED.
// NOTE(review): this listing is incomplete -- the declaration of `time` and the
// value arguments / closing parentheses of both addStdDev() calls are not
// shown. Consult PubSubLobby.cc for the full statements.
if (time < GlobalStatistics::MIN_MEASURED) return;
globalStatistics->addStdDev("PubSubLobby: Sent Signaling Messages/s",
globalStatistics->addStdDev("PubSubLobby: Sent Signaling bytes/s",
}
void PubSubLobby::handleHelpCall ( PubSubHelpCall * helpMsg )
protected virtual

Definition at line 226 of file PubSubLobby.cc.

{
// Answers a help request: picks a helper node for the requested subspace,
// charges the helper's ressources and returns its handle to the requester.
// If no helper qualifies, the call is queued in waitingForHelp and left unanswered.
// A node needs help! Give him the handle of the node with the most ressources...
const NodeHandle& src = helpMsg->getSrcNode();
int subspaceId = helpMsg->getSubspaceId();
// PlayerRessourceMap is ordered with std::greater<int>, so iteration starts at
// the highest ressource count. The first node is taken that is not the
// requester, does not already serve this subspace and has more than 1 ressource.
PlayerRessourceMap::iterator it;
for( it = playerRessourceMap.begin(); it != playerRessourceMap.end(); ++it ){
if( it->second->handle != src &&
it->second->dutySet.find( subspaceId ) == it->second->dutySet.end() &&
it->second->ressources > 1 ){
break;
}
}
// No suitable node found!
if( it == playerRessourceMap.end() ){
waitingForHelp.push_back( helpMsg );
return;
}
// decrease ressources
ChildEntry* child = it->second;
// A PUBSUB_BACKUP request costs 2 ressources, any other help type costs 1.
child->ressources -= ( helpMsg->getHelpType() == PUBSUB_BACKUP ) ? 2 : 1; // FIXME: make it a parameter
child->dutySet.insert( subspaceId );
// The ressource count is the multimap key: remove the old record and insert a
// new one under the updated count.
playerRessourceMap.erase( it );
playerRessourceMap.insert( make_pair(child->ressources, child) );
// Send handle to requesting node
PubSubHelpResponse* helpResp = new PubSubHelpResponse("Ask him to help you");
helpResp->setSubspaceId( subspaceId );
helpResp->setHelpType( helpMsg->getHelpType() );
helpResp->setNode( child->handle );
helpResp->setBitLength( PUBSUB_HELPRESPONSE_L( helpResp ));
// NOTE(review): the stray ");" after the next statement has no visible opening;
// the start of the enclosing construct appears to be missing from this listing.
pubSubSignalingMessagesSize += helpResp->getByteLength()
);
sendRpcResponse( helpMsg, helpResp );
}
void PubSubLobby::handleHelpReleaseMessage ( PubSubHelpReleaseMessage * helpRMsg )
protected

Definition at line 331 of file PubSubLobby.cc.

{
    // A helper node reports that it no longer serves a subspace: drop that duty
    // and credit one ressource unit back to the node.
    PlayerMap::iterator found = playerMap.find( helpRMsg->getNode() );
    if( found == playerMap.end() ) return; // node is not (or no longer) registered

    ChildEntry* entry = &(found->second);
    entry->dutySet.erase( helpRMsg->getSubspaceId() );

    // The ressource count is the key of playerRessourceMap, so the record stored
    // under the old count has to be removed before the count changes.
    pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> sameCount =
        playerRessourceMap.equal_range( entry->ressources );
    PlayerRessourceMap::iterator cursor = sameCount.first;
    while( cursor != sameCount.second && cursor->second != entry ) {
        ++cursor;
    }
    if( cursor != sameCount.second ) {
        playerRessourceMap.erase( cursor );
    }

    // Credit the released duty and register the node under its new count.
    entry->ressources += 1; // FIXME: make it a parameter
    playerRessourceMap.insert( make_pair(entry->ressources, entry) );
}
void PubSubLobby::handleJoin ( PubSubJoinCall * joinMsg )
protected virtual

Definition at line 138 of file PubSubLobby.cc.

{
// Registers a joining node, answers with the responsible node of the subspace
// containing the node's position, triggers a subspace take-over if that
// subspace has no responsible node, and finally lets the new node serve any
// queued help requests.
// Insert node in the queue of possible backup nodes
ChildEntry e;
e.handle = joinMsg->getSrcNode();
e.ressources = joinMsg->getRessources();
pair<PlayerMap::iterator, bool> inserter;
// NOTE(review): std::map::insert leaves an existing element untouched, so for a
// handle that is already registered childEntry refers to the old entry while a
// further record is still added to playerRessourceMap below -- confirm intended.
inserter = playerMap.insert( make_pair( e.handle, e ));
ChildEntry* childEntry = &(inserter.first->second);
//pair<PlayerRessourceMap::iterator, bool> rInserter;
//rInserter = playerRessourceMap.insert( make_pair( e.ressources, childEntry ));
PlayerRessourceMap::iterator rInserter;
rInserter = playerRessourceMap.insert( make_pair( e.ressources, childEntry ));
// Remember whether the new record became the first one of the map; used below
// to keep rInserter pointing at the new node's record.
bool insertedAtBegin = rInserter == playerRessourceMap.begin();
// send answer with responsible node
PubSubJoinResponse* joinResp = new PubSubJoinResponse( "Join Response");
// Subspace grid indices are derived from the position divided by subspaceSize.
unsigned int x = (unsigned int) (joinMsg->getPosition().x / subspaceSize);
unsigned int y = (unsigned int) (joinMsg->getPosition().y / subspaceSize);
PubSubSubspaceLobby& subspace = subspaces[x][y];
NodeHandle respNode = subspace.getResponsibleNode();
joinResp->setResponsibleNode( respNode );
joinResp->setBitLength( PUBSUB_JOINRESPONSE_L( joinResp ));
// NOTE(review): each stray ");" in this function has no visible opening; the
// start of the enclosing construct appears to be missing from this listing.
pubSubSignalingMessagesSize += joinResp->getByteLength()
);
sendRpcResponse( joinMsg, joinResp );
if( respNode.isUnspecified() && !subspace.waitingForRespNode) {
// respNode is unknown, create new...
// TODO: refactor: make a funktion out of this...
PubSubTakeOverSubspaceCall* toCall = new PubSubTakeOverSubspaceCall( "Take over subspace");
toCall->setSubspacePos( Vector2D(x, y) );
// First record of the std::greater-ordered multimap: a node with the highest
// ressource count.
ChildEntry* child = playerRessourceMap.begin()->second;
toCall->setBitLength( PUBSUB_TAKEOVERSUBSPACECALL_L( toCall ));
pubSubSignalingMessagesSize += toCall->getByteLength()
);
sendUdpRpcCall( child->handle, toCall );
// NOTE(review): the chosen node's old record is not removed from
// playerRessourceMap in the visible lines before it is re-inserted below --
// confirm against PubSubLobby.cc.
child->dutySet.insert( subspace.getId().getId() );
child->ressources-=2; // XXX FIXME: make it a parameter...
if( insertedAtBegin ){
rInserter = playerRessourceMap.insert( make_pair(child->ressources, child) );
} else {
playerRessourceMap.insert( make_pair(child->ressources, child) );
}
subspace.waitingForRespNode = true;
}
// New node is out of luck: he gets to help all waiting nodes as long as he has ressources left
if( waitingForHelp.size() > 0 ) {
std::list<PubSubHelpCall*>::iterator it = waitingForHelp.begin();
while( it != waitingForHelp.end() ) {
// Insert subspace into node's dutySet
if( childEntry->dutySet.insert( (*it)->getSubspaceId() ).second ){
// If it was not already there (due to duplicate HelpCalls because of retransmissions),
// decrease ressources
childEntry->ressources -= ( (*it)->getHelpType() == PUBSUB_BACKUP ) ? 2 : 1; // FIXME: make it a parameter
}
PubSubHelpResponse* helpResp = new PubSubHelpResponse("Ask him to help you");
helpResp->setSubspaceId( (*it)->getSubspaceId() );
// NOTE(review): this copies getType()/setType(), whereas handleHelpCall() copies
// getHelpType()/setHelpType() -- confirm which one is intended.
helpResp->setType( (*it)->getType() );
helpResp->setNode( e.handle );
helpResp->setBitLength( PUBSUB_HELPRESPONSE_L( helpResp ));
pubSubSignalingMessagesSize += helpResp->getByteLength()
);
sendRpcResponse( *it, helpResp );
// Post-increment keeps the loop iterator valid while the answered call is
// removed from the list.
waitingForHelp.erase( it++ );
if( childEntry->ressources <= 0 ) break; // FIXME: clean up duplicate calls!
}
// Fix ressource map entry
playerRessourceMap.erase( rInserter );
playerRessourceMap.insert( make_pair(childEntry->ressources, childEntry) );
}
}
void PubSubLobby::handleRespCall ( PubSubResponsibleNodeCall * respCall )
protected virtual

Definition at line 266 of file PubSubLobby.cc.

{
// Answers a query for the responsible node of a subspace. If one is known it
// is returned immediately; otherwise the call is parked in the subspace's
// waitingNodes list and, unless a take-over is already pending, a node is
// asked to take over the subspace.
unsigned int x = (unsigned int) respCall->getSubspacePos().x;
unsigned int y = (unsigned int) respCall->getSubspacePos().y;
NodeHandle respNode = subspaces[x][y].getResponsibleNode();
if( !respNode.isUnspecified() ) {
PubSubResponsibleNodeResponse* msg = new PubSubResponsibleNodeResponse( "ResponsibleNode Response");
msg->setResponsibleNode( respNode );
// NOTE(review): `region` is not declared anywhere in this listing; its
// declaration appears to be missing here -- see PubSubLobby.cc.
msg->setSubspaceId( region.getId() );
msg->setBitLength( PUBSUB_RESPONSIBLENODERESPONSE_L( msg ));
// NOTE(review): each stray ");" in this function has no visible opening; the
// start of the enclosing construct appears to be missing from this listing.
pubSubSignalingMessagesSize += msg->getByteLength()
);
sendRpcResponse( respCall, msg );
} else {
// no responsible node for subspace known.
// push call to list of waiting nodes ...
PubSubSubspaceLobby& subspace = subspaces[x][y];
subspace.waitingNodes.push_back( respCall );
if (!subspace.waitingForRespNode) {
// ... and ask a node to take over the subspace
PubSubTakeOverSubspaceCall* msg = new PubSubTakeOverSubspaceCall( "Take over subspace");
msg->setSubspacePos( Vector2D( x, y) );
// NOTE(review): begin() is dereferenced without a visible check that
// playerRessourceMap is non-empty -- confirm that callers guarantee this.
ChildEntry* child = playerRessourceMap.begin()->second;
msg->setBitLength( PUBSUB_TAKEOVERSUBSPACECALL_L( msg ));
pubSubSignalingMessagesSize += msg->getByteLength()
);
sendUdpRpcCall( child->handle, msg );
// NOTE(review): the chosen node's old record is not removed from
// playerRessourceMap in the visible lines before it is re-inserted below --
// confirm against PubSubLobby.cc.
child->dutySet.insert( subspace.getId().getId() );
// Decrease ressources. Note: the ressources are decreased by the cost of an "backup" node
// The rest will be decreased when the new responsible answeres the takeover call
child->ressources-=1; // FIXME: make it a parameter...
playerRessourceMap.insert( make_pair(child->ressources, child) );
subspace.waitingForRespNode = true;
}
}
}
bool PubSubLobby::handleRpcCall ( BaseCallMessage * msg )
virtual

Processes Remote-Procedure-Call invocation messages.


This method should be overloaded when the overlay provides RPC functionality.

Returns
true, if rpc has been handled

Reimplemented from BaseRpc.

Definition at line 96 of file PubSubLobby.cc.

{
// Routes incoming RPC calls to the matching PubSubLobby handler.
// NOTE(review): RPC_DELEGATE and RPC_HANDLED are macros defined outside this
// listing; presumably each RPC_DELEGATE forwards the named call type to the
// given handler -- confirm. Any macros the framework requires around these
// lines are not shown here.
// delegate messages
RPC_DELEGATE( PubSubJoin, handleJoin );
RPC_DELEGATE( PubSubHelp, handleHelpCall );
RPC_DELEGATE( PubSubResponsibleNode, handleRespCall );
return RPC_HANDLED;
}
void PubSubLobby::handleRpcResponse ( BaseResponseMessage * msg,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
virtual

This method is called if an RPC response has been received.

Parameters
msg: The response message.
context: Pointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code
rpcId: The RPC id.
rtt: The Round-Trip-Time of this RPC

Reimplemented from RpcListener.

Definition at line 108 of file PubSubLobby.cc.

{
// Passes a PubSubTakeOverSubspace response on to handleTakeOverResponse().
// NOTE(review): RPC_ON_RESPONSE and _PubSubTakeOverSubspaceResponse come from
// macros defined outside this listing, and the `break` implies an enclosing
// switch/loop that is not shown here -- see PubSubLobby.cc.
RPC_ON_RESPONSE( PubSubTakeOverSubspace ) {
handleTakeOverResponse( _PubSubTakeOverSubspaceResponse );
break;
}
}
void PubSubLobby::handleRpcTimeout ( BaseCallMessage * msg,
const TransportAddress & dest,
cPolymorphic *  context,
int  rpcId,
const OverlayKey & destKey
)
virtual

This method is called if an RPC timeout has been reached.

Parameters
msg: The original RPC message.
dest: The destination node
context: Pointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code
rpcId: The RPC id.
destKey: the destination OverlayKey

Reimplemented from RpcListener.

Definition at line 120 of file PubSubLobby.cc.

{
// Handles a timed-out PubSubTakeOverSubspace call: first runs
// handleTakeOverTimeout() for the unresponsive destination, then logs the
// timeout to EV.
// NOTE(review): RPC_ON_CALL and _PubSubTakeOverSubspaceCall come from macros
// defined outside this listing, and the `break` implies an enclosing
// switch/loop that is not shown here -- see PubSubLobby.cc.
RPC_ON_CALL( PubSubTakeOverSubspace ) {
handleTakeOverTimeout( _PubSubTakeOverSubspaceCall, dest );
// NOTE(review): the log tag below names PubSubMMOG although this method
// belongs to PubSubLobby -- possibly a copy/paste leftover.
EV << "[PubSubMMOG::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " TakeOverSubspace RPC Call timed out: id=" << rpcId << "\n"
<< " msg=" << *_PubSubTakeOverSubspaceCall
<< endl;
break;
}
}
void PubSubLobby::handleTakeOverResponse ( PubSubTakeOverSubspaceResponse * takeOverResp )
protected virtual

Definition at line 314 of file PubSubLobby.cc.

{
// A node confirmed a subspace take-over: register the sender as the
// responsible node via replaceResponsibleNode().
// NOTE(review): `region` is not declared in this listing (presumably built
// from x and y in a line that is missing here), and the local respNode is not
// used by the visible call -- see PubSubLobby.cc.
NodeHandle respNode = takeOverResp->getSrcNode();
unsigned int x = (unsigned int) takeOverResp->getSubspacePos().x;
unsigned int y = (unsigned int) takeOverResp->getSubspacePos().y;
replaceResponsibleNode( region, takeOverResp->getSrcNode() );
}
void PubSubLobby::handleTakeOverTimeout ( PubSubTakeOverSubspaceCall * takeOverCall,
const TransportAddress & oldNode
)
protected virtual

Definition at line 324 of file PubSubLobby.cc.

{
    // The take-over request was never answered: clear the pending flag of the
    // addressed subspace ...
    Vector2D subspacePos = takeOverCall->getSubspacePos();
    const int xIdx = (int) subspacePos.x;
    const int yIdx = (int) subspacePos.y;
    PubSubSubspaceLobby& stalled = subspaces[xIdx][yIdx];
    stalled.waitingForRespNode = false;

    // ... and run the failed-node handling for the node that did not respond.
    failedNode( oldNode );
}
void PubSubLobby::handleTimerEvent ( cMessage *  msg)
virtual

Reimplemented from BaseRpc.

Definition at line 67 of file PubSubLobby.cc.

{
    // Only PubSubTimer self-messages are handled here.
    PubSubTimer* expired = dynamic_cast<PubSubTimer*>(msg);
    if( expired == NULL ) return;

    // ... and of those only the take-over grace timer.
    // NOTE(review): messages leaving through either early return are not
    // deleted by this method -- confirm who owns them.
    if( expired->getType() != PUBSUB_TAKEOVER_GRACE_TIME ) return;

    // The grace period for claiming the subspace is over. Clearing the waiting
    // flag means the next responsible-node query for this subspace will select
    // a new responsible node if nobody has claimed it in the meantime.
    PubSubSubspaceId expiredId( expired->getSubspaceId(), numSubspaces );
    PubSubSubspaceLobby& unclaimed = subspaces[expiredId.getX()][expiredId.getY()];
    unclaimed.waitingForRespNode = false;
    delete expired;
}
void PubSubLobby::handleUDPMessage ( BaseOverlayMessage * msg )
virtual

Processes messages from underlay.

Parameters
msg: Message from UDP

Reimplemented from BaseOverlay.

Definition at line 81 of file PubSubLobby.cc.

{
// Dispatches non-RPC messages by their dynamic type and deletes each message
// that matched one of the branches.
// NOTE(review): a message matching none of the three types is not deleted in
// the visible code -- confirm who owns it.
if( PubSubFailedNodeMessage* failMsg = dynamic_cast<PubSubFailedNodeMessage*>(msg) ){
// A node was reported as failed.
failedNode( failMsg->getFailedNode() );
delete msg;
} else if( PubSubReplacementMessage* repMsg = dynamic_cast<PubSubReplacementMessage*>(msg) ){
// A subspace got a new responsible node.
replaceResponsibleNode( repMsg->getSubspaceId(), repMsg->getNewResponsibleNode() );
delete msg;
} else if( PubSubHelpReleaseMessage* helpRMsg = dynamic_cast<PubSubHelpReleaseMessage*>(msg) ){
// NOTE(review): helpRMsg is never used in this listing; presumably a call to
// handleHelpReleaseMessage( helpRMsg ) is missing here -- see PubSubLobby.cc.
delete msg;
}
}
void PubSubLobby::initializeOverlay ( int  stage)
virtual

Initializes derived-class-attributes.


Initializes derived-class-attributes, called by BaseOverlay::initialize(). By default this method is called once. If more stages are needed one can overload numInitStages() and add more stages.

Parameters
stage: the init stage

Reimplemented from BaseOverlay.

Definition at line 39 of file PubSubLobby.cc.

{
// Runs only in init stage MIN_STAGE_OVERLAY: reads the grid parameters, builds
// the numSubspaces x numSubspaces grid of PubSubSubspaceLobby objects and
// registers the containers with WATCH_VECTOR / WATCH_MAP.
// because of IPAddressResolver, we need to wait until interfaces are registered,
// address auto-assignment takes place etc.
if(stage != MIN_STAGE_OVERLAY) return;
numSubspaces = par("numSubspaces");
// Edge length of one subspace: the "areaDimension" parameter divided by the
// number of subspaces (integer division).
subspaceSize = (int) ( (unsigned int) par("areaDimension") / numSubspaces);
// FIXME: Inefficient, make subspace a single dimensioned array
// NOTE(review): subspaces[i] is indexed below, but no statement that sizes the
// outer vector is visible in this listing -- see PubSubLobby.cc.
for ( int i = 0; i < numSubspaces; ++i ) {
for( int ii = 0; ii < numSubspaces; ++ii ) {
PubSubSubspaceId region( i, ii, numSubspaces );
subspaces[i].push_back( PubSubSubspaceLobby( region ) );
}
WATCH_VECTOR( subspaces[i] );
}
WATCH_MAP( playerMap );
}
void PubSubLobby::replaceResponsibleNode ( int  subspaceId,
NodeHandle  respNode 
)
protected

Definition at line 357 of file PubSubLobby.cc.

{
// NOTE(review): the body is empty in this listing. Presumably it forwards to
// the PubSubSubspaceId overload and that line is missing here -- confirm
// against PubSubLobby.cc.
}
void PubSubLobby::replaceResponsibleNode ( PubSubSubspaceId  subspaceId,
NodeHandle  respNode 
)
protected

Definition at line 362 of file PubSubLobby.cc.

{
// Records respNode as the responsible node of the given subspace, clears the
// subspace's waiting flag and answers every responsible-node call that was
// parked in the subspace's waitingNodes list.
// a new responsible node was found for a subspace
PubSubSubspaceLobby& subspace = subspaces[subspaceId.getX()][subspaceId.getY()];
// NodeHandle oldNode = subspace.getResponsibleNode();
// set new responsible node
subspace.setResponsibleNode( respNode );
subspace.waitingForRespNode = false;
// decrease responsible node's ressources
// NOTE(review): the ressource adjustment itself is commented out further down;
// resRange and resIt are only referenced by that commented-out code.
pair<PlayerRessourceMap::iterator, PlayerRessourceMap::iterator> resRange;
PlayerRessourceMap::iterator resIt;
PlayerMap::iterator plIt = playerMap.find( respNode );
if( plIt == playerMap.end() ){
// FIXME: How to react?
// Best would be: reinsert node. But most probable we have two nodes that want to be
// responsible, so how to avoid the resulting inconsostency?
opp_error("PlayerMap inconsistent: Allegedly failed node wants to become Responsible node");
}
// ChildEntry* respNodeEntry = &(plIt->second);
// resRange = playerRessourceMap.equal_range( respNodeEntry->ressources );
// for( resIt = resRange.first; resIt != resRange.second; ++resIt ){
// if( resIt->second == respNodeEntry ){
// playerRessourceMap.erase( resIt );
// break;
// }
// }
// respNodeEntry->ressources -= 2; // FIXME: make it a parameter
// playerRessourceMap.insert( make_pair(respNodeEntry->ressources, respNodeEntry) );
// remove old node from backupList->he failed...
// failedNode( oldNode );
// inform all waiting nodes...
std::list<PubSubResponsibleNodeCall*>::iterator it;
for( it = subspace.waitingNodes.begin(); it != subspace.waitingNodes.end(); ++it ) {
PubSubResponsibleNodeResponse* msg = new PubSubResponsibleNodeResponse( "ResponsibleNode Response");
msg->setResponsibleNode( respNode );
msg->setSubspaceId( subspaceId.getId() );
msg->setBitLength( PUBSUB_RESPONSIBLENODERESPONSE_L( msg ));
// NOTE(review): the stray ");" after the next statement has no visible opening;
// the start of the enclosing construct appears to be missing from this listing.
pubSubSignalingMessagesSize += msg->getByteLength()
);
sendRpcResponse( *it, msg );
}
// Every parked call has been answered above, so the list is emptied.
subspace.waitingNodes.clear();
}

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  o,
const ChildEntry & entry
)
friend

Definition at line 31 of file PubSubLobby.cc.

{
    // Write the node handle first, then the ressource count, and hand the same
    // stream back so calls can be chained.
    o << "Node: " << entry.handle;
    o << " ressources: " << entry.ressources;
    return o;
}

Member Data Documentation

int PubSubLobby::numPubSubSignalingMessages
protected

Definition at line 90 of file PubSubLobby.h.

int PubSubLobby::numSubspaces
protected

Definition at line 68 of file PubSubLobby.h.

PlayerMap PubSubLobby::playerMap
protected

Definition at line 74 of file PubSubLobby.h.

PlayerRessourceMap PubSubLobby::playerRessourceMap
protected

Definition at line 76 of file PubSubLobby.h.

int PubSubLobby::pubSubSignalingMessagesSize
protected

Definition at line 91 of file PubSubLobby.h.

std::vector<std::vector<PubSubSubspaceLobby> > PubSubLobby::subspaces
protected

Definition at line 69 of file PubSubLobby.h.

int PubSubLobby::subspaceSize
protected

Definition at line 67 of file PubSubLobby.h.

std::list<PubSubHelpCall*> PubSubLobby::waitingForHelp
protected

Definition at line 70 of file PubSubLobby.h.


The documentation for this class was generated from the following files: