/* $Id$ */
/*
- * Copyright (c) 2006 Stanford University.
+ * Copyright (c) 2008-9 Stanford University.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
*/
/**
- * The ForwardingEngine is responsible for queueing and scheduling outgoing
- * packets in a collection protocol. It maintains a pool of forwarding messages
- * and a packet send
- * queue. A ForwardingEngine with a forwarding message pool of size <i>F</i>
- * and <i>C</i> CollectionSenderC clients has a send queue of size
- * <i>F + C</i>. This implementation has a large number of configuration
- * constants, which can be found in <code>ForwardingEngine.h</code>.
+ * This component contains the forwarding path of CTP Noe, the
+ * standard CTP implementation packaged with TinyOS 2.x. The CTP
+ * specification can be found in TEP 123. The paper entitled
+ * "Collection Tree Protocol," by Omprakash Gnawali et al., in SenSys
+ * 2009, describes the implementation and provides detailed
+ * performance results of CTP Noe.</p>
*
- * <p>Packets in the send queue are sent in FIFO order, with head-of-line
- * blocking. Because this is a tree collection protocol, all packets are going
- * to the same destination, and so the ForwardingEngine does not distinguish
- * packets from one another: packets from CollectionSenderC clients are
- * treated identically to forwarded packets.</p>
+ * <p>The CTP ForwardingEngine is responsible for queueing and
+ * scheduling outgoing packets. It maintains a pool of forwarding
+ * messages and a packet send queue. A ForwardingEngine with a
+ * forwarding message pool of size <i>F</i> and <i>C</i>
+ * CollectionSenderC clients has a send queue of size <i>F +
+ * C</i>. This implementation several configuration constants, which
+ * can be found in <code>ForwardingEngine.h</code>.</p>
+ *
+ * <p>Packets in the send queue are sent in FIFO order, with
+ * head-of-line blocking. Because this is a tree collection protocol,
+ * all packets are going to the same destination, and so the
+ * ForwardingEngine does not distinguish packets from one
+ * another. Packets from CollectionSenderC clients are sent
+ * identically to forwarded packets: only their buffer handling is
+ * different.</p>
*
* <p>If ForwardingEngine is on top of a link layer that supports
* synchronous acknowledgments, it enables them and retransmits packets
* when they are not acked. It transmits a packet up to MAX_RETRIES times
- * before giving up and dropping the packet.</p>
+ * before giving up and dropping the packet. MAX_RETRIES is typically a
+ * large number (e.g., >20), as this implementation assumes there is
+ * link layer feedback on failed packets, such that link costs will go
+ * up and cause the routing layer to pick a next hop. If the underlying
+ * link layer does not support acknowledgments, ForwardingEngine sends
+ * a packet only once.</p>
*
* <p>The ForwardingEngine detects routing loops and tries to correct
- * them. It assumes that the collection tree is based on a gradient,
- * such as hop count or estimated transmissions. When the ForwardingEngine
- * sends a packet to the next hop, it puts the local gradient value in
- * the packet header. If a node receives a packet to forward whose
- * gradient value is less than its own, then the gradient is not monotonically
- * decreasing and there may be a routing loop. When the ForwardingEngine
- * receives such a packet, it tells the RoutingEngine to advertise its
- * gradient value soon, with the hope that the advertisement will update
- * the node who just sent a packet and break the loop.
+ * them. Routing is in terms of a cost gradient, where the collection
+ * root has a cost of zero and a node's cost is the cost of its next
+ * hop plus the cost of the link to that next hop. If there are no
+ * loops, then this gradient value decreases monotonically along a
+ * route. When the ForwardingEngine sends a packet to the next hop,
+ * it puts the local gradient value in the packet header. If a node
+ * receives a packet to forward whose gradient value is less than its
+ * own, then the gradient is not monotonically decreasing and there
+ * may be a routing loop. When the ForwardingEngine receives such a
+ * packet, it tells the RoutingEngine to advertise its gradient value
+ * soon, with the hope that the advertisement will update the node
+ * who just sent a packet and break the loop. It also pauses the
+ * before the next packet transmission, in hopes of giving the
+ * routing layer's packet a priority.</p>
*
- * <p>ForwardingEngine times its packet transmissions. It differentiates
- * between four transmission cases: forwarding, success, ack failure,
- * and loop detection. In each case, the
- * ForwardingEngine waits a randomized period of time before sending the next
- * packet. This approach assumes that the network is operating at low
- * utilization; its goal is to prevent correlated traffic -- such as
- * nodes along a route forwarding packets -- from interfering with itself.
+ * <p>ForwardingEngine times its packet transmissions. It
+ * differentiates between four transmission cases: forwarding,
+ * success, ack failure, and loop detection. In each case, the
+ * ForwardingEngine waits a randomized period of time before sending
+ * the next packet. This approach assumes that the network is
+ * operating at low utilization; its goal is to prevent correlated
+ * traffic -- such as nodes along a route forwarding packets -- from
+ * interfering with itself.</p>
*
- * <table>
- * <tr>
- * <td><b>Case</b></td>
- * <td><b>CC2420 Wait (ms)</b></td>
- * <td><b>Other Wait (ms)</b></td>
- * <td><b>Description</b></td>
- * </tr>
- * <tr>
- * <td>Forwarding</td>
- * <td>Immediate</td>
- * <td>Immediate</td>
- * <td>When the ForwardingEngine receives a packet to forward and it is not
- * already sending a packet (queue is empty). In this case, it immediately
- * forwards the packet.</td>
- * </tr>
- * <tr>
- * <td>Success</td>
- * <td>16-31</td>
- * <td>128-255</td>
- * <td>When the ForwardingEngine successfully sends a packet to the next
- * hop, it waits this long before sending the next packet in the queue.
- * </td>
- * </tr>
- * <tr>
- * <td>Ack Failure</td>
- * <td>8-15</td>
- * <td>128-255</td>
- * <td>If the link layer supports acks and the ForwardingEngine did not
- * receive an acknowledgment from the next hop, it waits this long before
- * trying a retransmission. If the packet has exceeded the retransmission
- * count, ForwardingEngine drops the packet and uses the Success timer instead. </td>
- * </tr>
- * <tr>
- * <td>Loop Detection</td>
- * <td>32-63</td>
- * <td>512-1023</td>
- * <td>If the ForwardingEngine is asked to forward a packet from a node that
- * believes it is closer to the root, the ForwardingEngine pauses its
- * transmissions for this interval and triggers the RoutingEngine to
- * send an update. The goal is to let the gradient become consistent before
- * sending packets, in order to prevent routing loops from consuming
- * bandwidth and energy.</td>
- * </tr>
- * </table>
+ * <p>While this implementation can work on top of a variety of link
+ * estimators, it is designed to work with a 4-bit link estimator
+ * (4B). Details on 4B can be found in the HotNets paper "Four Bit
+ * Link Estimation" by Rodrigo Fonseca et al. The forwarder provides
+ * the "ack" bit for each sent packet, telling the estimator whether
+ * the packet was acknowledged.</p>
*
- * <p>The times above are all for CC2420-based platforms. The timings for
- * other platforms depend on their bit rates, as they are based on packet
- * transmission times.</p>
-
* @author Philip Levis
* @author Kyle Jamieson
* @date $Date$
interface CtpCongestion;
}
uses {
+ // These five interfaces are used in the forwarding path
+ // SubSend is for sending packets
+ // PacketAcknowledgements is for enabling layer 2 acknowledgments
+ // RetxmitTimer is for timing packet sends for improved performance
+ // LinkEstimator is for providing the ack bit to a link estimator
interface AMSend as SubSend;
- interface Receive as SubReceive;
- interface Receive as SubSnoop;
- interface Packet as SubPacket;
+ interface PacketAcknowledgements;
+ interface Timer<TMilli> as RetxmitTimer;
+ interface LinkEstimator;
interface UnicastNameFreeRouting;
- interface SplitControl as RadioControl;
+ interface Packet as SubPacket;
+
+ // These four data structures are used to manage packets to forward.
+ // SendQueue and QEntryPool are the forwarding queue.
+ // MessagePool is the buffer pool for messages to forward.
+ // SentCache is for suppressing duplicate packet transmissions.
interface Queue<fe_queue_entry_t*> as SendQueue;
interface Pool<fe_queue_entry_t> as QEntryPool;
interface Pool<message_t> as MessagePool;
- interface Timer<TMilli> as RetxmitTimer;
-
- interface LinkEstimator;
-
- // Counts down from the last time we heard from our parent; used
- // to expire local state about parent congestion.
- interface Timer<TMilli> as CongestionTimer;
-
interface Cache<message_t*> as SentCache;
+
+ interface Receive as SubReceive;
+ interface Receive as SubSnoop;
interface CtpInfo;
- interface PacketAcknowledgements;
- interface Random;
interface RootControl;
interface CollectionId[uint8_t client];
interface AMPacket;
- interface CollectionDebug;
interface Leds;
+ interface Random;
+
+ // This implementation has extensive debugging instrumentation.
+ // Wiring up the CollectionDebug interface provides information
+ // on important events, such as transmissions, receptions,
+ // and cache checks. The TinyOS release includes scripts for
+ // parsing these messages.
+ interface CollectionDebug;
+
+
+ // The ForwardingEngine monitors whether the underlying
+ // radio is on or not in order to start/stop forwarding
+ // as appropriate.
+ interface SplitControl as RadioControl;
}
}
implementation {
* masked by the given mask and added to the given offset.
*/
static void startRetxmitTimer(uint16_t mask, uint16_t offset);
- static void startCongestionTimer(uint16_t mask, uint16_t offset);
-
- /* Indicates whether our client is congested */
- bool clientCongested = FALSE;
-
- /* Tracks our parent's congestion state. */
- bool parentCongested = FALSE;
-
- /* Threshold for congestion */
- uint8_t congestionThreshold;
-
- /* Keeps track of whether the routing layer is running; if not,
- * it will not send packets. */
- bool running = FALSE;
-
- /* Keeps track of whether the radio is on; no sense sending packets
- * if the radio is off. */
- bool radioOn = FALSE;
+ void clearState(uint8_t state);
+ bool hasState(uint8_t state);
+ void setState(uint8_t state);
- /* Keeps track of whether an ack is pending on an outgoing packet,
- * so that the engine can work unreliably when the data-link layer
- * does not support acks. */
- bool ackPending = FALSE;
-
- /* Keeps track of whether the packet on the head of the queue
- * is being used, and control access to the data-link layer.*/
- bool sending = FALSE;
+ // CTP state variables.
+ enum {
+ QUEUE_CONGESTED = 0x1, // Need to set C bit?
+ ROUTING_ON = 0x2, // Forwarding running?
+ RADIO_ON = 0x4, // Radio is on?
+ ACK_PENDING = 0x8, // Have an ACK pending?
+ SENDING = 0x10 // Am sending a packet?
+ };
+ // Start with all states false
+ uint8_t forwardingState = 0;
+
/* Keep track of the last parent address we sent to, so that
unacked packets to an old parent are not incorrectly attributed
to a new parent. */
clientPtrs[i] = clientEntries + i;
dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]);
}
- congestionThreshold = (call SendQueue.maxSize()) >> 1;
loopbackMsgPtr = &loopbackMsg;
lastParent = call AMPacket.address();
seqno = 0;
}
command error_t StdControl.start() {
- running = TRUE;
+ setState(ROUTING_ON);
return SUCCESS;
}
command error_t StdControl.stop() {
- running = FALSE;
+ clearState(ROUTING_ON);
return SUCCESS;
}
when it turns on, it then starts sending packets. */
event void RadioControl.startDone(error_t err) {
if (err == SUCCESS) {
- radioOn = TRUE;
+ setState(RADIO_ON);
if (!call SendQueue.empty()) {
+ dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__);
post sendTask();
}
}
}
-
+
+ static void startRetxmitTimer(uint16_t window, uint16_t offset) {
+ uint16_t r = call Random.rand16();
+ r %= window;
+ r += offset;
+ call RetxmitTimer.startOneShot(r);
+ dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r);
+ }
+
/*
* If the ForwardingEngine has stopped sending packets because
* these has been no route, then as soon as one is found, start
* sending packets.
*/
event void UnicastNameFreeRouting.routeFound() {
+ dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__);
post sendTask();
}
event void RadioControl.stopDone(error_t err) {
if (err == SUCCESS) {
- radioOn = FALSE;
+ clearState(RADIO_ON);
}
}
* already sending packets (the RetxmitTimer isn't running), post
* sendTask. It could be that the engine is running and sendTask
* has already been posted, but the post-once semantics make this
- * not matter.
+ * not matter. What's important is that you don't post sendTask
+ * if the retransmit timer is running; this would circumvent the
+ * timer and send a packet before it fires.
*/
command error_t Send.send[uint8_t client](message_t* msg, uint8_t len) {
ctp_data_header_t* hdr;
fe_queue_entry_t *qe;
dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len);
- if (!running) {return EOFF;}
+ if (!hasState(ROUTING_ON)) {return EOFF;}
if (len > call Send.maxPayloadLength[client]()) {return ESIZE;}
call Packet.setPayloadLength(msg, len);
qe->retries = MAX_RETRIES;
dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size());
if (call SendQueue.enqueue(qe) == SUCCESS) {
- if (radioOn && !call RetxmitTimer.isRunning()) {
+ if (hasState(RADIO_ON) && !hasState(SENDING)) {
+ dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__);
post sendTask();
}
clientPtrs[client] = NULL;
* These is where all of the send logic is. When the ForwardingEngine
* wants to send a packet, it posts this task. The send logic is
* independent of whether it is a forwarded packet or a packet from
- * a send client.
+ * a send clientL the two cases differ in how memory is managed in
+ * sendDone.
*
* The task first checks that there is a packet to send and that
* there is a valid route. It then marshals the relevant arguments
*/
task void sendTask() {
+ uint16_t gradient;
dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size());
- if (sending) {
- dbg("Forwarder", "%s: busy, don't send\n", __FUNCTION__);
- call CollectionDebug.logEvent(NET_C_FE_SEND_BUSY);
- return;
- }
- else if (call SendQueue.empty()) {
- dbg("Forwarder", "%s: queue empty, don't send\n", __FUNCTION__);
+ if (hasState(SENDING) || call SendQueue.empty()) {
call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY);
return;
}
- else if (!call RootControl.isRoot() &&
- !call UnicastNameFreeRouting.hasRoute()) {
- dbg("Forwarder", "%s: no route, don't send, start retry timer\n", __FUNCTION__);
- call RetxmitTimer.startOneShot(10000);
-
- // send a debug message to the uart
+ else if ((!call RootControl.isRoot() &&
+ !call UnicastNameFreeRouting.hasRoute()) ||
+ (call CtpInfo.getEtx(&gradient) != SUCCESS)) {
+ /* This code path is for when we don't have a valid next
+ * hop. We set a retry timer.
+ *
+ * Technically, this timer isn't necessary, as if a route
+ * is found we'll get an event. But just in case such an event
+ * is lost (e.g., a bug in the routing engine), we retry.
+ * Otherwise the forwarder might hang indefinitely. As this test
+ * doesn't require radio activity, the energy cost is minimal. */
+ dbg("Forwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY);
+ call RetxmitTimer.startOneShot(NO_ROUTE_RETRY);
call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE);
-
return;
}
- /*
- else if (parentCongested) {
- // Do nothing; the congestion timer is necessarily set which
- // will clear parentCongested and repost sendTask().
- dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
- __FUNCTION__);
- call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
- }
- */
else {
+ /* We can send a packet.
+ First check if it's a duplicate;
+ if not, try to send/forward. */
error_t subsendResult;
fe_queue_entry_t* qe = call SendQueue.head();
uint8_t payloadLen = call SubPacket.payloadLength(qe->msg);
am_addr_t dest = call UnicastNameFreeRouting.nextHop();
- uint16_t gradient;
- if (call CtpInfo.isNeighborCongested(dest)) {
- // Our parent is congested. We should wait.
- // Don't repost the task, CongestionTimer will do the job
- if (! parentCongested ) {
- parentCongested = TRUE;
- call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
- }
- if (! call CongestionTimer.isRunning()) {
- startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
- }
- dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
- __FUNCTION__);
- //call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
- return;
- }
- if (parentCongested) {
- parentCongested = FALSE;
- call CollectionDebug.logEvent(NET_C_FE_CONGESTION_END);
- }
- // Once we are here, we have decided to send the packet.
if (call SentCache.lookup(qe->msg)) {
+ /* This packet is a duplicate, so suppress it: free memory and
+ * send next packet. Duplicates are only possible for
+ * forwarded packets, so we can circumvent the client or
+ * forwarded branch for freeing the buffer. */
call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE_AT_SEND);
call SendQueue.dequeue();
- if (call MessagePool.put(qe->msg) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
- if (call QEntryPool.put(qe) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+ if (call MessagePool.put(qe->msg) != SUCCESS)
+ call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
+ if (call QEntryPool.put(qe) != SUCCESS)
+ call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+
post sendTask();
return;
}
- /* If our current parent is not the same as the last parent
- we sent do, then reset the count of unacked packets: don't
- penalize a new parent for the failures of a prior one.*/
- if (dest != lastParent) {
- qe->retries = MAX_RETRIES;
- lastParent = dest;
- }
-
+
+ // Not a duplicate: we've decided we're going to send.
dbg("Forwarder", "Sending queue entry %p\n", qe);
+
if (call RootControl.isRoot()) {
+ /* Code path for roots: copy the packet and signal receive. */
collection_id_t collectid = getHeader(qe->msg)->type;
+ uint8_t* payload;
+ uint8_t payloadLength;
+
memcpy(loopbackMsgPtr, qe->msg, sizeof(message_t));
- ackPending = FALSE;
-
+
+ payload = call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr));
+ payloadLength = call Packet.payloadLength(loopbackMsgPtr);
dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__);
loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr,
- call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)),
- call Packet.payloadLength(loopbackMsgPtr));
+ payload,
+ payloadLength);
signal SubSend.sendDone(qe->msg, SUCCESS);
- return;
}
-
- // Loop-detection functionality:
- if (call CtpInfo.getEtx(&gradient) != SUCCESS) {
- // If we have no metric, set our gradient conservatively so
- // that other nodes don't automatically drop our packets.
- gradient = 0;
- }
- call CtpPacket.setEtx(qe->msg, gradient);
-
- ackPending = (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS);
-
- // Set or clear the congestion bit on *outgoing* packets.
- if (call CtpCongestion.isCongested())
- call CtpPacket.setOption(qe->msg, CTP_OPT_ECN);
- else
- call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN);
-
- subsendResult = call SubSend.send(dest, qe->msg, payloadLen);
- if (subsendResult == SUCCESS) {
- // Successfully submitted to the data-link layer.
- sending = TRUE;
- dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg);
- if (qe->client < CLIENT_COUNT) {
- dbg("Forwarder", "%s: client packet.\n", __FUNCTION__);
- }
- else {
- dbg("Forwarder", "%s: forwarded packet.\n", __FUNCTION__);
- }
- return;
- }
- else if (subsendResult == EOFF) {
- // The radio has been turned off underneath us. Assume that
- // this is for the best. When the radio is turned back on, we'll
- // handle a startDone event and resume sending.
- radioOn = FALSE;
- dbg("Forwarder", "%s: subsend failed from EOFF.\n", __FUNCTION__);
- // send a debug message to the uart
- call CollectionDebug.logEvent(NET_C_FE_SUBSEND_OFF);
- }
- else if (subsendResult == EBUSY) {
- // This shouldn't happen, as we sit on top of a client and
- // control our own output; it means we're trying to
- // double-send (bug). This means we expect a sendDone, so just
- // wait for that: when the sendDone comes in, // we'll try
- // sending this packet again.
- dbg("Forwarder", "%s: subsend failed from EBUSY.\n", __FUNCTION__);
- // send a debug message to the uart
- call CollectionDebug.logEvent(NET_C_FE_SUBSEND_BUSY);
- }
- else if (subsendResult == ESIZE) {
- dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__);
- call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength());
- post sendTask();
- call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE);
+ else {
+ /* The basic forwarding/sending case. */
+ call CtpPacket.setEtx(qe->msg, gradient);
+ call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN | CTP_OPT_PULL);
+ if (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS) {
+ setState(ACK_PENDING);
+ }
+ if (hasState(QUEUE_CONGESTED)) {
+ call CtpPacket.setOption(qe->msg, CTP_OPT_ECN);
+ clearState(QUEUE_CONGESTED);
+ }
+
+ subsendResult = call SubSend.send(dest, qe->msg, payloadLen);
+ if (subsendResult == SUCCESS) {
+ // Successfully submitted to the data-link layer.
+ setState(SENDING);
+ dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg);
+ return;
+ }
+ // The packet is too big: truncate it and retry.
+ else if (subsendResult == ESIZE) {
+ dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__);
+ call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength());
+ post sendTask();
+ call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE);
+ }
+ else {
+ dbg("Forwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult);
+ }
}
}
}
- void sendDoneBug() {
- // send a debug message to the uart
- call CollectionDebug.logEvent(NET_C_FE_BAD_SENDDONE);
- }
/*
* The second phase of a send operation; based on whether the transmission was
*
*/
+ void packetComplete(fe_queue_entry_t* qe, message_t* msg, bool success) {
+ // Four cases:
+ // Local packet: success or failure
+ // Forwarded packet: success or failure
+ if (qe->client < CLIENT_COUNT) {
+ clientPtrs[qe->client] = qe;
+ signal Send.sendDone[qe->client](msg, SUCCESS);
+ if (success) {
+ dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client);
+ call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG,
+ call CollectionPacket.getSequenceNumber(msg),
+ call CollectionPacket.getOrigin(msg),
+ call AMPacket.destination(msg));
+ } else {
+ dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client);
+ call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND,
+ call CollectionPacket.getSequenceNumber(msg),
+ call CollectionPacket.getOrigin(msg),
+ call AMPacket.destination(msg));
+ }
+ }
+ else {
+ if (success) {
+ call SentCache.insert(qe->msg);
+ dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg));
+ call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG,
+ call CollectionPacket.getSequenceNumber(msg),
+ call CollectionPacket.getOrigin(msg),
+ call AMPacket.destination(msg));
+ }
+ else {
+ dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg));
+ call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD,
+ call CollectionPacket.getSequenceNumber(msg),
+ call CollectionPacket.getOrigin(msg),
+ call AMPacket.destination(msg));
+ }
+ if (call MessagePool.put(qe->msg) != SUCCESS)
+ call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
+ if (call QEntryPool.put(qe) != SUCCESS)
+ call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+ }
+ }
+
event void SubSend.sendDone(message_t* msg, error_t error) {
fe_queue_entry_t *qe = call SendQueue.head();
dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error);
- if (qe == NULL || qe->msg != msg) {
- dbg("Forwarder", "%s: BUG: not our packet (%p != %p)!\n", __FUNCTION__, msg, qe->msg);
- sendDoneBug(); // Not our packet, something is very wrong...
- return;
- }
- else if (error != SUCCESS) {
- // Immediate retransmission is the worst thing to do.
+
+ if (error != SUCCESS) {
+ /* The radio wasn't able to send the packet: retransmit it. */
dbg("Forwarder", "%s: send failed\n", __FUNCTION__);
call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL,
- call CollectionPacket.getSequenceNumber(msg),
- call CollectionPacket.getOrigin(msg),
- call AMPacket.destination(msg));
+ call CollectionPacket.getSequenceNumber(msg),
+ call CollectionPacket.getOrigin(msg),
+ call AMPacket.destination(msg));
startRetxmitTimer(SENDDONE_FAIL_WINDOW, SENDDONE_FAIL_OFFSET);
}
- else if (ackPending && !call PacketAcknowledgements.wasAcked(msg)) {
- // AckPending is for case when DL cannot support acks.
+ else if (hasState(ACK_PENDING) && !call PacketAcknowledgements.wasAcked(msg)) {
+ /* No ack: if countdown is not 0, retransmit, else drop the packet. */
call LinkEstimator.txNoAck(call AMPacket.destination(msg));
call CtpInfo.recomputeRoutes();
if (--qe->retries) {
- dbg("Forwarder", "%s: not acked\n", __FUNCTION__);
+ dbg("Forwarder", "%s: not acked, retransmit\n", __FUNCTION__);
call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK,
call CollectionPacket.getSequenceNumber(msg),
call CollectionPacket.getOrigin(msg),
call AMPacket.destination(msg));
startRetxmitTimer(SENDDONE_NOACK_WINDOW, SENDDONE_NOACK_OFFSET);
} else {
- //max retries, dropping packet
- if (qe->client < CLIENT_COUNT) {
- clientPtrs[qe->client] = qe;
- signal Send.sendDone[qe->client](msg, FAIL);
- call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND,
- call CollectionPacket.getSequenceNumber(msg),
- call CollectionPacket.getOrigin(msg),
- call AMPacket.destination(msg));
- } else {
- if (call MessagePool.put(qe->msg) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
- if (call QEntryPool.put(qe) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
- call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD,
- call CollectionPacket.getSequenceNumber(msg),
- call CollectionPacket.getOrigin(msg),
- call AMPacket.destination(msg));
- }
- call SendQueue.dequeue();
- sending = FALSE;
+ /* Hit max retransmit threshold: drop the packet. */
+ call SendQueue.dequeue();
+ clearState(SENDING);
startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
+
+ packetComplete(qe, msg, FALSE);
}
}
- else if (qe->client < CLIENT_COUNT) {
- ctp_data_header_t* hdr;
- uint8_t client = qe->client;
- dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n",
- __FUNCTION__, client, qe);
- call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG,
- call CollectionPacket.getSequenceNumber(msg),
- call CollectionPacket.getOrigin(msg),
- call AMPacket.destination(msg));
- call LinkEstimator.txAck(call AMPacket.destination(msg));
- clientPtrs[client] = qe;
- hdr = getHeader(qe->msg);
+ else {
+ /* Packet was acknowledged. Updated the link estimator,
+ free the buffer (pool or sendDone), start timer to
+ send next packet. */
call SendQueue.dequeue();
- signal Send.sendDone[client](msg, SUCCESS);
- sending = FALSE;
+ clearState(SENDING);
startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
- }
- else if (call MessagePool.size() < call MessagePool.maxSize()) {
- // A successfully forwarded packet.
- dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize());
- call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG,
- call CollectionPacket.getSequenceNumber(msg),
- call CollectionPacket.getOrigin(msg),
- call AMPacket.destination(msg));
call LinkEstimator.txAck(call AMPacket.destination(msg));
- call SentCache.insert(qe->msg);
- call SendQueue.dequeue();
- if (call MessagePool.put(qe->msg) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
- if (call QEntryPool.put(qe) != SUCCESS)
- call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
- sending = FALSE;
- startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
- }
- else {
- dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client);
- sendDoneBug(); // It's a forwarded packet, but there's no room the pool;
- // someone has double-stored a pointer somewhere and we have nowhere
- // to put this, so we have to leak it...
+ packetComplete(qe, msg, TRUE);
}
}
if (!call RetxmitTimer.isRunning()) {
// sendTask is only immediately posted if we don't detect a
// loop.
+ dbg("FHangBug", "%s: posted sendTask.\n", __FUNCTION__);
post sendTask();
}
}
event void RetxmitTimer.fired() {
- sending = FALSE;
- post sendTask();
- }
-
- event void CongestionTimer.fired() {
- //parentCongested = FALSE;
- //call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 0);
+ clearState(SENDING);
+ dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__);
post sendTask();
}
-
command bool CtpCongestion.isCongested() {
- // A simple predicate for now to determine congestion state of
- // this node.
- bool congested = (call SendQueue.size() > congestionThreshold) ?
- TRUE : FALSE;
- return ((congested || clientCongested)?TRUE:FALSE);
+ return FALSE;
}
command void CtpCongestion.setClientCongested(bool congested) {
- bool wasCongested = call CtpCongestion.isCongested();
- clientCongested = congested;
- if (!wasCongested && congested) {
- call CtpInfo.triggerImmediateRouteUpdate();
- } else if (wasCongested && ! (call CtpCongestion.isCongested())) {
- call CtpInfo.triggerRouteUpdate();
- }
+ // Do not respond to congestion.
}
+
+ /* signalled when this neighbor is evicted from the neighbor table */
+ event void LinkEstimator.evicted(am_addr_t neighbor) {}
+
+ // Packet ADT commands
command void Packet.clear(message_t* msg) {
call SubPacket.clear(msg);
}
-
+
command uint8_t Packet.payloadLength(message_t* msg) {
return call SubPacket.payloadLength(msg) - sizeof(ctp_data_header_t);
}
return payload;
}
+ // CollectionPacket ADT commands
command am_addr_t CollectionPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
-
command collection_id_t CollectionPacket.getType(message_t* msg) {return getHeader(msg)->type;}
command uint8_t CollectionPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
command void CollectionPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;}
command void CollectionPacket.setType(message_t* msg, collection_id_t id) {getHeader(msg)->type = id;}
command void CollectionPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}
-
- //command ctp_options_t CtpPacket.getOptions(message_t* msg) {return getHeader(msg)->options;}
+ // CtpPacket ADT commands
command uint8_t CtpPacket.getType(message_t* msg) {return getHeader(msg)->type;}
command am_addr_t CtpPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
command uint16_t CtpPacket.getEtx(message_t* msg) {return getHeader(msg)->etx;}
command uint8_t CtpPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
command uint8_t CtpPacket.getThl(message_t* msg) {return getHeader(msg)->thl;}
-
command void CtpPacket.setThl(message_t* msg, uint8_t thl) {getHeader(msg)->thl = thl;}
command void CtpPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;}
command void CtpPacket.setType(message_t* msg, uint8_t id) {getHeader(msg)->type = id;}
-
+ command void CtpPacket.setEtx(message_t* msg, uint16_t e) {getHeader(msg)->etx = e;}
+ command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}
command bool CtpPacket.option(message_t* msg, ctp_options_t opt) {
return ((getHeader(msg)->options & opt) == opt) ? TRUE : FALSE;
}
-
command void CtpPacket.setOption(message_t* msg, ctp_options_t opt) {
getHeader(msg)->options |= opt;
}
-
command void CtpPacket.clearOption(message_t* msg, ctp_options_t opt) {
getHeader(msg)->options &= ~opt;
}
- command void CtpPacket.setEtx(message_t* msg, uint16_t e) {getHeader(msg)->etx = e;}
- command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}
// A CTP packet ID is based on the origin and the THL field, to
// implement duplicate suppression as described in TEP 123.
-
command bool CtpPacket.matchInstance(message_t* m1, message_t* m2) {
return (call CtpPacket.getOrigin(m1) == call CtpPacket.getOrigin(m2) &&
call CtpPacket.getSequenceNumber(m1) == call CtpPacket.getSequenceNumber(m2) &&
call CtpPacket.getType(m1) == call CtpPacket.getType(m2));
}
+
+ void clearState(uint8_t state) {
+ forwardingState = forwardingState & ~state;
+ }
+ bool hasState(uint8_t state) {
+ return forwardingState & state;
+ }
+ void setState(uint8_t state) {
+ forwardingState = forwardingState | state;
+ }
+
+ /******** Defaults. **************/
+
default event void
Send.sendDone[uint8_t client](message_t *msg, error_t error) {
}
default command collection_id_t CollectionId.fetch[uint8_t client]() {
return 0;
}
-
- static void startRetxmitTimer(uint16_t mask, uint16_t offset) {
- uint16_t r = call Random.rand16();
- r &= mask;
- r += offset;
- call RetxmitTimer.startOneShot(r);
- dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r);
- }
-
- static void startCongestionTimer(uint16_t mask, uint16_t offset) {
- uint16_t r = call Random.rand16();
- r &= mask;
- r += offset;
- call CongestionTimer.startOneShot(r);
- dbg("Forwarder", "Congestion timer will fire in %hu ms\n", r);
- }
-
- /* signalled when this neighbor is evicted from the neighbor table */
- event void LinkEstimator.evicted(am_addr_t neighbor) {
- }
-
-
+
/* Default implementations for CollectionDebug calls.
* These allow CollectionDebug not to be wired to anything if debugging
* is not desired. */
-
- default command error_t CollectionDebug.logEvent(uint8_t type) {
- return SUCCESS;
- }
- default command error_t CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) {
- return SUCCESS;
- }
- default command error_t CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) {
- return SUCCESS;
- }
- default command error_t CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) {
- return SUCCESS;
- }
- default command error_t CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) {
- return SUCCESS;
- }
+
+ default command error_t CollectionDebug.logEvent(uint8_t type) {
+ return SUCCESS;
+ }
+ default command error_t CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) {
+ return SUCCESS;
+ }
+ default command error_t CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) {
+ return SUCCESS;
+ }
+ default command error_t CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) {
+ return SUCCESS;
+ }
+ default command error_t CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) {
+ return SUCCESS;
+ }
}
-/* Rodrigo. This is an alternative
- event void CtpInfo.ParentCongested(bool congested) {
- if (congested) {
- // We've overheard our parent's ECN bit set.
- startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
- parentCongested = TRUE;
- call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
- } else {
- // We've overheard our parent's ECN bit cleared.
- call CongestionTimer.stop();
- parentCongested = FALSE;
- call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 1);
- post sendTask();
- }
- }
-*/
-