OverSim
Pastry Class Reference

Pastry overlay module. More...

#include <Pastry.h>

Inheritance diagram for Pastry:
BasePastry BaseOverlay ProxListener BaseRpc BaseTcpSupport TopologyVis RpcListener

Public Member Functions

virtual ~Pastry ()
virtual void initializeOverlay (int stage)
 Initializes derived-class-attributes.
virtual void handleTimerEvent (cMessage *msg)
virtual void handleUDPMessage (BaseOverlayMessage *msg)
 Processes messages from underlay.
void handleStateMessage (PastryStateMessage *msg)
 processes state messages, merging with own state tables
virtual void pingResponse (PingResponse *pingResponse, cPolymorphic *context, int rpcId, simtime_t rtt)
- Public Member Functions inherited from BasePastry
virtual ~BasePastry ()
int getMaxNumSiblings ()
 Query the maximum number of siblings (nodes close to a key) that are maintained by this overlay protocol.
int getMaxNumRedundantNodes ()
 Query the maximum number of redundant next hop nodes that are returned by findNode().
virtual void handleAppMessage (BaseOverlayMessage *msg)
 processes messages from application
virtual void updateTooltip ()
 updates information shown in tk-environment
virtual NodeVectorfindNode (const OverlayKey &key, int numRedundantNodes, int numSiblings, BaseOverlayMessage *msg)
 Implements the find node call.
virtual void finishOverlay ()
 collects statistical data in derived class
virtual bool isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err)
 Query if a node is among the siblings for a given key.
virtual AbstractLookupcreateLookup (RoutingType routingType=DEFAULT_ROUTING, const BaseOverlayMessage *msg=NULL, const cPacket *dummy=NULL, bool appLookup=false)
 Creates an abstract iterative lookup instance.
uint8_t getBitsPerDigit ()
void proxCallback (const TransportAddress &node, int rpcId, cPolymorphic *contextPointer, Prox prox)
virtual OverlayKey estimateMeanDistance ()
 returns mean distance between OverlayKeys in the network
uint8_t getRTLastRow () const
std::vector< TransportAddress > * getRTRow (uint8_t index) const
std::vector< TransportAddress > * getLeafSet () const
- Public Member Functions inherited from BaseOverlay
 BaseOverlay ()
virtual ~BaseOverlay ()
 Virtual destructor.
States getState ()
bool isMalicious ()
 Returns true, if node is malicious.
bool isInSimpleMultiOverlayHost ()
 Returns true if overlay is one in an array, inside a SimpleMultiOverlayHost.
const simtime_t & getCreationTime ()
void join (const OverlayKey &nodeID=OverlayKey::UNSPECIFIED_KEY)
 Join the overlay with a given nodeID.
virtual NodeVectorlocal_lookup (const OverlayKey &key, int num, bool safe)
 finds nodes closest to the given OverlayKey
virtual NodeVectorneighborSet (int num)
void sendMessageToUDP (const TransportAddress &dest, cPacket *msg, simtime_t delay=SIMTIME_ZERO)
 Sends message to underlay.
void sendToKey (const OverlayKey &key, BaseOverlayMessage *message, int numSiblings=1, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Sends a message to an overlay node, with the generic routing algorithm.
void registerComp (CompType compType, cModule *module)
cModule * getCompModule (CompType compType)
cGate * getCompRpcGate (CompType compType)
void sendMessageToAllComp (cMessage *msg, CompType srcComp)
bool providesKbr ()
bool getMeasureAuthBlock ()
BootstrapListgetBootstrapList () const
virtual uint32_t estimateOverlaySize ()
 estimates the current number of nodes online
- Public Member Functions inherited from BaseRpc
 BaseRpc ()
const NodeHandlegetThisNode ()
 Returns the NodeHandle of this node.
simtime_t getUdpTimeout ()
- Public Member Functions inherited from RpcListener
virtual ~RpcListener ()
 destructor
- Public Member Functions inherited from BaseTcpSupport
virtual void socketDataArrived (int connId, void *yourPtr, cPacket *msg, bool urgent)
virtual void socketEstablished (int connId, void *yourPtr)
virtual void socketPeerClosed (int connId, void *yourPtr)
virtual void socketFailure (int connId, void *yourPtr, int code)
virtual void socketStatusArrived (int connId, void *yourPtr, TCPStatusInfo *status)
- Public Member Functions inherited from TopologyVis
 TopologyVis ()
void showOverlayNeighborArrow (const NodeHandle &neighbor, bool flush=true, const char *displayString=NULL)
 Draws an arrow from this node to neighbor.
void deleteOverlayNeighborArrow (const NodeHandle &neighbor)
 Removes an arrow from this node to neighbor.

Protected Member Functions

virtual void purgeVectors (void)
 delete all information/messages caching vectors, used for restarting overlay or finish()
bool handleRpcCall (BaseCallMessage *msg)
 Processes Remote-Procedure-Call invocation messages.
void handlePastryJoinCall (PastryJoinCall *call)
void handleRequestStateCall (RequestStateCall *call)
void handleRequestRepairCall (RequestRepairCall *call)
void handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 This method is called if an RPC response has been received.
void handlePastryJoinResponse (PastryJoinResponse *response)
void handleRequestStateResponse (RequestStateResponse *response)
void handleRequestRepairResponse (RequestRepairResponse *response)
void handleRequestLeafSetResponse (RequestLeafSetResponse *response)
void handleRequestRoutingRowResponse (RequestRoutingRowResponse *response)
void handleRpcTimeout (BaseCallMessage *call, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &key)
 This method is called if an RPC timeout has been reached.
virtual void changeState (int toState)
 changes node state
virtual bool recursiveRoutingHook (const TransportAddress &dest, BaseRouteMessage *msg)
 Hook for forwarded message in recursive lookup mode.
void iterativeJoinHook (BaseOverlayMessage *msg, bool incrHopCount)
- Protected Member Functions inherited from BasePastry
void handleRequestLeafSetCall (RequestLeafSetCall *call)
void handleRequestRoutingRowCall (RequestRoutingRowCall *call)
PastryStateMessagecreateStateMessage (enum PastryStateMsgType type=PASTRY_STATE_STD, simtime_t timestamp=-1, int16_t row=-1, bool lastHop=false)
virtual void checkProxCache (void)=0
 checks whether proxCache is complete, takes appropriate actions depending on the protocol state
void baseInit (void)
 initializes parameters and variables used in both Bamboo and Pastry
void baseChangeState (int)
 changes node state, but leaves specific behavour, scheduling tasks in particular, to the inheriting protocols
OverlayKey distance (const OverlayKey &x, const OverlayKey &y, bool useAlternative=false) const
 This method should implement the distance between two keys.
void prePing (const PastryStateMessage *stateMsg)
 ping all nodes in a given state message.
void pingNodes (void)
 ping all nodes in the pastry state message pointed to by private member stateCache
void determineAliveTable (const PastryStateMessage *stateMsg)
 change the aliveTable to match the given stateMsg.
void newLeafs (void)
 Pastry API: send newLeafs() to application if enabled.
- Protected Member Functions inherited from BaseOverlay
int numInitStages () const
 Sets init stage.
void bindToPort (int port)
 Tells UDP we want to get all packets arriving on the given port.
virtual void route (const OverlayKey &key, CompType destComp, CompType srcComp, cPacket *msg, const std::vector< TransportAddress > &sourceRoute=TransportAddress::UNSPECIFIED_NODES, RoutingType routingType=DEFAULT_ROUTING)
 Routes message through overlay.
void callDeliver (BaseOverlayMessage *msg, const OverlayKey &destKey)
 Calls deliver function in application.
void callForward (const OverlayKey &key, BaseRouteMessage *msg, const NodeHandle &nextHopNode)
 Calls forward function in application.
void callUpdate (const NodeHandle &node, bool joined)
 Informs application about state changes of nodes or newly joined nodes.
void handleMessage (cMessage *msg)
 Checks for message type and calls corresponding method.
void handleBaseOverlayMessage (BaseOverlayMessage *msg, const OverlayKey &destKey=OverlayKey::UNSPECIFIED_KEY)
 Handles a BaseOverlayMessage

virtual void handleAppMessage (cMessage *msg)
 Processes "timer" self-messages.
virtual void receiveChangeNotification (int category, const cPolymorphic *details)
 callback-method for events at the NotificationBoard
virtual void handleTransportAddressChangedNotification ()
 This method gets call if the node has a new TransportAddress (IP address) because he changed his access network.
virtual void handleNodeLeaveNotification ()
 This method gets call **.gracefulLeaveDelay seconds before it is killed.
virtual void handleNodeGracefulLeaveNotification ()
 This method gets call **.gracefulLeaveDelay seconds before it is killed if this node is among the gracefulLeaveProbability nodes.
virtual void recordOverlaySentStats (BaseOverlayMessage *msg)
 Collect overlay specific sent messages statistics.
void setOverlayReady (bool ready)
 Sets the overlay ready icon and register/deregisters the node at the GlobalNodeList.
virtual void removeLookup (AbstractLookup *lookup)
 Removes the abstract lookup instance.
virtual void joinOverlay ()
 Join the overlay with a given nodeID in thisNode.key.
virtual void joinForeignPartition (const NodeHandle &node)
 Join another overlay partition with the given node as bootstrap node.
virtual bool handleFailedNode (const TransportAddress &failed)
 Handles a failed node.
virtual void lookupRpc (LookupCall *call)
virtual void nextHopRpc (NextHopCall *call)
void countFindNodeCall (const FindNodeCall *call)
void countFailedNodeCall (const FailedNodeCall *call)
bool internalHandleRpcCall (BaseCallMessage *msg)
 Handles internal rpc requests.
void internalHandleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt)
 Handles rpc responses internal in base classes

void internalHandleRpcTimeout (BaseCallMessage *msg, const TransportAddress &dest, cPolymorphic *context, int rpcId, const OverlayKey &destKey)
 Handles rpc timeouts internal in base classes

void internalSendRouteRpc (BaseRpcMessage *message, const OverlayKey &destKey, const std::vector< TransportAddress > &sourceRoute, RoutingType routingType)
CompType getThisCompType ()
 Return the component type of this module.
- Protected Member Functions inherited from BaseRpc
void initRpcs ()
 Initializes Remote-Procedure state.
void finishRpcs ()
 Deinitializes Remote-Procedure state.
virtual void internalHandleRpcMessage (BaseRpcMessage *msg)
 Handles incoming rpc messages and delegates them to the corresponding listeners or handlers.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const OverlayKey &destKey, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Routes a Remote-Procedure-Call message to an OverlayKey.
uint32_t sendRouteRpcCall (CompType destComp, const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, RoutingType routingType=DEFAULT_ROUTING, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message using the overlay's UDP port
This replaces ROUTE_DIRECT calls!
uint32_t sendUdpRpcCall (const TransportAddress &dest, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends a Remote-Procedure-Call message to the underlay

uint32_t sendInternalRpcCall (CompType destComp, BaseCallMessage *msg, cPolymorphic *context=NULL, simtime_t timeout=-1, int retries=0, int rpcId=-1, RpcListener *rpcListener=NULL)
 Sends an internal Remote-Procedure-Call between two tiers

void cancelRpcMessage (uint32_t nonce)
 Cancels a Remote-Procedure-Call.
void cancelAllRpcs ()
 Cancels all RPCs.
void sendRpcResponse (TransportType transportType, CompType destComp, const TransportAddress &dest, const OverlayKey &destKey, BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message and deletes call message.
void sendRpcResponse (BaseCallMessage *call, BaseResponseMessage *response)
 Send Remote-Procedure response message to UDP and deletes call message.
int pingNode (const TransportAddress &dest, simtime_t timeout=-1, int retries=0, cPolymorphic *context=NULL, const char *caption="PING", RpcListener *rpcListener=NULL, int rpcId=-1, TransportType transportType=INVALID_TRANSPORT)
 ping a node by its TransportAddress
virtual void pingTimeout (PingCall *pingCall, const TransportAddress &dest, cPolymorphic *context, int rpcId)
bool internalHandleMessage (cMessage *msg)
- Protected Member Functions inherited from RpcListener
virtual void handleRpcResponse (BaseResponseMessage *msg, const RpcState &rpcState, simtime_t rtt)
 This method is called if an RPC response has been received.
virtual void handleRpcTimeout (const RpcState &rpcState)
 This method is called if an RPC timeout has been reached.
- Protected Member Functions inherited from BaseTcpSupport
void handleTCPMessage (cMessage *msg)
 Member function to handle incoming TCP messages.
void bindAndListenTcp (int port)
 Member function to bind service to the specified port and listen afterwards.
bool isAlreadyConnected (TransportAddress address)
 Member function to check if the service is already connected.
void establishTcpConnection (TransportAddress address)
 Member function to establish a connection to the specified node.
void sendTcpData (cPacket *msg, TransportAddress address)
 Member function to send TCP data to the specified node.
virtual void handleConnectionEvent (EvCode code, TransportAddress address)
 Member function to handle passive connection events.
virtual void handleDataReceived (TransportAddress address, cPacket *msg, bool urgent)
 Member function to handle incoming data.
virtual void handleIncomingConnection (TransportAddress address)
 Member function to handle newly opened connections.
void closeTcpConnection (TransportAddress address)
 Member function to close an established connection.
void setTcpOut (cGate *gate)
 Member function to set local gate towards the TCP module during init phase.
cGate * getTcpOut ()
 Member function to get local gate towards the TCP module.
- Protected Member Functions inherited from TopologyVis
void initVis (cModule *terminal)

Protected Attributes

std::vector< PastryStateMsgHandlestReceived
 State messages to process during join.
std::vector
< PastryStateMsgHandle >
::iterator 
stReceivedPos
std::vector< TransportAddressnotifyList
 List of nodes to notify after join.
std::vector< PastrySendState * > sendStateWait
- Protected Attributes inherited from BasePastry
uint32_t bitsPerDigit
uint32_t numberOfLeaves
uint32_t numberOfNeighbors
double readyWaitAmount
double joinTimeoutAmount
double repairTimeout
bool enableNewLeafs
bool useRegularNextHop
bool alwaysSendUpdate
bool optimizeLookup
bool proximityNeighborSelection
simtime_t nearNodeRtt
bool nearNodeImproved
bool periodicMaintenance
TransportAddressleaf2ask
TransportAddress bootstrapNode
TransportAddress nearNode
simtime_t lastStateChange
PastryStateMsgHandle stateCache
 Handle for processing a single state message.
std::queue< PastryStateMsgHandlestateCacheQueue
 Queue of state messages waiting to be processed in READY state.
PastryStateMsgProximity aliveTable
 Early update of leaf set: helper structure for marking known-dead nodes.
uint32_t joinHopCount
cMessage * readyWait
cMessage * joinUpdateWait
PastryRoutingTableroutingTable
PastryLeafSetleafSet
PastryNeighborhoodSetneighborhoodSet
- Protected Attributes inherited from BaseOverlay
int numAppDataForwarded
 number of forwarded app data packets
int bytesAppDataForwarded
 number of forwarded app data bytes at out-gate
int numAppLookupForwarded
 number of forwarded app lookup packets
int bytesAppLookupForwarded
 number of forwarded app lookup bytes at out-gate
int numMaintenanceForwarded
 number of forwarded maintenance packets
int bytesMaintenanceForwarded
 number of forwarded maintenance bytes at out-gate
int numFindNodeSent
int bytesFindNodeSent
int numFindNodeResponseSent
int bytesFindNodeResponseSent
int numFailedNodeSent
int bytesFailedNodeSent
int numFailedNodeResponseSent
int bytesFailedNodeResponseSent
std::vector< HopDelayRecord * > singleHopDelays
simtime_t creationTime
 simtime when the node has been created
GlobalNodeListglobalNodeList
 pointer to GlobalNodeList in this node
NotificationBoard * notificationBoard
 pointer to NotificationBoard in this node
UnderlayConfiguratorunderlayConfigurator
 pointer to UnderlayConfigurator in this node
BootstrapListbootstrapList
 pointer to the BootstrapList module
GlobalParametersglobalParameters
 pointer to the GlobalParameters module
uint32_t overlayId
 identifies the overlay this node belongs to (used for multiple overlays)
bool debugOutput
 debug output ?
RoutingType defaultRoutingType
bool useCommonAPIforward
 forward messages to applications?
bool collectPerHopDelay
 collect delay for single hops
bool routeMsgAcks
 send ACK when receiving route message
uint32_t recNumRedundantNodes
 numRedundantNodes for recursive routing
bool recordRoute
 record visited hops on route
bool drawOverlayTopology
bool rejoinOnFailure
bool sendRpcResponseToLastHop
 needed by KBR protocols for NAT support
bool dropFindNodeAttack
 if node is malicious, it tries a findNode attack
bool isSiblingAttack
 if node is malicious, it tries a isSibling attack
bool invalidNodesAttack
 if node is malicious, it tries a invalidNode attack
bool dropRouteMessageAttack
 if node is malicious, it drops all received BaseRouteMessages
int localPort
 used UDP-port
int hopCountMax
 maximum hop count
bool measureAuthBlock
 if true, measure the overhead of signatures in rpc messages
bool restoreContext
 if true, a node rejoins with its old nodeId and malicious state
int numDropped
 number of dropped packets
int bytesDropped
 number of dropped bytes
cOutVector delayVector
 statistical output vector for packet-delays
cOutVector hopCountVector
 statistical output vector for hop-counts
States state
IterativeLookupConfiguration iterativeLookupConfig
RecursiveLookupConfiguration recursiveLookupConfig
LookupSet lookups
bool kbr
 set this to true, if the overlay provides KBR services
- Protected Attributes inherited from BaseRpc
NodeHandle thisNode
 NodeHandle to this node.
BaseOverlayoverlay
bool debugOutput
 debug output ?
GlobalStatisticsglobalStatistics
 pointer to GlobalStatistics module in this node
CompType thisCompType
NeighborCacheneighborCache
 pointer to the neighbor cache
CryptoModulecryptoModule
 pointer to CryptoModule
int numPingSent
int bytesPingSent
int numPingResponseSent
int bytesPingResponseSent
- Protected Attributes inherited from TopologyVis
cModule * thisTerminal
GlobalNodeListglobalNodeList
 pointer to corresponding node

Private Member Functions

void clearVectors ()
void doSecondStage (void)
 do the second stage of initialization as described in the paper
void doRoutingTableMaintenance ()
 periodic routing table maintenance requests the corresponding routing table row from one node in each row
bool handleFailedNode (const TransportAddress &failed)
 notifies leafset and routingtable of a failed node and sends out a repair request if possible
void checkProxCache (void)
 checks whether proxCache is complete, takes appropriate actions depending on the protocol state
void processState (void)
bool mergeState (void)
void endProcessingState (void)
void doJoinUpdate (void)
 send updated state to all nodes when entering ready state
virtual void joinOverlay ()
void sendStateDelayed (const TransportAddress &destination)
 send a standard state message with a small delay

Private Attributes

simtime_t secondStageInterval
simtime_t routingTableMaintenanceInterval
simtime_t discoveryTimeoutAmount
bool partialJoinPath
uint16_t discoveryModeProbedNodes
int depth
int updateCounter
bool minimalJoinState
bool useDiscovery
bool useSecondStage
bool useRoutingTableMaintenance
bool sendStateAtLeafsetRepair
bool pingBeforeSecondStage
bool overrideOldPastry
bool overrideNewPastry
cMessage * secondStageWait
cMessage * ringCheck
cMessage * discoveryTimeout
cMessage * repairTaskTimeout

Additional Inherited Members

- Public Types inherited from BaseOverlay
enum  States {
  INIT = 0, BOOTSTRAP = 1, DISCOVERY = 2, PREJOIN = 3,
  JOIN = 4, POSTJOIN = 5, READY = 6, REFRESH = 7,
  SHUTDOWN = 8, FAILED = 9, RSET = JOIN, BSET = POSTJOIN
}
- Public Attributes inherited from BasePastry
int joins
int joinTries
int joinPartial
int joinSeen
int joinBytesSeen
int joinReceived
int joinBytesReceived
int joinSent
int joinBytesSent
int stateSent
int stateBytesSent
int stateReceived
int stateBytesReceived
int repairReqSent
int repairReqBytesSent
int repairReqReceived
int repairReqBytesReceived
int stateReqSent
int stateReqBytesSent
int stateReqReceived
int stateReqBytesReceived
int totalLookups
int responsibleLookups
int routingTableLookups
int closerNodeLookups
int closerNodeLookupsFromNeighborhood
int leafsetReqSent
int leafsetReqBytesSent
int leafsetReqReceived
int leafsetReqBytesReceived
int leafsetSent
int leafsetBytesSent
int leafsetReceived
int leafsetBytesReceived
int routingTableRowReqSent
int routingTableRowReqBytesSent
int routingTableRowReqReceived
int routingTableRowReqBytesReceived
int routingTableRowSent
int routingTableRowBytesSent
int routingTableRowReceived
int routingTableRowBytesReceived
- Protected Types inherited from BasePastry
enum  StateObject { ROUTINGTABLE, LEAFSET, NEIGHBORHOODSET }
enum  { PING_RECEIVED_STATE = 1, PING_NEXT_HOP = 2, PING_SINGLE_NODE = 3, PING_DISCOVERY = 4 }

Detailed Description

Pastry overlay module.

Author
Felix Palmen
See Also
BaseOverlay

Definition at line 53 of file Pastry.h.

Constructor & Destructor Documentation

Pastry::~Pastry ( )
virtual

Definition at line 39 of file Pastry.cc.

{
// destroy self timer messages
cancelAndDelete(readyWait);
cancelAndDelete(joinUpdateWait);
cancelAndDelete(secondStageWait);
if (useDiscovery) cancelAndDelete(discoveryTimeout);
}

Member Function Documentation

void Pastry::changeState ( int  toState)
protectedvirtual

changes node state

Parameters
toStatestate to change to

Reimplemented from BasePastry.

Definition at line 161 of file Pastry.cc.

Referenced by checkProxCache(), handleStateMessage(), handleTimerEvent(), and joinOverlay().

{
if (readyWait->isScheduled()) cancelEvent(readyWait);
baseChangeState(toState);
switch (toState) {
case INIT:
break;
case DISCOVERY: {
nearNodeRtt = MAXTIME;
NULL, "PING bootstrapNode in discovery mode",
NULL, PING_DISCOVERY, UDP_TRANSPORT); //TODO
new RequestLeafSetCall("REQUEST LEAFSET Call");
call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
leafsetReqBytesSent += call->getByteLength());
depth = -1;
}
break;
case JOIN: {
PastryJoinCall* call = new PastryJoinCall("JOIN Call");
call->setBitLength(PASTRYJOINCALL_L(msg));
RECORD_STATS(joinSent++; joinBytesSent += call->getByteLength());
}
break;
case READY:
// determine list of all known nodes as notifyList
notifyList.clear();
sort(notifyList.begin(), notifyList.end());
notifyList.erase(unique(notifyList.begin(), notifyList.end()),
notifyList.end());
// schedule update
cancelEvent(joinUpdateWait);
scheduleAt(simTime() + 0.0001, joinUpdateWait);
// schedule second stage
cancelEvent(secondStageWait);
scheduleAt(simTime() + secondStageInterval, secondStageWait);
}
// schedule routing table maintenance task
cancelEvent(repairTaskTimeout);
}
break;
}
}
void Pastry::checkProxCache ( void  )
private

checks whether proxCache is complete, takes appropriate actions depending on the protocol state

Definition at line 939 of file Pastry.cc.

{
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
// no cached STATE message?
if (!stateCache.msg) {
return;
}
// no entries in stateCache.prox?
if (stateCache.prox->pr_rt.empty() &&
stateCache.prox->pr_ls.empty() &&
stateCache.prox->pr_ns.empty())
throw cRuntimeError("ERROR in Pastry: stateCache.prox empty!");
// some entries not yet determined?
if ((find(stateCache.prox->pr_rt.begin(), stateCache.prox->pr_rt.end(),
(find(stateCache.prox->pr_ls.begin(), stateCache.prox->pr_ls.end(),
(find(stateCache.prox->pr_ns.begin(), stateCache.prox->pr_ns.end(),
return;
}
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " all proximities for current STATE message from "
<< " collected!"
<< endl;
simtime_t now = simTime();
if (state == JOIN) {
// save pointer to proximity vectors (it is NULL until now):
// collected proximities for all STATE messages?
if (++stReceivedPos == stReceived.end()) {
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " proximities for all STATE messages collected!"
<< endl;
stateCache.msg = NULL;
stateCache.prox = NULL;
if (debugOutput) {
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " [JOIN] starting to build own state from "
<< stReceived.size() << " received state messages..."
<< endl;
}
if (mergeState()) {
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " changeState(READY) called"
<< endl;
} else {
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error initializing while joining! Restarting ..."
<< endl;
}
} else {
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " NOT all proximities for all STATE messages collected!"
<< endl;
// process next state message in vector:
if (stReceivedPos->msg == NULL)
throw cRuntimeError("stReceivedPos->msg = NULL");
if (stateCache.msg == NULL)
throw cRuntimeError("msg = NULL");
}
} else {
// state == READY
// try to repair routingtable based on repair message:
const TransportAddress& askRt =
if (! askRt.isUnspecified()) {
new RequestRepairCall("REQUEST REPAIR Call");
call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
sendUdpRpcCall(askRt, call);
}
// while not really known, it's safe to assume that a repair
// message changed our state:
} else {
// send another STATE message on outdated state update:
} else {
// merge info in own state tables
// except leafset (was already handled in handleStateMessage)
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Merging nodes into routing table"
<< endl;
EV << "[Pastry::checkProxCache() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Merged nodes into routing table"
<< endl;
}
}
}
}
}
void Pastry::clearVectors ( )
private

Definition at line 52 of file Pastry.cc.

Referenced by purgeVectors(), and ~Pastry().

{
// purge pending state messages
if (!stReceived.empty()) {
for (std::vector<PastryStateMsgHandle>::iterator it =
stReceived.begin(); it != stReceived.end(); it++) {
// check whether one of the pointers is a duplicate of stateCache
if (it->msg == stateCache.msg) stateCache.msg = NULL;
if (it->prox == stateCache.prox) stateCache.prox = NULL;
delete it->msg;
delete it->prox;
}
stReceived.clear();
}
// purge notify list:
notifyList.clear();
}
void Pastry::doJoinUpdate ( void  )
private

send updated state to all nodes when entering ready state

Definition at line 761 of file Pastry.cc.

Referenced by handleTimerEvent().

{
// send "update" state message to all nodes who sent us their state
// during INIT, remove these from notifyList so they don't get our
// state twice
std::vector<TransportAddress>::iterator nListPos;
if (!stReceived.empty()) {
for (std::vector<PastryStateMsgHandle>::iterator it =
stReceived.begin(); it != stReceived.end(); ++it) {
PastryStateMessage* stateMsg =
it->msg->getTimestamp());
stateBytesSent += stateMsg->getByteLength());
sendMessageToUDP(it->msg->getSender(), stateMsg);
nListPos = find(notifyList.begin(), notifyList.end(),
it->msg->getSender());
if (nListPos != notifyList.end()) {
notifyList.erase(nListPos);
}
delete it->msg;
delete it->prox;
}
stReceived.clear();
}
// send a normal STATE message to all remaining known nodes
for (std::vector<TransportAddress>::iterator it =
notifyList.begin(); it != notifyList.end(); it++) {
if (*it != thisNode) {
PastryStateMessage* stateMsg =
stateBytesSent += stateMsg->getByteLength());
sendMessageToUDP(*it, stateMsg);
}
}
notifyList.clear();
}
void Pastry::doRoutingTableMaintenance ( )
private

periodic routing table maintenance requests the corresponding routing table row from one node in each row

Definition at line 859 of file Pastry.cc.

Referenced by handleTimerEvent().

{
for (int i = 0; i < routingTable->getLastRow(); i++) {
assert(!dynamic_cast<const NodeHandle&>(ask4row).getKey().isUnspecified());
if ((!ask4row.isUnspecified()) && (ask4row != thisNode)) {
new RequestRoutingRowCall("REQUEST ROUTING ROW Call");
call->setRow(i + 1);
call->setBitLength(PASTRYREQUESTROUTINGROWCALL_L(call));
routingTableRowReqBytesSent += call->getByteLength());
sendUdpRpcCall(ask4row, call);
} else {
EV << "[Pastry::doRoutingTableMaintenance() @ "
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " could not send Message to Node in Row" << i
<< endl;
}
}
}
void Pastry::doSecondStage ( void  )
private

do the second stage of initialization as described in the paper

Definition at line 804 of file Pastry.cc.

Referenced by handleTimerEvent().

{
getParentModule()->getParentModule()->bubble("entering SECOND STAGE");
// probe nodes in local state
if (leafSet->isValid()) {
PastryStateMsgHandle handle(stateMsg);
if (!stateCache.msg) {
stateCache = handle;
} else {
stateCacheQueue.push(handle);
if (stateCacheQueue.size() > 15) {
delete stateCacheQueue.front().msg;
EV << "[Pastry::doSecondStage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " stateCacheQueue full -> pop()" << endl;
}
prePing(stateMsg);
}
}
}
// "second stage" for locality:
notifyList.clear();
sort(notifyList.begin(), notifyList.end());
notifyList.erase(unique(notifyList.begin(), notifyList.end()),
notifyList.end());
for (std::vector<TransportAddress>::iterator it = notifyList.begin();
it != notifyList.end(); it++) {
if (*it == thisNode) continue;
EV << "[Pastry::doSecondStage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " second stage: requesting state from " << *it
<< endl;
new RequestStateCall("REQUEST STATE Call");
call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
stateReqBytesSent += call->getByteLength());
sendUdpRpcCall(*it, call);
}
notifyList.clear();
}
void Pastry::endProcessingState ( void  )
private

Definition at line 1070 of file Pastry.cc.

Referenced by checkProxCache(), and processState().

{
// if state message was not an update, send one back:
if (stateCache.msg &&
(alwaysSendUpdate || lastStateChange == simTime()) &&
PastryStateMessage* stateMsg =
stateBytesSent += stateMsg->getByteLength());
}
delete stateCache.msg;
stateCache.msg = NULL;
delete stateCache.prox;
stateCache.prox = NULL;
// process next queued message:
if (! stateCacheQueue.empty()) {
}
}
bool Pastry::handleFailedNode ( const TransportAddress failed)
private

notifies leafset and routingtable of a failed node and sends out a repair request if possible

Parameters
failedthe failed node
Returns
true as long as local state is READY (signals lookup to try again)

Definition at line 886 of file Pastry.cc.

Referenced by handlePastryJoinCall(), and recursiveRoutingHook().

{
if (state != READY) return false;
bool wasValid = leafSet->isValid();
if (failed.isUnspecified())
opp_error("Pastry::handleFailedNode(): failed is unspecified!");
const TransportAddress& lsAsk = leafSet->failedNode(failed);
const TransportAddress& rtAsk = routingTable->failedNode(failed);
if (!lsAsk.isUnspecified()) {
new RequestRepairCall("REQUEST REPAIR Call");
call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
repairReqBytesSent += call->getByteLength());
sendUdpRpcCall(lsAsk, call);
} else {
new RequestLeafSetCall("REQUEST LEAFSET Call");
call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
leafsetReqBytesSent += call->getByteLength());
sendUdpRpcCall(lsAsk, call);
}
}
if (!rtAsk.isUnspecified() && (lsAsk.isUnspecified() || lsAsk != rtAsk)) {
new RequestRepairCall("REQUEST REPAIR Call");
call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
sendUdpRpcCall(rtAsk, call);
}
if (wasValid && lsAsk.isUnspecified() && (! leafSet->isValid())) {
EV << "[Pastry::handleFailedNode() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " lost connection to the network, trying to re-join."
<< endl;
join();
return false;
}
return true;
}
void Pastry::handlePastryJoinCall ( PastryJoinCall call)
protected

Definition at line 382 of file Pastry.cc.

Referenced by handleRpcCall().

{
EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
joinBytesReceived += call->getByteLength());
if (state != READY) {
if (call->getSrcNode() == thisNode) {
EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " PastryJoinCall received by originator!"
<< endl;
} else {
EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " received join message before reaching "
<< "READY state, dropping message!"
<< endl;
}
} else if (call->getSrcNode() == thisNode) {
EV << "[Pastry::handlePastryJoinCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " PastryJoinCall gets dropped because it is "
<< "outdated and has been received by originator!"
<< endl;
} else {
OverlayCtrlInfo* overlayCtrlInfo =
check_and_cast<OverlayCtrlInfo*>(call->getControlInfo());
uint32_t joinHopCount = overlayCtrlInfo->getHopCount();
if ((joinHopCount > 1) &&
joinHopCount--;
// remove node from state if it is rejoining
PastryJoinResponse* response = new PastryJoinResponse("JOIN Response");
// create new state msg and set special fields for some types:
response->setStatType(MAINTENANCE_STAT);
response->setTimestamp(simTime());
response->setBitLength(PASTRYJOINRESPONSE_L(response));
response->encapsulate(createStateMessage((minimalJoinState ?
-1, joinHopCount, true));
// send...
stateBytesSent += response->getByteLength());
sendRpcResponse(call, response);
}
}
void Pastry::handlePastryJoinResponse ( PastryJoinResponse response)
protected

Definition at line 594 of file Pastry.cc.

Referenced by handleRpcResponse().

{
EV << "[Pastry::handlePastryJoinResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
stateBytesReceived += response->getByteLength());
if (state == JOIN) {
handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
}
}
void Pastry::handleRequestLeafSetResponse ( RequestLeafSetResponse response)
protectedvirtual

Reimplemented from BasePastry.

Definition at line 623 of file Pastry.cc.

Referenced by handleRpcResponse().

{
EV << "[Pastry::handleRequestLeafSetResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
if (state == DISCOVERY) {
const NodeHandle* node;
check_and_cast<PastryStateMessage*>(response->getEncapsulatedPacket());
for (uint32_t i = 0; i < leaves->getLeafSetArraySize(); ++i) {
node = &(leaves->getLeafSet(i));
// unspecified nodes not considered
if ( !(node->isUnspecified()) ) {
NULL, "PING received leaves for nearest node",
NULL, -1, UDP_TRANSPORT); //TODO
}
}
EV << " received leafset, waiting for pings"
<< endl;
if (discoveryTimeout->isScheduled()) cancelEvent(discoveryTimeout);
scheduleAt(simTime() + discoveryTimeoutAmount, discoveryTimeout);
}
}
void Pastry::handleRequestRepairCall ( RequestRepairCall call)
protected

Definition at line 474 of file Pastry.cc.

Referenced by handleRpcCall().

{
EV << "[Pastry::handleRequestRepairCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
repairReqBytesReceived += call->getByteLength());
if (state != READY) {
EV << " received repair request before reaching"
<< " READY state, dropping message!"
<< endl;
delete call;
return;
}
new RequestRepairResponse("REQUEST REPAIR Response");
response->setBitLength(PASTRYREQUESTREPAIRRESPONSE_L(response));
response->encapsulate(createStateMessage(PASTRY_STATE_REPAIR));
stateBytesSent += response->getByteLength());
sendRpcResponse(call, response);
}
void Pastry::handleRequestRepairResponse ( RequestRepairResponse response)
protected

Definition at line 504 of file Pastry.cc.

Referenced by handleRpcResponse().

{
EV << "[Pastry::handleRequestRepairResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
stateBytesReceived += response->getByteLength());
if (state == READY) {
handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
}
}
void Pastry::handleRequestRoutingRowResponse ( RequestRoutingRowResponse response)
protectedvirtual

Reimplemented from BasePastry.

Definition at line 654 of file Pastry.cc.

Referenced by handleRpcResponse().

{
EV << "[Pastry::handleRequestRoutingRowResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
if (state == DISCOVERY) {
PastryStateMessage* rowState =
check_and_cast<PastryStateMessage*>(response->getEncapsulatedPacket());
uint32_t nodesPerRow = rowState->getRoutingTableArraySize();
const NodeHandle* node;
if (depth == -1) {
depth = rowState->getRow();
}
if (depth > 0) {
for (uint32_t i = 0; i < nodesPerRow; i++) {
node = &(rowState->getRoutingTable(i));
// unspecified nodes not considered
if ( !(node->isUnspecified()) ) {
// we look for best connection here,
// so Timeout is short and there are no retries
"PING received routing table for nearest node",
NULL, -1, UDP_TRANSPORT); //TODO
}
}
depth--;
}
EV << " received routing table, waiting for pings"
<< endl;
if (discoveryTimeout->isScheduled()) cancelEvent(discoveryTimeout);
scheduleAt(simTime() + discoveryTimeoutAmount, discoveryTimeout);
}
}
void Pastry::handleRequestStateCall ( RequestStateCall call)
protected

Definition at line 444 of file Pastry.cc.

Referenced by handleRpcCall().

{
EV << "[Pastry::handleRequestStateCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
stateReqBytesReceived += call->getByteLength());
if (state != READY) {
EV << " received repair request before reaching"
<< " READY state, dropping message!"
<< endl;
delete call;
return;
}
new RequestStateResponse("REQUEST STATE Response");
response->setBitLength(PASTRYREQUESTSTATERESPONSE_L(response));
response->encapsulate(createStateMessage());
stateBytesSent += response->getByteLength());
sendRpcResponse(call, response);
}
void Pastry::handleRequestStateResponse ( RequestStateResponse response)
protected

Definition at line 608 of file Pastry.cc.

Referenced by handleRpcResponse().

{
EV << "[Pastry::handleRequestStateResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]"
<< endl;
stateBytesReceived += response->getByteLength());
if (state == READY) {
handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
}
}
bool Pastry::handleRpcCall ( BaseCallMessage msg)
protectedvirtual

Processes Remote-Procedure-Call invocation messages.


This method should be overloaded when the overlay provides RPC functionality.

Returns
true, if rpc has been handled

Reimplemented from BasePastry.

Definition at line 358 of file Pastry.cc.

{
if (BasePastry::handleRpcCall(msg)) return true;
if (state != READY) {
EV << "[Pastry::handleRpcCall() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received RPC call and state != READY"
<< endl;
return false;
}
// delegate messages
// RPC_DELEGATE( <messageName>[Call|Response], <methodToCall> )
return RPC_HANDLED;
}
void Pastry::handleRpcResponse ( BaseResponseMessage msg,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
protectedvirtual

This method is called if an RPC response has been received.

Parameters
msgThe response message.
contextPointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code
rpcIdThe RPC id.
rttThe Round-Trip-Time of this RPC

Reimplemented from BasePastry.

Definition at line 519 of file Pastry.cc.

{
BasePastry::handleRpcResponse(msg, context, rpcId, rtt);
RPC_ON_RESPONSE( PastryJoin ) {
EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a JOIN RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_PastryJoinResponse << " rtt=" << SIMTIME_DBL(rtt)
<< endl;
handlePastryJoinResponse(_PastryJoinResponse);
break;
}
RPC_ON_RESPONSE( RequestState ) {
EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a RequestState RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_RequestStateResponse << " rtt=" << SIMTIME_DBL(rtt)
<< endl;
handleRequestStateResponse(_RequestStateResponse);
break;
}
RPC_ON_RESPONSE( RequestRepair ) {
EV << "[BasePastry::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a Request Repair RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_RequestRepairResponse << " rtt=" << SIMTIME_DBL(rtt)
<< endl;
handleRequestRepairResponse(_RequestRepairResponse);
break;
}
RPC_ON_RESPONSE( RequestLeafSet ) {
EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a RequestLeafSet RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_RequestLeafSetResponse << " rtt=" << SIMTIME_DBL(rtt)
<< endl;
handleRequestLeafSetResponse(_RequestLeafSetResponse);
break;
}
RPC_ON_RESPONSE( RequestRoutingRow ) {
EV << "[Pastry::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Received a RequestRoutingRow RPC Response: id=" << rpcId << "\n"
<< " msg=" << *_RequestRoutingRowResponse << " rtt=" << rtt
<< endl;
handleRequestRoutingRowResponse(_RequestRoutingRowResponse);
break;
}
}
void Pastry::handleRpcTimeout ( BaseCallMessage msg,
const TransportAddress dest,
cPolymorphic *  context,
int  rpcId,
const OverlayKey destKey 
)
protectedvirtual

This method is called if an RPC timeout has been reached.

Parameters
msgThe original RPC message.
destThe destination node
contextPointer to an optional state object. The object has to be handled/deleted by the handleRpcResponse() code
rpcIdThe RPC id.
destKeythe destination OverlayKey

Reimplemented from BasePastry.

Definition at line 575 of file Pastry.cc.

{
BasePastry::handleRpcTimeout(call, dest, context, rpcId, key);
EV << "[Pastry::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Timeout of RPC Call: id=" << rpcId << "\n"
<< " msg=" << *call << " key=" << key
<< endl;
if (state == DISCOVERY && dynamic_cast<RequestLeafSetCall*>(call)) {
join();
}
}
void Pastry::handleStateMessage ( PastryStateMessage msg)
virtual

processes state messages, merging with own state tables

Parameters
msgthe pastry state message

Implements BasePastry.

Definition at line 1193 of file Pastry.cc.

Referenced by handlePastryJoinResponse(), handleRequestRepairResponse(), handleRequestStateResponse(), and handleUDPMessage().

{
if (debugOutput) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " new STATE message to process "
<< static_cast<void*>(msg) << " in state " <<
((state == READY)?"READY":((state == JOIN)?"JOIN":"INIT"))
<< endl;
if (state == JOIN) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " *** own joinHopCount: " << joinHopCount << endl
<< " *** already received: " << stReceived.size() << endl
<< " *** last-hop flag: "
<< (msg->getLastHop() ? "true" : "false") << endl
<< " *** msg joinHopCount: "
<< msg->getRow() << endl;
}
}
if (state == INIT || state == DISCOVERY) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " can't handle state messages until at least reaching JOIN state."
<< endl;
delete msg;
return;
}
PastryStateMsgHandle handle(msg);
// in JOIN state, store all received state Messages, need them later:
if (state == JOIN) {
if (!(msg->getPastryStateMsgType() &
delete msg;
return;
}
if (joinHopCount && stReceived.size() == joinHopCount) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Warning: dropping state message received after "
<< "all needed state messages were collected in JOIN state."
<< endl;
delete msg;
return;
}
stReceived.push_back(handle);
if (msg->getLastHop()) {
if (joinHopCount) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error: received a second `last' state message! Restarting ..."
<< endl;
return;
}
joinHopCount = msg->getRow();
if (stReceived.size() < joinHopCount) {
// some states still missing:
cancelEvent(readyWait);
scheduleAt(simTime() + readyWaitAmount, readyWait);
}
}
if (joinHopCount) {
if (stReceived.size() > joinHopCount) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error: too many state messages received in JOIN state! ("
<< stReceived.size() << " > " << joinHopCount << ") Restarting ..."
<< endl;
return;
}
if (stReceived.size() == joinHopCount) {
// all state messages are here, sort by hopcount:
sort(stReceived.begin(), stReceived.end(),
// start pinging the nodes found in the first state message:
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " have all STATE messages, now pinging nodes."
<< endl;
} else {
stateCache.msg = NULL;
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " changeState(READY) called"
<< endl;
}
// cancel timeout:
if (readyWait->isScheduled()) cancelEvent(readyWait);
} else {
//TODO occasionally, here we got a wrong hop count in
// iterative mode due to more than one it. lookup during join
// procedure
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Still need some STATE messages."
<< endl;
}
}
return;
}
if (debugOutput) {
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " handling STATE message"
<< endl;
EV << " type: " << ((msg->getPastryStateMsgType()
== PASTRY_STATE_UPDATE) ? "update"
:"standard")
<< endl;
EV << " msg timestamp: " <<
msg->getTimestamp() << endl;
EV << " last state change: " <<
lastStateChange << endl;
}
}
&& (msg->getTimestamp() <= lastStateChange)) {
// if we received an update based on our outdated state,
// mark handle for retrying later:
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " outdated state from " << msg->getSender()
<< endl;
handle.outdatedUpdate = true;
}
// determine aliveTable to prevent leafSet from merging nodes that are
// known to be dead:
// try to repair leafset based on repair message right now
const TransportAddress& askLs = leafSet->repair(msg, &aliveTable);
if (! askLs.isUnspecified()) {
//sendRequest(askLs, PASTRY_REQ_REPAIR);
new RequestRepairCall("REQUEST REPAIR Call");
call->setBitLength(PASTRYREQUESTREPAIRCALL_L(call));
RECORD_STATS(repairReqSent++; repairReqBytesSent += call->getByteLength());
sendUdpRpcCall(askLs, call);
}
// while not really known, it's safe to assume that a repair
// message changed our state:
lastStateChange = simTime();
} else if (leafSet->mergeState(msg, &aliveTable)) {
// merged state into leafset right now
lastStateChange = simTime();
}
// in READY state, only ping nodes to get proximity metric:
if (!stateCache.msg) {
// no state message is processed right now, start immediately:
assert(stateCache.prox == NULL);
stateCache = handle;
} else {
// enqueue message for later processing:
stateCacheQueue.push(handle);
if (stateCacheQueue.size() > 15) {
delete stateCacheQueue.front().msg;
EV << "[Pastry::handleStateMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " stateCacheQueue full -> pop()" << endl;
}
prePing(msg);
} else {
bool temp = true;
if (!neighborhoodSet->mergeState(msg, NULL)) {
temp = false;
}
if (!leafSet->mergeState(msg, NULL)) {
temp = false;
} else {
}
if (!routingTable->mergeState(msg, NULL)) {
temp = false;
}
if (temp) lastStateChange = simTime();
delete msg;
}
}
}
void Pastry::handleTimerEvent ( cMessage *  msg)
virtual

Reimplemented from BaseRpc.

Definition at line 252 of file Pastry.cc.

{
if (msg == readyWait) {
sort(stReceived.begin(), stReceived.end(), stateMsgIsSmaller);
// start pinging the nodes found in the first state message:
EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " joining despite some missing STATE messages."
<< endl;
} else {
EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " timeout waiting for missing state messages"
<< " in JOIN state, restarting..."
<< endl;
join();
}
} else if (msg == joinUpdateWait) {
EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " sending state updates to all nodes."
<< endl;
} else if (msg == secondStageWait) {
EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " sending STATE requests to all nodes in"
<< " second stage of initialization."
<< endl;
} else if (msg == discoveryTimeout) {
if ((depth == 0) && (nearNodeImproved)) {
depth++; //repeat last step if closer node was found
}
if ((depth == 0) || (discoveryModeProbedNodes < 1)) {
} else {
new RequestRoutingRowCall("REQUEST ROUTING ROW Call");
call->setRow(depth);
call->setBitLength(PASTRYREQUESTROUTINGROWCALL_L(call));
routingTableRowReqBytesSent += call->getByteLength());
}
} else if (msg == repairTaskTimeout) {
EV << "[Pastry::handleTimerEvent() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " starting routing table maintenance"
<< endl;
scheduleAt(simTime() + routingTableMaintenanceInterval,
} else if (dynamic_cast<PastrySendState*>(msg)) {
PastrySendState* sendStateMsg = static_cast<PastrySendState*>(msg);
std::vector<PastrySendState*>::iterator pos =
std::find(sendStateWait.begin(), sendStateWait.end(),
sendStateMsg);
if (pos != sendStateWait.end()) sendStateWait.erase(pos);
stateBytesSent += stateMsg->getByteLength());
sendMessageToUDP(sendStateMsg->getDest(), stateMsg);
delete sendStateMsg;
}
}
void Pastry::handleUDPMessage ( BaseOverlayMessage msg)
virtual

Processes messages from underlay.

Parameters
msgMessage from UDP

Reimplemented from BaseOverlay.

Definition at line 339 of file Pastry.cc.

{
PastryStateMessage* stateMsg = check_and_cast<PastryStateMessage*>(msg);
uint32_t type = stateMsg->getPastryStateMsgType();
if (debugOutput) {
EV << "[Pastry::handleUDPMessage() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " incoming STATE message of type "
<< cEnum::get("PastryStateMsgType")->getStringFor(type) << endl;
}
stateMsg->getByteLength());
handleStateMessage(stateMsg);
}
void Pastry::initializeOverlay ( int  stage)
virtual

Initializes derived-class-attributes.


Initializes derived-class-attributes, called by BaseOverlay::initialize(). By default this method is called once. If more stages are needed one can overload numInitStages() and add more stages.

Parameters
stagethe init stage

Reimplemented from BaseOverlay.

Definition at line 91 of file Pastry.cc.

{
if ( stage != MIN_STAGE_OVERLAY )
return;
// Pastry provides KBR services
kbr = true;
useDiscovery = par("useDiscovery");
useSecondStage = par("useSecondStage");
pingBeforeSecondStage = par("pingBeforeSecondStage");
secondStageInterval = par("secondStageWait");
discoveryTimeoutAmount = par("discoveryTimeoutAmount");
useRoutingTableMaintenance = par("useRoutingTableMaintenance");
routingTableMaintenanceInterval = par("routingTableMaintenanceInterval");
sendStateAtLeafsetRepair = par("sendStateAtLeafsetRepair");
partialJoinPath = par("partialJoinPath");
readyWaitAmount = par("readyWait");
minimalJoinState = par("minimalJoinState");
overrideOldPastry = par("overrideOldPastry");
overrideNewPastry = par("overrideNewPastry");
useDiscovery = false;
}
useSecondStage = false;
useDiscovery = true;
}
readyWait = new cMessage("readyWait");
secondStageWait = new cMessage("secondStageWait");
joinUpdateWait = new cMessage("joinUpdateWait");
(useDiscovery ? new cMessage("discoveryTimeout") : NULL);
(useRoutingTableMaintenance ? new cMessage("repairTaskTimeout") : NULL);
}
void Pastry::iterativeJoinHook ( BaseOverlayMessage msg,
bool  incrHopCount 
)
protectedvirtual

Reimplemented from BasePastry.

Definition at line 729 of file Pastry.cc.

{
PastryFindNodeExtData* findNodeExt = NULL;
if (msg && msg->hasObject("findNodeExt")) {
findNodeExt =
check_and_cast<PastryFindNodeExtData*>(msg->
getObject("findNodeExt"));
}
// Send state tables on any JOIN message we see:
if (findNodeExt) {
const TransportAddress& stateRecipient =
findNodeExt->getSendStateTo();
if (!stateRecipient.isUnspecified()) {
PastryStateMessage* stateMsg =
-1,
findNodeExt->getJoinHopCount(),
false);
stateBytesSent += stateMsg->getByteLength());
sendMessageToUDP(stateRecipient, stateMsg);
}
if (incrHopCount) {
findNodeExt->setJoinHopCount(findNodeExt->getJoinHopCount() + 1);
}
}
}
void Pastry::joinOverlay ( )
privatevirtual

Definition at line 145 of file Pastry.cc.

Referenced by checkProxCache(), and handleStateMessage().

{
// no existing pastry network -> first node of a new one
} else {
// join existing pastry network
}
}
bool Pastry::mergeState ( void  )
private

Definition at line 1101 of file Pastry.cc.

Referenced by checkProxCache(), handleStateMessage(), and processState().

{
bool ret = true;
if (state == JOIN) {
// building initial state
if (debugOutput) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " [JOIN] starting to build own state from "
<< stReceived.size() << " received state messages..."
<< endl;
}
if (stateCache.msg &&
if (debugOutput) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " [JOIN] initializing NeighborhoodSet from "
<< stReceived.front().msg->getRow() << ". hop"
<< endl;
}
stReceived.front().prox )) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error initializing own neighborhoodSet"
<< " while joining! Restarting ..."
<< endl;
ret = false;
}
}
if (debugOutput) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " [JOIN] initializing LeafSet from "
<< stReceived.back().msg->getRow() << ". hop"
<< endl;
}
if (!leafSet->mergeState(stReceived.back().msg,
stReceived.back().prox )) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error initializing own leafSet while joining!"
<< " Restarting ..."
<< endl;
ret = false;
} else {
}
if (debugOutput) {
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " [JOIN] initializing RoutingTable from all hops"
<< endl;
}
assert(!stateCache.msg ||
EV << "[Pastry::mergeState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Error initializing own routingTable while joining!"
<< " Restarting ..."
<< endl;
ret = false;
}
} else if (state == READY) {
// merging single state (stateCache.msg)
ret = false;
}
if (!leafSet->mergeState(stateCache.msg, NULL)) {
ret = false;
} else {
}
ret = false;
}
}
if (ret) lastStateChange = simTime();
return ret;
}
void Pastry::pingResponse ( PingResponse pingResponse,
cPolymorphic *  context,
int  rpcId,
simtime_t  rtt 
)
virtual

Reimplemented from BaseRpc.

Definition at line 232 of file Pastry.cc.

{
if (state == DISCOVERY) {
EV << "[Pastry::pingResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Pong (or Ping-context from NeighborCache) received (from "
<< pingResponse->getSrcNode().getIp() << ") in DISCOVERY mode"
<< endl;
if (nearNodeRtt > rtt) {
nearNode = pingResponse->getSrcNode();
nearNodeRtt = rtt;
}
}
}
void Pastry::processState ( void  )
private

Definition at line 1410 of file Pastry.cc.

Referenced by checkProxCache(), doSecondStage(), endProcessingState(), handleStateMessage(), and handleTimerEvent().

{
EV << "[Pastry::processState() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " new \""
<< std::string(cEnum::find("PastryStateMsgType")
->getStringFor(stateCache.msg->getPastryStateMsgType())).erase(0, 13)
<< "\" STATE message " << static_cast<void*>(stateCache.msg)
<< " from " << stateCache.msg->getSender().getIp() << " to process "
<< endl;
} else {
}
}
void Pastry::purgeVectors ( void  )
protectedvirtual

delete all information/messages caching vectors, used for restarting overlay or finish()

Reimplemented from BasePastry.

Definition at line 73 of file Pastry.cc.

Referenced by changeState().

{
// purge vector of waiting sendState messages:
if (! sendStateWait.empty()) {
for (std::vector<PastrySendState*>::iterator it =
sendStateWait.begin(); it != sendStateWait.end(); it++) {
if ( (*it)->isScheduled() ) cancelEvent(*it);
delete *it;
}
sendStateWait.clear();
}
}
bool Pastry::recursiveRoutingHook ( const TransportAddress dest,
BaseRouteMessage msg 
)
protectedvirtual

Hook for forwarded message in recursive lookup mode.

This hook is called just before a message is forwarded to a next hop or if the message is at its destination just before it is sent to the app. Default implementation just returns true. This hook can for example be used to detect failed nodes and call handleFailedNode() before the actual forwarding takes place.

Parameters
destdestination node
msgmessage to send
Returns
true, if message should be forwarded; false, if message will be forwarded later by an other function or message has been discarded

Reimplemented from BaseOverlay.

Definition at line 696 of file Pastry.cc.

{
if (dest == thisNode) {
return true;
}
dynamic_cast<PastryJoinCall*>(msg->getEncapsulatedPacket());
if (call && call->getSrcNode() != thisNode) {
joinBytesSeen += call->getByteLength());
// remove node from state if it is rejoining
PastryStateMessage* stateMsg =
-1,
check_and_cast<OverlayCtrlInfo*>(msg->getControlInfo())->getHopCount(),
false);
stateBytesSent += stateMsg->getByteLength());
sendMessageToUDP(call->getSrcNode(), stateMsg);
}
// forward now:
return true;
}
void Pastry::sendStateDelayed ( const TransportAddress destination)
private

send a standard state message with a small delay

Parameters
destinationdestination node

Definition at line 330 of file Pastry.cc.

Referenced by checkProxCache().

{
PastrySendState* selfMsg = new PastrySendState("sendStateWait");
selfMsg->setDest(destination);
sendStateWait.push_back(selfMsg);
scheduleAt(simTime() + 0.0001, selfMsg);
}

Member Data Documentation

int Pastry::depth
private

Definition at line 139 of file Pastry.h.

Referenced by changeState(), handleRequestRoutingRowResponse(), and handleTimerEvent().

uint16_t Pastry::discoveryModeProbedNodes
private
cMessage* Pastry::discoveryTimeout
private
simtime_t Pastry::discoveryTimeoutAmount
private
bool Pastry::minimalJoinState
private
std::vector<TransportAddress> Pastry::notifyList
protected

List of nodes to notify after join.

Definition at line 125 of file Pastry.h.

Referenced by changeState(), clearVectors(), doJoinUpdate(), and doSecondStage().

bool Pastry::overrideNewPastry
private

Definition at line 151 of file Pastry.h.

Referenced by initializeOverlay().

bool Pastry::overrideOldPastry
private

Definition at line 150 of file Pastry.h.

Referenced by initializeOverlay().

bool Pastry::partialJoinPath
private

Definition at line 136 of file Pastry.h.

Referenced by handleTimerEvent(), and initializeOverlay().

bool Pastry::pingBeforeSecondStage
private

Definition at line 148 of file Pastry.h.

Referenced by handleStateMessage(), initializeOverlay(), and processState().

cMessage* Pastry::repairTaskTimeout
private

Definition at line 156 of file Pastry.h.

Referenced by changeState(), handleTimerEvent(), initializeOverlay(), and ~Pastry().

cMessage* Pastry::ringCheck
private

Definition at line 154 of file Pastry.h.

simtime_t Pastry::routingTableMaintenanceInterval
private

Definition at line 134 of file Pastry.h.

Referenced by changeState(), handleTimerEvent(), initializeOverlay(), and ~Pastry().

simtime_t Pastry::secondStageInterval
private

Definition at line 133 of file Pastry.h.

Referenced by changeState(), and initializeOverlay().

cMessage* Pastry::secondStageWait
private

Definition at line 153 of file Pastry.h.

Referenced by changeState(), handleTimerEvent(), initializeOverlay(), and ~Pastry().

bool Pastry::sendStateAtLeafsetRepair
private

Definition at line 147 of file Pastry.h.

Referenced by handleFailedNode(), and initializeOverlay().

std::vector<PastrySendState*> Pastry::sendStateWait
protected

Definition at line 127 of file Pastry.h.

Referenced by handleTimerEvent(), purgeVectors(), and sendStateDelayed().

std::vector<PastryStateMsgHandle> Pastry::stReceived
protected

State messages to process during join.

Definition at line 119 of file Pastry.h.

Referenced by checkProxCache(), clearVectors(), doJoinUpdate(), handleStateMessage(), handleTimerEvent(), and mergeState().

std::vector<PastryStateMsgHandle>::iterator Pastry::stReceivedPos
protected

Definition at line 120 of file Pastry.h.

Referenced by checkProxCache(), clearVectors(), handleStateMessage(), and handleTimerEvent().

int Pastry::updateCounter
private

Definition at line 141 of file Pastry.h.

Referenced by checkProxCache(), and initializeOverlay().

bool Pastry::useDiscovery
private

Definition at line 144 of file Pastry.h.

Referenced by initializeOverlay(), joinOverlay(), and ~Pastry().

bool Pastry::useRoutingTableMaintenance
private

Definition at line 146 of file Pastry.h.

Referenced by changeState(), and initializeOverlay().

bool Pastry::useSecondStage
private

Definition at line 145 of file Pastry.h.

Referenced by changeState(), and initializeOverlay().


The documentation for this class was generated from the following files: