OverSim
Scribe Class Reference

#include <Scribe.h>

Inheritance diagram for Scribe:
BaseApp BaseRpc BaseTcpSupport RpcListener

Public Member Functions

 Scribe ()
 ~Scribe ()
virtual void initializeApp (int stage)
 initializes derived class-attributes
virtual void handleUpperMessage (cMessage *msg)
 handleUpperMessage gets called of handleMessage(cMessage* msg) if msg arrivedOn from_upperTier (currently msg gets deleted in this function)
virtual void handleReadyMessage (CompReadyMessage *msg)
 method to handle ready messages from the overlay
virtual void handleTimerEvent (cMessage *msg)
virtual bool handleRpcCall (BaseCallMessage *msg)
 Processes Remote-Procedure-Call invocation messages.
virtual void handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void forward (OverlayKey *key, cPacket **msg, NodeHandle *nextHopNode)
 Common API function: handles messages from overlay to be forwarded.
virtual void deliver (OverlayKey &key, cMessage *msg)
 Common API function: handles delivered messages from overlay.
virtual void update (const NodeHandle &node, bool joined)
 Common API function: informs application about neighbors and own nodeID.
virtual void finishApp ()
 collects statistical data of derived app
- Public Member Functions inherited from BaseApp
 BaseApp ()
virtual ~BaseApp ()
 virtual destructor
- Public Member Functions inherited from BaseRpc
 BaseRpc ()
const NodeHandlegetThisNode ()
 Returns the NodeHandle of this node.
simtime_t getUdpTimeout ()
- Public Member Functions inherited from RpcListener
virtual ~RpcListener ()
 destructor
- Public Member Functions inherited from BaseTcpSupport
virtual void socketDataArrived (int connId, void *yourPtr, cPacket *msg, bool urgent)
virtual void socketEstablished (int connId, void *yourPtr)
virtual void socketPeerClosed (int connId, void *yourPtr)
virtual void socketFailure (int connId, void *yourPtr, int code)
virtual void socketStatusArrived (int connId, void *yourPtr, TCPStatusInfo *status)

Protected Member Functions

void handleJoinResponse (ScribeJoinResponse *joinResponse)
 Handles a response to a join call send by this node.
void handleJoinCall (ScribeJoinCall *joinMsg)
 Handles a join request from another node.
void handlePublishCall (ScribePublishCall *publishCall)
 Handles a publish call from another node.
void handlePublishResponse (ScribePublishResponse *publishResponse)
 Handles a response to a publish call send b this node.
void handleJoinMessage (ScribeJoinCall *joinMsg, bool amIRoot)
 Handles join requests from other nodes.
void handleLeaveMessage (ScribeLeaveMessage *leaveMsg)
 Handles leave requests from other nodes.
void subscribeToGroup (const OverlayKey &groupId)
 Gets called if the local node wants to subscribe to a multicast group.
void leaveGroup (const OverlayKey &group)
 Gets called if the local node wants to leave a multicast group.
void startTimer (ScribeTimer *timer)
 Starts a local timer.
void addChildToGroup (const NodeHandle &child, ScribeGroup &group)
 Adds a child to a multicast group.
void removeChildFromGroup (const NodeHandle &child, ScribeGroup &group)
 Removes a child from a multicast group.
void removeChildFromGroup (ScribeTimer *timer)
 Removes a child from a multicast group.
void checkGroupEmpty (ScribeGroup &group)
 Chechs wheter there are any subscibers left to a given root.
void refreshChildTimer (NodeHandle &child, OverlayKey &groupId)
 Refreshes a child timer.
void deliverALMDataToGroup (ScribeDataMessage *dataMsg)
 Delivers a multicast message to all children in the multicast group.
void deliverALMDataToRoot (ALMMulticastMessage *mcastMsg)
 Delivers a multicast message to the tree's root.
- Protected Member Functions inherited from BaseApp
int numInitStages () const
 method to set InitStage
void initialize (int stage)
 initializes base class-attributes
void handleMessage (cMessage *msg)
 checks for message type and calls corresponding method
virtual void receiveChangeNotification (int category, const cPolymorphic *details)
 callback-method for events at the NotificationBoard
virtual void handleTransportAddressChangedNotification ()
 This method gets call if the node has a new TransportAddress (IP address) because he changed his access network.
virtual void handleNodeLeaveNotification ()
 This method gets call **.gracefulLeaveDelay seconds before it is killed.
virtual void handleNodeGracefulLeaveNotification ()
 This method gets call **.gracefulLeaveDelay seconds before it is killed if this node is among the gracefulLeaveProbability nodes.
void finish ()
 collects statistical data
void callRoute (const OverlayKey &key, cPacket *msg, const TransportAddress &hint=TransportAddress::UNSPECIFIED_NODE, RoutingType routingType=DEFAULT_ROUTING)
 Common API function: calls route-method in overlay.
void callRoute (const OverlayKey &key, cPacket *msg, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType=DEFAULT_ROUTING)
NodeVectorcallLocalLookup (const OverlayKey &key, int num, bool safe)
 Common API function: produces a list of nodes that can be used as next hops towards key.
NodeVectorcallNeighborSet (int num)
 Common API function: produces a list of neighbor nodes.
bool isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err)
 Query if a node is among the siblings for a given key.
virtual void handleLowerMessage (cMessage *msg)
 processes self-messages
virtual void handleUDPMessage (cMessage *msg)
 method to handle messages that come directly from the UDP gate
virtual void bindToPort (int port)
 Tells UDP we want to get all packets arriving on the given port.
virtual void sendMessageToUDP (const TransportAddress &destAddr, cPacket *msg, simtime_t delay=SIMTIME_ZERO)
 Sends a packet over UDP.
virtual void handleTraceMessage (cMessage *msg)
 handleTraceMessage gets called of handleMessage(cMessage* msg) if a message arrives at trace_in.
void sendMessageToLowerTier (cPacket *msg)
 sends non-commonAPI message to the lower tier
bool internalHandleRpcCall (BaseCallMessage *msg)
 Handles internal rpc requests.
void internalHandleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 Handles rpc responses internal in base classes

void internalSendRouteRpc (BaseRpcMessage *message, const OverlayKey &destKey, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType)
virtual CompType getThisCompType ()
 Return the component type of this module.
void sendReadyMessage (bool ready=true, const OverlayKey &nodeId=OverlayKey::UNSPECIFIED_KEY)
- Protected Member Functions inherited from BaseRpc
virtual void internalHandleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 Handles rpc timeouts internal in base classes

void initRpcs ()
 Initializes Remote-Procedure state.
void finishRpcs ()
 Deinitializes Remote-Procedure state.
virtual void internalHandleRpcMessage (BaseRpcMessage *msg)
 Handles incoming rpc messages and delegates them to the corresponding listeners or handlers.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message using the overlay's UDP port
This replaces ROUTE_DIRECT calls!
uint32_t sendUdpRpcCall (const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message to the underlay

uint32_t sendInternalRpcCall (CompType destComp, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends an internal Remote-Procedure-Call between two tiers

void cancelRpcMessage (uint32_t nonce)
 Cancels a Remote-Procedure-Call.
void cancelAllRpcs ()
 Cancels all RPCs.
void sendRpcResponse (TransportType transportType, CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message and deletes call message.
void sendRpcResponse (BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message to UDP and deletes call message.
int pingNode (const TransportAddress &dest, simtime_t timeout=-1, int retries=0, cPolymorphic *context=NULL, const char *caption="PING", RpcListener *rpcListener=NULL, int rpcId=-1, TransportType transportType=INVALID_TRANSPORT)
 ping a node by its TransportAddress
virtual void pingResponse (PingResponse *pingResponse, cPolymorphic *context, int rpcId, simtime_t rtt)
virtual void pingTimeout (PingCall *pingCall, const TransportAddress &dest, cPolymorphic *context, int rpcId)
bool internalHandleMessage (cMessage *msg)
- Protected Member Functions inherited from RpcListener
virtual void handleRpcResponse (BaseResponseMessage *msg, const RpcState &rpcState, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 This method is called if an RPC timeout has been reached.
virtual void handleRpcTimeout (const RpcState &rpcState)
 This method is called if an RPC timeout has been reached.
- Protected Member Functions inherited from BaseTcpSupport
void handleTCPMessage (cMessage *msg)
 Member function to handle incoming TCP messages.
void bindAndListenTcp (int port)
 Member function to bind service to the specified port and listen afterwards.
bool isAlreadyConnected (TransportAddress address)
 Member function to check if the service is already connected.
void establishTcpConnection (TransportAddress address)
 Member function to establish a connection to the specified node.
void sendTcpData (cPacket *msg, TransportAddress address)
 Member function to send TCP data to the specified node.
virtual void handleConnectionEvent (EvCode code, TransportAddress address)
 Member function to handle passive connection events.
virtual void handleDataReceived (TransportAddress address, cPacket *msg, bool urgent)
 Member function to handle incoming data.
virtual void handleIncomingConnection (TransportAddress address)
 Member function to handle newly opened connections.
void closeTcpConnection (TransportAddress address)
 Member function to close an established connection.
void setTcpOut (cGate *gate)
 Member function to set local gate towards the TCP module during init phase.
cGate * getTcpOut ()
 Member function to get local gate towards the TCP module.

Private Types

typedef std::map< OverlayKey,
ScribeGroup
GroupList
typedef std::multimap
< NodeHandle, ScribeTimer * > 
ChildTimeoutList

Private Attributes

GroupList groupList
ChildTimeoutList childTimeoutList
int childTimeout
int parentTimeout
ScribeTimersubscriptionTimer
int numJoins
int numChildTimeout
int numParentTimeout
int numForward
int forwardBytes
int numReceived
int receivedBytes
int numHeartbeat
int heartbeatBytes
int numSubscriptionRefresh
int subscriptionRefreshBytes

Additional Inherited Members

- Public Types inherited from BaseTcpSupport
enum  EvCode {
  NO_EST_CONNECTION, PEER_CLOSED, PEER_TIMEDOUT, PEER_REFUSED,
  CONNECTION_RESET, CONNECTION_SUCC_ClOSED
}
- Protected Attributes inherited from BaseApp
UnderlayConfiguratorunderlayConfigurator
 pointer to UnderlayConfigurator in this node
GlobalNodeListglobalNodeList
 pointer to GlobalNodeList in this node
GlobalStatisticsglobalStatistics
 pointer to GlobalStatistics module in this node
NotificationBoard * notificationBoard
 pointer to NotificationBoard in this node
bool debugOutput
 debug output yes/no?
int numOverlaySent
 number of sent packets to overlay
int bytesOverlaySent
 number of sent bytes to overlay
int numOverlayReceived
 number of received packets from overlay
int bytesOverlayReceived
 number of received bytes from overlay
int numUdpSent
 number of sent packets to UDP
int bytesUdpSent
 number of sent bytes to UDP
int numUdpReceived
 number of received packets from UDP
int bytesUdpReceived
 number of received bytes from UDP
simtime_t creationTime
 simTime when the App has been created

Detailed Description

Definition at line 57 of file Scribe.h.

Member Typedef Documentation

typedef std::multimap<NodeHandle, ScribeTimer*> Scribe::ChildTimeoutList
private

Definition at line 62 of file Scribe.h.

typedef std::map<OverlayKey, ScribeGroup> Scribe::GroupList
private

Definition at line 60 of file Scribe.h.

Constructor & Destructor Documentation

Scribe::~Scribe ( )

Definition at line 53 of file Scribe.cc.

{
groupList.clear();
cancelAndDelete(subscriptionTimer);
// TODO: clear childTimeoutList
}

Member Function Documentation

void Scribe::addChildToGroup ( const NodeHandle child,
ScribeGroup group 
)
protected

Adds a child to a multicast group.

Definition at line 490 of file Scribe.cc.

{
if( child == overlay->getThisNode() ) {
// Join from ourself, ignore
return;
}
// add child to group's children list
pair<set<NodeHandle>::iterator, bool> inserter =
group.addChild( child );
if( inserter.second ) {
// if child has not been in the list, create new timeout msg
ScribeTimer* timeoutMsg = new ScribeTimer;
// Remember child and group
timeoutMsg->setChild( *inserter.first );
timeoutMsg->setGroup( group.getGroupId() );
startTimer( timeoutMsg );
childTimeoutList.insert( make_pair(child, timeoutMsg) );
}
}
void Scribe::checkGroupEmpty ( ScribeGroup group)
protected

Chechs wheter there are any subscibers left to a given root.

Definition at line 565 of file Scribe.cc.

{
if( !group.isForwarder() && !group.getSubscription() && !group.getAmISource()){
if( !group.getParent().isUnspecified() && group.getParent() != overlay->getThisNode() ) {
msg->setGroupId( group.getGroupId() );
msg->setBitLength( SCRIBE_LEAVE_L(msg) );
}
if( group.getParentTimer() ) cancelAndDelete( group.getParentTimer() );
if( group.getHeartbeatTimer() ) cancelAndDelete( group.getHeartbeatTimer() );
groupList.erase( group.getGroupId() );
}
}
void Scribe::deliver ( OverlayKey key,
cMessage *  msg 
)
virtual

Common API function: handles delivered messages from overlay.

method to handle decapsulated KBRdeliver messages from overlay module, should be overwritten in derived application

Parameters
keydestination key
msgdelivered message

Reimplemented from BaseApp.

Definition at line 178 of file Scribe.cc.

{
dynamic_cast<ScribeSubscriptionRefreshMessage*>(msg) ){
// reset child timeout
refreshChildTimer( refreshMsg->getSrc(), refreshMsg->getGroupId() );
delete refreshMsg;
} else if( ScribeDataMessage* data = dynamic_cast<ScribeDataMessage*>(msg) ){
} else if( ScribeLeaveMessage* leaveMsg = dynamic_cast<ScribeLeaveMessage*>(msg) ){
handleLeaveMessage( leaveMsg );
}
}
void Scribe::deliverALMDataToGroup ( ScribeDataMessage dataMsg)
protected

Delivers a multicast message to all children in the multicast group.

Definition at line 664 of file Scribe.cc.

{
// find group
GroupList::iterator it = groupList.find( dataMsg->getGroupId() );
if( it == groupList.end() ) {
EV << "[Scribe::deliverALMDataToGroup() @ " << overlay->getThisNode().getIp()
<< "Getting ALM data message response for an unknown group!\n";
delete dataMsg;
return;
}
// FIXME: ignore message if not from designated parent to avoid duplicates
// reset parent heartbeat Timer
ScribeTimer *timer = it->second.getParentTimer();
if( timer ) startTimer( timer );
// Only empty heartbeat?
if( dataMsg->getEmpty() ) {
delete dataMsg;
return;
}
// deliver data to children
for( set<NodeHandle>::iterator cit = it->second.getChildrenBegin();
cit != it->second.getChildrenEnd(); ++cit ) {
ScribeDataMessage* newMsg = new ScribeDataMessage( *dataMsg );
RECORD_STATS(++numForward; forwardBytes += newMsg->getByteLength());
}
// deliver to myself if I'm subscribed to that group
if( it->second.getSubscription() ) {
ALMMulticastMessage* mcastMsg = new ALMMulticastMessage( dataMsg->getName() );
mcastMsg->setGroupId( dataMsg->getGroupId() );
mcastMsg->encapsulate( dataMsg->decapsulate() );
RECORD_STATS(++numReceived; receivedBytes += dataMsg->getByteLength());
send( mcastMsg, "to_upperTier" );
}
delete dataMsg;
}
void Scribe::deliverALMDataToRoot ( ALMMulticastMessage mcastMsg)
protected

Delivers a multicast message to the tree's root.

This method gets called when the local app wants to publish some data to the multiacst group.

Definition at line 624 of file Scribe.cc.

{
// find group
pair<GroupList::iterator, bool> groupInserter;
groupInserter = groupList.insert( make_pair(mcastMsg->getGroupId(), ScribeGroup(mcastMsg->getGroupId())) );
// Group is not known yet
if( groupInserter.second ) {
groupInserter.first->second.setAmISource( true );
// TODO: Start/Restart timer to clean up cached groups
// If the timer expires, the flag should be cleared and checkGroupEmpty should be called
//
// FIXME: amISource should be set allways if app publishes messages to the group
// As the timer is not implemented yet, we only set the flag in "sender, but not receiver" mode
// to reduce the amount of unneccessary cached groups
}
ScribeDataMessage* dataMsg = new ScribeDataMessage( mcastMsg->getName() );
dataMsg->setGroupId( mcastMsg->getGroupId() );
dataMsg->setBitLength( SCRIBE_DATA_L( dataMsg ));
dataMsg->encapsulate( mcastMsg->decapsulate() );
// Send publish ...
ScribePublishCall* msg = new ScribePublishCall( "ScribePublish" );
msg->setGroupId( dataMsg->getGroupId() );
msg->setBitLength( SCRIBE_PUBLISHCALL_L(msg) );
msg->encapsulate( dataMsg );
if( !groupInserter.first->second.getRendezvousPoint().isUnspecified() ) {
// ... directly to the rendevouz point, if known ...
sendRouteRpcCall(TIER1_COMP, groupInserter.first->second.getRendezvousPoint(), msg);
} else {
// ... else route it via KBR
}
delete mcastMsg;
}
void Scribe::finishApp ( )
virtual

collects statistical data of derived app

Reimplemented from BaseApp.

Definition at line 84 of file Scribe.cc.

{
if (time < GlobalStatistics::MIN_MEASURED) return;
globalStatistics->addStdDev("Scribe: Received JOIN Messages/s",
numJoins / time);
globalStatistics->addStdDev("Scribe: Forwarded Multicast Messages/s",
numForward / time);
globalStatistics->addStdDev("Scribe: Forwarded Multicast Bytes/s",
forwardBytes / time);
globalStatistics->addStdDev("Scribe: Received Multicast Messages/s (subscribed groups only)",
numReceived / time);
globalStatistics->addStdDev("Scribe: Received Multicast Bytes/s (subscribed groups only)",
receivedBytes / time);
globalStatistics->addStdDev("Scribe: Send Heartbeat Messages/s",
numHeartbeat / time);
globalStatistics->addStdDev("Scribe: Send Heartbeat Bytes/s",
heartbeatBytes / time);
globalStatistics->addStdDev("Scribe: Send Subscription Refresh Messages/s",
globalStatistics->addStdDev("Scribe: Send Subscription Refresh Bytes/s",
globalStatistics->addStdDev("Scribe: Number of Child Timeouts/s",
numChildTimeout / time);
globalStatistics->addStdDev("Scribe: Number of Parent Timeouts/s",
}
void Scribe::forward ( OverlayKey key,
cPacket **  msg,
NodeHandle nextHopNode 
)
virtual

Common API function: handles messages from overlay to be forwarded.

method to handle decapsulated KBRdeliver messages from overlay module, should be overwritten in derived application if needed

Parameters
keydestination key
msgmessage to forward
nextHopNodenext hop

Reimplemented from BaseApp.

Definition at line 113 of file Scribe.cc.

{
ScribeJoinCall* joinMsg = dynamic_cast<ScribeJoinCall*> (*msg);
if( joinMsg == NULL ) {
// nothing to be done
return;
}
if( joinMsg->getSrcNode() == overlay->getThisNode() ) return;
handleJoinMessage( joinMsg, false );
*msg = NULL;
}
void Scribe::handleJoinCall ( ScribeJoinCall joinMsg)
protected

Handles a join request from another node.

This method only gets called if the local node is the root of the multicast group. It only calls handlePublishCall with amIRoot parameter set to "true"

Definition at line 324 of file Scribe.cc.

{
handleJoinMessage( joinMsg, true );
}
void Scribe::handleJoinMessage ( ScribeJoinCall joinMsg,
bool  amIRoot 
)
protected

Handles join requests from other nodes.

Definition at line 329 of file Scribe.cc.

{
OverlayKey key = joinMsg->getGroupId();
EV << "[Scribe::handleJoinMessage() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Received a ScribeJoin for group " << key << "\n"
<< " msg=" << joinMsg
<< endl;
// Insert group into grouplist, if not already there
pair<GroupList::iterator, bool> groupInserter;
groupInserter = groupList.insert( make_pair(key, ScribeGroup(key)) );
// If group is new or no parent is known, send join to parent (unless I am root, so there is no parent)
if ( !amIRoot && ( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) ) {
newJoin->setGroupId( key );
newJoin->setBitLength( SCRIBE_JOINCALL_L(newJoin) );
sendRouteRpcCall(TIER1_COMP, key, newJoin);
}
// If group had no children, start heartbeat timer for group
if( groupInserter.first->second.numChildren() == 0 ) {
ScribeTimer* heartbeat = new ScribeTimer("HeartbeatTimer");
heartbeat->setGroup( groupInserter.first->second.getGroupId() );
startTimer( heartbeat );
if( ScribeTimer* t = groupInserter.first->second.getHeartbeatTimer() ){
// delete old timer, if any
if( t ) cancelAndDelete( t );
}
groupInserter.first->second.setHeartbeatTimer( heartbeat );
}
// Add child to group
addChildToGroup( joinMsg->getSrcNode(), groupInserter.first->second );
// Send joinResponse
joinResponse->setGroupId( key );
joinResponse->setBitLength( SCRIBE_JOINRESPONSE_L(joinResponse) );
sendRpcResponse( joinMsg, joinResponse );
}
void Scribe::handleJoinResponse ( ScribeJoinResponse joinResponse)
protected

Handles a response to a join call send by this node.

Definition at line 410 of file Scribe.cc.

{
GroupList::iterator it = groupList.find( joinResponse->getGroupId() );
if( it == groupList.end() ) {
EV << "[Scribe::handleJoinResponse() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< "Getting join response for an unknown group!\n";
return;
}
it->second.setParent( joinResponse->getSrcNode() );
// Create new heartbeat timer
ScribeTimer* parentTimeout = new ScribeTimer("ParentTimeoutTimer");
parentTimeout->setGroup( it->second.getGroupId() );
startTimer( parentTimeout );
if( ScribeTimer* t = it->second.getParentTimer() ){
// delete old timer, if any
if( t ) cancelAndDelete( t );
}
it->second.setParentTimer( parentTimeout );
}
void Scribe::handleLeaveMessage ( ScribeLeaveMessage leaveMsg)
protected

Handles leave requests from other nodes.

Definition at line 450 of file Scribe.cc.

{
GroupList::iterator it = groupList.find( leaveMsg->getGroupId() );
if( it != groupList.end() ){
removeChildFromGroup( leaveMsg->getSrc(), it->second );
}
delete leaveMsg;
}
void Scribe::handlePublishCall ( ScribePublishCall publishCall)
protected

Handles a publish call from another node.

Publish calls are used to send multicast messages to the root of the multicast tree.

Definition at line 375 of file Scribe.cc.

{
// find group
GroupList::iterator it = groupList.find( publishCall->getGroupId() );
if( it == groupList.end() ||
it->second.getParent().isUnspecified() ||
it->second.getParent() != overlay->getThisNode() ){
// if I don't know the group or I am not root, inform sender
// TODO: forward message when I'm not root but know the rendevous point?
ScribePublishResponse* msg = new ScribePublishResponse("Wrong Root");
msg->setGroupId( publishCall->getGroupId() );
msg->setWrongRoot( true );
msg->setBitLength( SCRIBE_PUBLISHRESPONSE_L(msg) );
sendRpcResponse( publishCall, msg );
} else {
ScribeDataMessage* data = dynamic_cast<ScribeDataMessage*>(publishCall->decapsulate());
ScribePublishResponse* msg = new ScribePublishResponse("Publish Successful");
msg->setGroupId( publishCall->getGroupId() );
msg->setBitLength( SCRIBE_PUBLISHRESPONSE_L(msg) );
sendRpcResponse( publishCall, msg );
if( !data ) {
// TODO: throw exception? this should never happen
EV << "[Scribe::handlePublishCall() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " PublishCall for group " << msg->getGroupId()
<< " does not contain a calid ALM data message!\n"
<< endl;
return;
}
}
}
void Scribe::handlePublishResponse ( ScribePublishResponse publishResponse)
protected

Handles a response to a publish call send b this node.

Publish calls are used to send multicast messages to the root of the multicast tree.

Definition at line 433 of file Scribe.cc.

{
GroupList::iterator it = groupList.find( publishResponse->getGroupId() );
if( it == groupList.end() ) {
EV << "[Scribe::handlePublishResponse() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< "Getting publish response for an unknown group!\n";
return;
}
if( publishResponse->getWrongRoot() ) {
it->second.setRendezvousPoint( NodeHandle::UNSPECIFIED_NODE );
} else {
it->second.setRendezvousPoint( publishResponse->getSrcNode() );
}
}
void Scribe::handleReadyMessage ( CompReadyMessage msg)
virtual

method to handle ready messages from the overlay

Parameters
msgmessage to handle

Reimplemented from BaseApp.

Definition at line 227 of file Scribe.cc.

{
// process only ready messages from the tier below
if( (getThisCompType() - msg->getComp() == 1) && msg->getReady() ) {
// Send a ready message to the tier above
readyMsg->setReady( true );
readyMsg->setComp( getThisCompType() );
send( readyMsg, "to_upperTier" );
}
delete msg;
}
bool Scribe::handleRpcCall ( BaseCallMessage msg)
virtual

Processes Remote-Procedure-Call invocation messages.


This method should be overloaded when the overlay provides RPC functionality.

Returns
true, if rpc has been handled

Reimplemented from BaseRpc.

Definition at line 149 of file Scribe.cc.

void Scribe::handleRpcResponse ( BaseResponseMessage msg,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
virtual

This method is called if an RPC response has been received.

Parameters
msgThe response message.
contextPointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code
rpcIdThe RPC id.
rttThe Round-Trip-Time of this RPC

Reimplemented from RpcListener.

Definition at line 158 of file Scribe.cc.

{
RPC_ON_RESPONSE( ScribeJoin ) {
handleJoinResponse( _ScribeJoinResponse );
EV << "[Scribe::handleRpcResponse() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Received a ScribeJoin RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_ScribeJoinResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE( ScribePublish ) {
handlePublishResponse( _ScribePublishResponse );
}
}
void Scribe::handleTimerEvent ( cMessage *  msg)
virtual

Reimplemented from BaseRpc.

Definition at line 244 of file Scribe.cc.

{
ScribeTimer* timer = dynamic_cast<ScribeTimer*>(msg);
assert( timer );
switch( timer->getTimerType() ) {
// renew subscriptions for all groups
for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ) {
NodeHandle parent = it->second.getParent();
if( !parent.isUnspecified() ){
refreshMsg->setGroupId( it->second.getGroupId() );
refreshMsg->setSrc( overlay->getThisNode() );
refreshMsg->setBitLength(SCRIBE_SUBSCRIPTIONREFRESH_L(refreshMsg));
subscriptionRefreshBytes += refreshMsg->getByteLength()
);
callRoute( OverlayKey::UNSPECIFIED_KEY, refreshMsg, parent );
}
}
break;
{
// Send heartbeat messages to all children in the group
GroupList::iterator groupIt = groupList.find( timer->getGroup() );
if( groupIt == groupList.end() ) {
// FIXME: should not happen
delete timer;
return;
}
for( set<NodeHandle>::iterator it = groupIt->second.getChildrenBegin();
it != groupIt->second.getChildrenEnd(); ++it ) {
ScribeDataMessage* heartbeatMsg = new ScribeDataMessage("Heartbeat");
heartbeatMsg->setEmpty( true );
heartbeatMsg->setGroupId( timer->getGroup() );
heartbeatMsg->setBitLength(SCRIBE_DATA_L(heartbeatMsg));
RECORD_STATS(++numHeartbeat; heartbeatBytes += heartbeatMsg->getByteLength());
callRoute( OverlayKey::UNSPECIFIED_KEY, heartbeatMsg, *it );
}
startTimer( timer );
break;
}
// Child failed, remove it from group
break;
// Parent failed, send new join to rejoin group
OverlayKey key = timer->getGroup();
EV << "[Scribe::handleTimerEvent() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Parent of group " << key << "\n"
<< " failed to send heartbeat, trying to rejoin"
<< endl;
newJoin->setGroupId( key );
newJoin->setBitLength( SCRIBE_JOINCALL_L(newJoin) );
sendRouteRpcCall(TIER1_COMP, key, newJoin);
GroupList::iterator groupIt = groupList.find( timer->getGroup() );
if( groupIt == groupList.end() ) {
// FIXME: should not happen
delete timer;
return;
}
groupIt->second.setParentTimer( NULL );
cancelAndDelete( timer );
break;
}
}
void Scribe::handleUpperMessage ( cMessage *  msg)
virtual

handleUpperMessage gets called of handleMessage(cMessage* msg) if msg arrivedOn from_upperTier (currently msg gets deleted in this function)

Parameters
msgthe message to handle

Reimplemented from BaseApp.

Definition at line 192 of file Scribe.cc.

{
if( ALMSubscribeMessage* subscribeMsg = dynamic_cast<ALMSubscribeMessage*>(msg)){
subscribeToGroup( subscribeMsg->getGroupId() );
delete msg;
} else if( ALMLeaveMessage* leaveMsg = dynamic_cast<ALMLeaveMessage*>(msg)){
leaveGroup( leaveMsg->getGroupId() );
delete msg;
} else if( ALMMulticastMessage* mcastMsg = dynamic_cast<ALMMulticastMessage*>(msg) ){
deliverALMDataToRoot( mcastMsg );
} else if( ALMAnycastMessage* acastMsg = dynamic_cast<ALMAnycastMessage*>(msg) ){
// FIXME: anycast not implemented yet
EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Anycast message for group " << acastMsg->getGroupId() << "\n"
<< " ignored: Not implemented yet!"
<< endl;
delete msg;
} else if( ALMCreateMessage* createMsg = dynamic_cast<ALMCreateMessage*>(msg) ){
EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Create message for group " << createMsg->getGroupId() << "\n"
<< " ignored: Scribe has implicit create on SUBSCRIBE"
<< endl;
delete msg;
} else if( ALMDeleteMessage* deleteMsg = dynamic_cast<ALMDeleteMessage*>(msg) ){
EV << "[Scribe::handleUpperMessage() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Delete message for group " << deleteMsg->getGroupId() << "\n"
<< " ignored: Scribe has implicit delete on LEAVE"
<< endl;
delete msg;
}
}
void Scribe::initializeApp ( int  stage)
virtual

initializes derived class-attributes

Parameters
stagethe init stage

Reimplemented from BaseApp.

Definition at line 60 of file Scribe.cc.

{
if( stage != (numInitStages()-1))
{
return;
}
WATCH(groupList);
WATCH(numJoins);
WATCH(numForward);
WATCH(forwardBytes);
WATCH(numReceived);
WATCH(receivedBytes);
WATCH(numHeartbeat);
childTimeout = par("childTimeout");
parentTimeout = par("parentTimeout");
}
void Scribe::leaveGroup ( const OverlayKey group)
protected

Gets called if the local node wants to leave a multicast group.

Parameters
groupthe ID of the group to leave

Definition at line 481 of file Scribe.cc.

{
GroupList::iterator it = groupList.find( group );
if( it != groupList.end() ){
it->second.setSubscription( false );
checkGroupEmpty( it->second );
}
}
void Scribe::refreshChildTimer ( NodeHandle child,
OverlayKey groupId 
)
protected

Refreshes a child timer.

If a child sends a subscribtion refresh, this method gets called. It finds the subscriptionTimeout timer for the group and reschedules it.

Definition at line 584 of file Scribe.cc.

{
// find timer
pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
childTimeoutList.equal_range( child );
// no timer yet?
if( ret.first == childTimeoutList.end() ) return;
// restart timer
for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
if( it->first == child && it->second->getGroup() == groupId ) {
startTimer( it->second );
}
}
}
void Scribe::removeChildFromGroup ( const NodeHandle child,
ScribeGroup group 
)
protected

Removes a child from a multicast group.

Definition at line 515 of file Scribe.cc.

{
// find timer
ScribeTimer* timer = NULL;
pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
childTimeoutList.equal_range( child );
if( ret.first != childTimeoutList.end() ){
for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
if( group == it->second->getGroup() ) {
timer = it->second;
childTimeoutList.erase( it );
cancelAndDelete( timer );
break;
}
}
}
// remove child from group's childrenlist
group.removeChild( child );
checkGroupEmpty( group );
}
void Scribe::removeChildFromGroup ( ScribeTimer timer)
protected

Removes a child from a multicast group.

Both the child and the group are determined from the timer message This method gets calld if a subscription timer of a child expires.

Definition at line 538 of file Scribe.cc.

{
NodeHandle& child = timer->getChild();
GroupList::iterator groupIt = groupList.find( timer->getGroup() );
if( groupIt != groupList.end() ) {
ScribeGroup& group = groupIt->second;
// remove child from group's childrenlist
group.removeChild( child );
checkGroupEmpty( group );
}
// remove timer from timeoutlist
pair<ChildTimeoutList::iterator, ChildTimeoutList::iterator> ret =
childTimeoutList.equal_range( child );
if( ret.first != childTimeoutList.end() ) {
for( ChildTimeoutList::iterator it = ret.first; it!=ret.second; ++it) {
if( it->second == timer ) {
childTimeoutList.erase( it );
cancelAndDelete( timer );
break;
}
}
}
}
void Scribe::startTimer ( ScribeTimer timer)
protected

Starts a local timer.

This method automaticly determines the type of the timer and schedules it accordingly. If the timer is already scheduled, it gets canceled before getting rescheduled.

Definition at line 600 of file Scribe.cc.

{
if( timer->isScheduled() ) {
cancelEvent( timer );
}
int duration = 0;
switch( timer->getTimerType() ) {
duration = parentTimeout/2;
break;
duration = childTimeout/2;
break;
duration = parentTimeout;
break;
duration = childTimeout;
break;
}
scheduleAt(simTime() + duration, timer );
}
void Scribe::subscribeToGroup ( const OverlayKey groupId)
protected

Gets called if the local node wants to subscribe to a multicast group.

Parameters
groupIdthe ID of the group to join

Definition at line 459 of file Scribe.cc.

{
EV << "[Scribe::subscribeToGroup() @ " << overlay->getThisNode().getIp()
<< " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
<< " Trying to join group " << groupId << "\n";
// Insert group into grouplist, if not known yet
pair<GroupList::iterator, bool> groupInserter;
groupInserter = groupList.insert( make_pair(groupId, ScribeGroup(groupId)) );
// Set subscription status
groupInserter.first->second.setSubscription(true);
// Send join call if I'm not already a forwarder of this group
if( groupInserter.second || groupInserter.first->second.getParent().isUnspecified()) {
m->setGroupId( groupId );
m->setBitLength( SCRIBE_JOINCALL_L(m) );
}
}
void Scribe::update ( const NodeHandle node,
bool  joined 
)
virtual

Common API function: informs application about neighbors and own nodeID.

Parameters
nodenew or lost neighbor
joinednew or lost?

Reimplemented from BaseApp.

Definition at line 129 of file Scribe.cc.

{
// if node is closer to any group i'm root of, subscribe
for( GroupList::iterator it = groupList.begin(); it != groupList.end(); ++it ){
// if I'm root ...
if( !it->second.getParent().isUnspecified()
&& it->second.getParent() == overlay->getThisNode() ) {
KeyDistanceComparator<KeyRingMetric> comp( it->second.getGroupId() );
// ... and new node is closer to groupId
if( comp.compare(node.getKey(), overlay->getThisNode().getKey()) < 0){
// then the new node is new group root, so send him a subscribe
m->setGroupId( it->second.getGroupId() );
m->setBitLength( SCRIBE_JOINCALL_L(m) );
}
}
}
}

Member Data Documentation

int Scribe::childTimeout
private

Definition at line 65 of file Scribe.h.

ChildTimeoutList Scribe::childTimeoutList
private

Definition at line 63 of file Scribe.h.

int Scribe::forwardBytes
private

Definition at line 75 of file Scribe.h.

GroupList Scribe::groupList
private

Definition at line 61 of file Scribe.h.

int Scribe::heartbeatBytes
private

Definition at line 79 of file Scribe.h.

int Scribe::numChildTimeout
private

Definition at line 72 of file Scribe.h.

int Scribe::numForward
private

Definition at line 74 of file Scribe.h.

int Scribe::numHeartbeat
private

Definition at line 78 of file Scribe.h.

int Scribe::numJoins
private

Definition at line 71 of file Scribe.h.

int Scribe::numParentTimeout
private

Definition at line 73 of file Scribe.h.

int Scribe::numReceived
private

Definition at line 76 of file Scribe.h.

int Scribe::numSubscriptionRefresh
private

Definition at line 80 of file Scribe.h.

int Scribe::parentTimeout
private

Definition at line 66 of file Scribe.h.

int Scribe::receivedBytes
private

Definition at line 77 of file Scribe.h.

int Scribe::subscriptionRefreshBytes
private

Definition at line 81 of file Scribe.h.

ScribeTimer* Scribe::subscriptionTimer
private

Definition at line 68 of file Scribe.h.


The documentation for this class was generated from the following files: