X-Git-Url: https://oss.titaniummirror.com/gitweb/?a=blobdiff_plain;f=tos%2Flib%2Fnet%2Fctp%2FCtpForwardingEngineP.nc;h=964e35a5ef39ef92b842769ae24534c86ac91392;hb=e9bfab607e051bae6afb47b44892ce37541d1b44;hp=16c1803cc4173593671f793623acb9e0360ab830;hpb=adf1de6c009d13b7b52e68535c63b28f59c97400;p=tinyos-2.x.git diff --git a/tos/lib/net/ctp/CtpForwardingEngineP.nc b/tos/lib/net/ctp/CtpForwardingEngineP.nc index 16c1803c..964e35a5 100644 --- a/tos/lib/net/ctp/CtpForwardingEngineP.nc +++ b/tos/lib/net/ctp/CtpForwardingEngineP.nc @@ -1,6 +1,6 @@ /* $Id$ */ /* - * Copyright (c) 2006 Stanford University. + * Copyright (c) 2008-9 Stanford University. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,93 +31,71 @@ */ /** - * The ForwardingEngine is responsible for queueing and scheduling outgoing - * packets in a collection protocol. It maintains a pool of forwarding messages - * and a packet send - * queue. A ForwardingEngine with a forwarding message pool of size F - * and C CollectionSenderC clients has a send queue of size - * F + C. This implementation has a large number of configuration - * constants, which can be found in ForwardingEngine.h. + * This component contains the forwarding path of CTP Noe, the + * standard CTP implementation packaged with TinyOS 2.x. The CTP + * specification can be found in TEP 123. The paper entitled + * "Collection Tree Protocol," by Omprakash Gnawali et al., in SenSys + * 2009, describes the implementation and provides detailed + * performance results of CTP Noe.

* - *

Packets in the send queue are sent in FIFO order, with head-of-line - * blocking. Because this is a tree collection protocol, all packets are going - * to the same destination, and so the ForwardingEngine does not distinguish - * packets from one another: packets from CollectionSenderC clients are - * treated identically to forwarded packets.

+ *

The CTP ForwardingEngine is responsible for queueing and + * scheduling outgoing packets. It maintains a pool of forwarding + * messages and a packet send queue. A ForwardingEngine with a + * forwarding message pool of size F and C + * CollectionSenderC clients has a send queue of size F + + * C. This implementation several configuration constants, which + * can be found in ForwardingEngine.h.

+ * + *

Packets in the send queue are sent in FIFO order, with + * head-of-line blocking. Because this is a tree collection protocol, + * all packets are going to the same destination, and so the + * ForwardingEngine does not distinguish packets from one + * another. Packets from CollectionSenderC clients are sent + * identically to forwarded packets: only their buffer handling is + * different.

* *

If ForwardingEngine is on top of a link layer that supports * synchronous acknowledgments, it enables them and retransmits packets * when they are not acked. It transmits a packet up to MAX_RETRIES times - * before giving up and dropping the packet.

+ * before giving up and dropping the packet. MAX_RETRIES is typically a + * large number (e.g., >20), as this implementation assumes there is + * link layer feedback on failed packets, such that link costs will go + * up and cause the routing layer to pick a next hop. If the underlying + * link layer does not support acknowledgments, ForwardingEngine sends + * a packet only once.

* *

The ForwardingEngine detects routing loops and tries to correct - * them. It assumes that the collection tree is based on a gradient, - * such as hop count or estimated transmissions. When the ForwardingEngine - * sends a packet to the next hop, it puts the local gradient value in - * the packet header. If a node receives a packet to forward whose - * gradient value is less than its own, then the gradient is not monotonically - * decreasing and there may be a routing loop. When the ForwardingEngine - * receives such a packet, it tells the RoutingEngine to advertise its - * gradient value soon, with the hope that the advertisement will update - * the node who just sent a packet and break the loop. + * them. Routing is in terms of a cost gradient, where the collection + * root has a cost of zero and a node's cost is the cost of its next + * hop plus the cost of the link to that next hop. If there are no + * loops, then this gradient value decreases monotonically along a + * route. When the ForwardingEngine sends a packet to the next hop, + * it puts the local gradient value in the packet header. If a node + * receives a packet to forward whose gradient value is less than its + * own, then the gradient is not monotonically decreasing and there + * may be a routing loop. When the ForwardingEngine receives such a + * packet, it tells the RoutingEngine to advertise its gradient value + * soon, with the hope that the advertisement will update the node + * who just sent a packet and break the loop. It also pauses the + * before the next packet transmission, in hopes of giving the + * routing layer's packet a priority.

* - *

ForwardingEngine times its packet transmissions. It differentiates - * between four transmission cases: forwarding, success, ack failure, - * and loop detection. In each case, the - * ForwardingEngine waits a randomized period of time before sending the next - * packet. This approach assumes that the network is operating at low - * utilization; its goal is to prevent correlated traffic -- such as - * nodes along a route forwarding packets -- from interfering with itself. + *

ForwardingEngine times its packet transmissions. It + * differentiates between four transmission cases: forwarding, + * success, ack failure, and loop detection. In each case, the + * ForwardingEngine waits a randomized period of time before sending + * the next packet. This approach assumes that the network is + * operating at low utilization; its goal is to prevent correlated + * traffic -- such as nodes along a route forwarding packets -- from + * interfering with itself.

* - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
CaseCC2420 Wait (ms)Other Wait (ms)Description
ForwardingImmediateImmediateWhen the ForwardingEngine receives a packet to forward and it is not - * already sending a packet (queue is empty). In this case, it immediately - * forwards the packet.
Success16-31128-255When the ForwardingEngine successfully sends a packet to the next - * hop, it waits this long before sending the next packet in the queue. - *
Ack Failure8-15128-255If the link layer supports acks and the ForwardingEngine did not - * receive an acknowledgment from the next hop, it waits this long before - * trying a retransmission. If the packet has exceeded the retransmission - * count, ForwardingEngine drops the packet and uses the Success timer instead.
Loop Detection32-63512-1023If the ForwardingEngine is asked to forward a packet from a node that - * believes it is closer to the root, the ForwardingEngine pauses its - * transmissions for this interval and triggers the RoutingEngine to - * send an update. The goal is to let the gradient become consistent before - * sending packets, in order to prevent routing loops from consuming - * bandwidth and energy.
+ *

While this implementation can work on top of a variety of link + * estimators, it is designed to work with a 4-bit link estimator + * (4B). Details on 4B can be found in the HotNets paper "Four Bit + * Link Estimation" by Rodrigo Fonseca et al. The forwarder provides + * the "ack" bit for each sent packet, telling the estimator whether + * the packet was acknowledged.

* - *

The times above are all for CC2420-based platforms. The timings for - * other platforms depend on their bit rates, as they are based on packet - * transmission times.

- * @author Philip Levis * @author Kyle Jamieson * @date $Date$ @@ -140,32 +118,48 @@ generic module CtpForwardingEngineP() { interface CtpCongestion; } uses { + // These five interfaces are used in the forwarding path + // SubSend is for sending packets + // PacketAcknowledgements is for enabling layer 2 acknowledgments + // RetxmitTimer is for timing packet sends for improved performance + // LinkEstimator is for providing the ack bit to a link estimator interface AMSend as SubSend; - interface Receive as SubReceive; - interface Receive as SubSnoop; - interface Packet as SubPacket; + interface PacketAcknowledgements; + interface Timer as RetxmitTimer; + interface LinkEstimator; interface UnicastNameFreeRouting; - interface SplitControl as RadioControl; + interface Packet as SubPacket; + + // These four data structures are used to manage packets to forward. + // SendQueue and QEntryPool are the forwarding queue. + // MessagePool is the buffer pool for messages to forward. + // SentCache is for suppressing duplicate packet transmissions. interface Queue as SendQueue; interface Pool as QEntryPool; interface Pool as MessagePool; - interface Timer as RetxmitTimer; - - interface LinkEstimator; - - // Counts down from the last time we heard from our parent; used - // to expire local state about parent congestion. - interface Timer as CongestionTimer; - interface Cache as SentCache; + + interface Receive as SubReceive; + interface Receive as SubSnoop; interface CtpInfo; - interface PacketAcknowledgements; - interface Random; interface RootControl; interface CollectionId[uint8_t client]; interface AMPacket; - interface CollectionDebug; interface Leds; + interface Random; + + // This implementation has extensive debugging instrumentation. + // Wiring up the CollectionDebug interface provides information + // on important events, such as transmissions, receptions, + // and cache checks. The TinyOS release includes scripts for + // parsing these messages. + interface CollectionDebug; + + + // The ForwardingEngine monitors whether the underlying + // radio is on or not in order to start/stop forwarding + // as appropriate. + interface SplitControl as RadioControl; } } implementation { @@ -173,34 +167,22 @@ implementation { * masked by the given mask and added to the given offset. */ static void startRetxmitTimer(uint16_t mask, uint16_t offset); - static void startCongestionTimer(uint16_t mask, uint16_t offset); - - /* Indicates whether our client is congested */ - bool clientCongested = FALSE; - - /* Tracks our parent's congestion state. */ - bool parentCongested = FALSE; - - /* Threshold for congestion */ - uint8_t congestionThreshold; - - /* Keeps track of whether the routing layer is running; if not, - * it will not send packets. */ - bool running = FALSE; - - /* Keeps track of whether the radio is on; no sense sending packets - * if the radio is off. */ - bool radioOn = FALSE; + void clearState(uint8_t state); + bool hasState(uint8_t state); + void setState(uint8_t state); - /* Keeps track of whether an ack is pending on an outgoing packet, - * so that the engine can work unreliably when the data-link layer - * does not support acks. */ - bool ackPending = FALSE; - - /* Keeps track of whether the packet on the head of the queue - * is being used, and control access to the data-link layer.*/ - bool sending = FALSE; + // CTP state variables. + enum { + QUEUE_CONGESTED = 0x1, // Need to set C bit? + ROUTING_ON = 0x2, // Forwarding running? + RADIO_ON = 0x4, // Radio is on? + ACK_PENDING = 0x8, // Have an ACK pending? + SENDING = 0x10 // Am sending a packet? + }; + // Start with all states false + uint8_t forwardingState = 0; + /* Keep track of the last parent address we sent to, so that unacked packets to an old parent are not incorrectly attributed to a new parent. */ @@ -237,7 +219,6 @@ implementation { clientPtrs[i] = clientEntries + i; dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); } - congestionThreshold = (call SendQueue.maxSize()) >> 1; loopbackMsgPtr = &loopbackMsg; lastParent = call AMPacket.address(); seqno = 0; @@ -245,12 +226,12 @@ implementation { } command error_t StdControl.start() { - running = TRUE; + setState(ROUTING_ON); return SUCCESS; } command error_t StdControl.stop() { - running = FALSE; + clearState(ROUTING_ON); return SUCCESS; } @@ -263,19 +244,29 @@ implementation { when it turns on, it then starts sending packets. */ event void RadioControl.startDone(error_t err) { if (err == SUCCESS) { - radioOn = TRUE; + setState(RADIO_ON); if (!call SendQueue.empty()) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } } } - + + static void startRetxmitTimer(uint16_t window, uint16_t offset) { + uint16_t r = call Random.rand16(); + r %= window; + r += offset; + call RetxmitTimer.startOneShot(r); + dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r); + } + /* * If the ForwardingEngine has stopped sending packets because * these has been no route, then as soon as one is found, start * sending packets. */ event void UnicastNameFreeRouting.routeFound() { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -287,7 +278,7 @@ implementation { event void RadioControl.stopDone(error_t err) { if (err == SUCCESS) { - radioOn = FALSE; + clearState(RADIO_ON); } } @@ -302,13 +293,15 @@ implementation { * already sending packets (the RetxmitTimer isn't running), post * sendTask. It could be that the engine is running and sendTask * has already been posted, but the post-once semantics make this - * not matter. + * not matter. What's important is that you don't post sendTask + * if the retransmit timer is running; this would circumvent the + * timer and send a packet before it fires. */ command error_t Send.send[uint8_t client](message_t* msg, uint8_t len) { ctp_data_header_t* hdr; fe_queue_entry_t *qe; dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); - if (!running) {return EOFF;} + if (!hasState(ROUTING_ON)) {return EOFF;} if (len > call Send.maxPayloadLength[client]()) {return ESIZE;} call Packet.setPayloadLength(msg, len); @@ -329,7 +322,8 @@ implementation { qe->retries = MAX_RETRIES; dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); if (call SendQueue.enqueue(qe) == SUCCESS) { - if (radioOn && !call RetxmitTimer.isRunning()) { + if (hasState(RADIO_ON) && !hasState(SENDING)) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } clientPtrs[client] = NULL; @@ -366,7 +360,8 @@ implementation { * These is where all of the send logic is. When the ForwardingEngine * wants to send a packet, it posts this task. The send logic is * independent of whether it is a forwarded packet or a packet from - * a send client. + * a send clientL the two cases differ in how memory is managed in + * sendDone. * * The task first checks that there is a packet to send and that * there is a valid route. It then marshals the relevant arguments @@ -380,156 +375,105 @@ implementation { */ task void sendTask() { + uint16_t gradient; dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); - if (sending) { - dbg("Forwarder", "%s: busy, don't send\n", __FUNCTION__); - call CollectionDebug.logEvent(NET_C_FE_SEND_BUSY); - return; - } - else if (call SendQueue.empty()) { - dbg("Forwarder", "%s: queue empty, don't send\n", __FUNCTION__); + if (hasState(SENDING) || call SendQueue.empty()) { call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY); return; } - else if (!call RootControl.isRoot() && - !call UnicastNameFreeRouting.hasRoute()) { - dbg("Forwarder", "%s: no route, don't send, start retry timer\n", __FUNCTION__); - call RetxmitTimer.startOneShot(10000); - - // send a debug message to the uart + else if ((!call RootControl.isRoot() && + !call UnicastNameFreeRouting.hasRoute()) || + (call CtpInfo.getEtx(&gradient) != SUCCESS)) { + /* This code path is for when we don't have a valid next + * hop. We set a retry timer. + * + * Technically, this timer isn't necessary, as if a route + * is found we'll get an event. But just in case such an event + * is lost (e.g., a bug in the routing engine), we retry. + * Otherwise the forwarder might hang indefinitely. As this test + * doesn't require radio activity, the energy cost is minimal. */ + dbg("Forwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); + call RetxmitTimer.startOneShot(NO_ROUTE_RETRY); call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE); - return; } - /* - else if (parentCongested) { - // Do nothing; the congestion timer is necessarily set which - // will clear parentCongested and repost sendTask(). - dbg("Forwarder", "%s: sendTask deferring for congested parent\n", - __FUNCTION__); - call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT); - } - */ else { + /* We can send a packet. + First check if it's a duplicate; + if not, try to send/forward. */ error_t subsendResult; fe_queue_entry_t* qe = call SendQueue.head(); uint8_t payloadLen = call SubPacket.payloadLength(qe->msg); am_addr_t dest = call UnicastNameFreeRouting.nextHop(); - uint16_t gradient; - if (call CtpInfo.isNeighborCongested(dest)) { - // Our parent is congested. We should wait. - // Don't repost the task, CongestionTimer will do the job - if (! parentCongested ) { - parentCongested = TRUE; - call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN); - } - if (! call CongestionTimer.isRunning()) { - startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET); - } - dbg("Forwarder", "%s: sendTask deferring for congested parent\n", - __FUNCTION__); - //call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT); - return; - } - if (parentCongested) { - parentCongested = FALSE; - call CollectionDebug.logEvent(NET_C_FE_CONGESTION_END); - } - // Once we are here, we have decided to send the packet. if (call SentCache.lookup(qe->msg)) { + /* This packet is a duplicate, so suppress it: free memory and + * send next packet. Duplicates are only possible for + * forwarded packets, so we can circumvent the client or + * forwarded branch for freeing the buffer. */ call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE_AT_SEND); call SendQueue.dequeue(); - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + post sendTask(); return; } - /* If our current parent is not the same as the last parent - we sent do, then reset the count of unacked packets: don't - penalize a new parent for the failures of a prior one.*/ - if (dest != lastParent) { - qe->retries = MAX_RETRIES; - lastParent = dest; - } - + + // Not a duplicate: we've decided we're going to send. dbg("Forwarder", "Sending queue entry %p\n", qe); + if (call RootControl.isRoot()) { + /* Code path for roots: copy the packet and signal receive. */ collection_id_t collectid = getHeader(qe->msg)->type; + uint8_t* payload; + uint8_t payloadLength; + memcpy(loopbackMsgPtr, qe->msg, sizeof(message_t)); - ackPending = FALSE; - + + payload = call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)); + payloadLength = call Packet.payloadLength(loopbackMsgPtr); dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr, - call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)), - call Packet.payloadLength(loopbackMsgPtr)); + payload, + payloadLength); signal SubSend.sendDone(qe->msg, SUCCESS); - return; } - - // Loop-detection functionality: - if (call CtpInfo.getEtx(&gradient) != SUCCESS) { - // If we have no metric, set our gradient conservatively so - // that other nodes don't automatically drop our packets. - gradient = 0; - } - call CtpPacket.setEtx(qe->msg, gradient); - - ackPending = (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS); - - // Set or clear the congestion bit on *outgoing* packets. - if (call CtpCongestion.isCongested()) - call CtpPacket.setOption(qe->msg, CTP_OPT_ECN); - else - call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN); - - subsendResult = call SubSend.send(dest, qe->msg, payloadLen); - if (subsendResult == SUCCESS) { - // Successfully submitted to the data-link layer. - sending = TRUE; - dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); - if (qe->client < CLIENT_COUNT) { - dbg("Forwarder", "%s: client packet.\n", __FUNCTION__); - } - else { - dbg("Forwarder", "%s: forwarded packet.\n", __FUNCTION__); - } - return; - } - else if (subsendResult == EOFF) { - // The radio has been turned off underneath us. Assume that - // this is for the best. When the radio is turned back on, we'll - // handle a startDone event and resume sending. - radioOn = FALSE; - dbg("Forwarder", "%s: subsend failed from EOFF.\n", __FUNCTION__); - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_OFF); - } - else if (subsendResult == EBUSY) { - // This shouldn't happen, as we sit on top of a client and - // control our own output; it means we're trying to - // double-send (bug). This means we expect a sendDone, so just - // wait for that: when the sendDone comes in, // we'll try - // sending this packet again. - dbg("Forwarder", "%s: subsend failed from EBUSY.\n", __FUNCTION__); - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_BUSY); - } - else if (subsendResult == ESIZE) { - dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); - call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); - post sendTask(); - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); + else { + /* The basic forwarding/sending case. */ + call CtpPacket.setEtx(qe->msg, gradient); + call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN | CTP_OPT_PULL); + if (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS) { + setState(ACK_PENDING); + } + if (hasState(QUEUE_CONGESTED)) { + call CtpPacket.setOption(qe->msg, CTP_OPT_ECN); + clearState(QUEUE_CONGESTED); + } + + subsendResult = call SubSend.send(dest, qe->msg, payloadLen); + if (subsendResult == SUCCESS) { + // Successfully submitted to the data-link layer. + setState(SENDING); + dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); + return; + } + // The packet is too big: truncate it and retry. + else if (subsendResult == ESIZE) { + dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); + call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); + post sendTask(); + call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); + } + else { + dbg("Forwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); + } } } } - void sendDoneBug() { - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_BAD_SENDDONE); - } /* * The second phase of a send operation; based on whether the transmission was @@ -542,97 +486,92 @@ implementation { * */ + void packetComplete(fe_queue_entry_t* qe, message_t* msg, bool success) { + // Four cases: + // Local packet: success or failure + // Forwarded packet: success or failure + if (qe->client < CLIENT_COUNT) { + clientPtrs[qe->client] = qe; + signal Send.sendDone[qe->client](msg, SUCCESS); + if (success) { + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } else { + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + } + else { + if (success) { + call SentCache.insert(qe->msg); + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + else { + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + } + } + event void SubSend.sendDone(message_t* msg, error_t error) { fe_queue_entry_t *qe = call SendQueue.head(); dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); - if (qe == NULL || qe->msg != msg) { - dbg("Forwarder", "%s: BUG: not our packet (%p != %p)!\n", __FUNCTION__, msg, qe->msg); - sendDoneBug(); // Not our packet, something is very wrong... - return; - } - else if (error != SUCCESS) { - // Immediate retransmission is the worst thing to do. + + if (error != SUCCESS) { + /* The radio wasn't able to send the packet: retransmit it. */ dbg("Forwarder", "%s: send failed\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); startRetxmitTimer(SENDDONE_FAIL_WINDOW, SENDDONE_FAIL_OFFSET); } - else if (ackPending && !call PacketAcknowledgements.wasAcked(msg)) { - // AckPending is for case when DL cannot support acks. + else if (hasState(ACK_PENDING) && !call PacketAcknowledgements.wasAcked(msg)) { + /* No ack: if countdown is not 0, retransmit, else drop the packet. */ call LinkEstimator.txNoAck(call AMPacket.destination(msg)); call CtpInfo.recomputeRoutes(); if (--qe->retries) { - dbg("Forwarder", "%s: not acked\n", __FUNCTION__); + dbg("Forwarder", "%s: not acked, retransmit\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), call AMPacket.destination(msg)); startRetxmitTimer(SENDDONE_NOACK_WINDOW, SENDDONE_NOACK_OFFSET); } else { - //max retries, dropping packet - if (qe->client < CLIENT_COUNT) { - clientPtrs[qe->client] = qe; - signal Send.sendDone[qe->client](msg, FAIL); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } else { - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - call SendQueue.dequeue(); - sending = FALSE; + /* Hit max retransmit threshold: drop the packet. */ + call SendQueue.dequeue(); + clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); + + packetComplete(qe, msg, FALSE); } } - else if (qe->client < CLIENT_COUNT) { - ctp_data_header_t* hdr; - uint8_t client = qe->client; - dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n", - __FUNCTION__, client, qe); - call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - call LinkEstimator.txAck(call AMPacket.destination(msg)); - clientPtrs[client] = qe; - hdr = getHeader(qe->msg); + else { + /* Packet was acknowledged. Updated the link estimator, + free the buffer (pool or sendDone), start timer to + send next packet. */ call SendQueue.dequeue(); - signal Send.sendDone[client](msg, SUCCESS); - sending = FALSE; + clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - } - else if (call MessagePool.size() < call MessagePool.maxSize()) { - // A successfully forwarded packet. - dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize()); - call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); call LinkEstimator.txAck(call AMPacket.destination(msg)); - call SentCache.insert(qe->msg); - call SendQueue.dequeue(); - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - sending = FALSE; - startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - } - else { - dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client); - sendDoneBug(); // It's a forwarded packet, but there's no room the pool; - // someone has double-stored a pointer somewhere and we have nowhere - // to put this, so we have to leak it... + packetComplete(qe, msg, TRUE); } } @@ -701,6 +640,7 @@ implementation { if (!call RetxmitTimer.isRunning()) { // sendTask is only immediately posted if we don't detect a // loop. + dbg("FHangBug", "%s: posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -812,39 +752,28 @@ implementation { } event void RetxmitTimer.fired() { - sending = FALSE; - post sendTask(); - } - - event void CongestionTimer.fired() { - //parentCongested = FALSE; - //call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 0); + clearState(SENDING); + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } - command bool CtpCongestion.isCongested() { - // A simple predicate for now to determine congestion state of - // this node. - bool congested = (call SendQueue.size() > congestionThreshold) ? - TRUE : FALSE; - return ((congested || clientCongested)?TRUE:FALSE); + return FALSE; } command void CtpCongestion.setClientCongested(bool congested) { - bool wasCongested = call CtpCongestion.isCongested(); - clientCongested = congested; - if (!wasCongested && congested) { - call CtpInfo.triggerImmediateRouteUpdate(); - } else if (wasCongested && ! (call CtpCongestion.isCongested())) { - call CtpInfo.triggerRouteUpdate(); - } + // Do not respond to congestion. } + + /* signalled when this neighbor is evicted from the neighbor table */ + event void LinkEstimator.evicted(am_addr_t neighbor) {} + + // Packet ADT commands command void Packet.clear(message_t* msg) { call SubPacket.clear(msg); } - + command uint8_t Packet.payloadLength(message_t* msg) { return call SubPacket.payloadLength(msg) - sizeof(ctp_data_header_t); } @@ -865,44 +794,38 @@ implementation { return payload; } + // CollectionPacket ADT commands command am_addr_t CollectionPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;} - command collection_id_t CollectionPacket.getType(message_t* msg) {return getHeader(msg)->type;} command uint8_t CollectionPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;} command void CollectionPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;} command void CollectionPacket.setType(message_t* msg, collection_id_t id) {getHeader(msg)->type = id;} command void CollectionPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;} - - //command ctp_options_t CtpPacket.getOptions(message_t* msg) {return getHeader(msg)->options;} + // CtpPacket ADT commands command uint8_t CtpPacket.getType(message_t* msg) {return getHeader(msg)->type;} command am_addr_t CtpPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;} command uint16_t CtpPacket.getEtx(message_t* msg) {return getHeader(msg)->etx;} command uint8_t CtpPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;} command uint8_t CtpPacket.getThl(message_t* msg) {return getHeader(msg)->thl;} - command void CtpPacket.setThl(message_t* msg, uint8_t thl) {getHeader(msg)->thl = thl;} command void CtpPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;} command void CtpPacket.setType(message_t* msg, uint8_t id) {getHeader(msg)->type = id;} - + command void CtpPacket.setEtx(message_t* msg, uint16_t e) {getHeader(msg)->etx = e;} + command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;} command bool CtpPacket.option(message_t* msg, ctp_options_t opt) { return ((getHeader(msg)->options & opt) == opt) ? TRUE : FALSE; } - command void CtpPacket.setOption(message_t* msg, ctp_options_t opt) { getHeader(msg)->options |= opt; } - command void CtpPacket.clearOption(message_t* msg, ctp_options_t opt) { getHeader(msg)->options &= ~opt; } - command void CtpPacket.setEtx(message_t* msg, uint16_t e) {getHeader(msg)->etx = e;} - command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;} // A CTP packet ID is based on the origin and the THL field, to // implement duplicate suppression as described in TEP 123. - command bool CtpPacket.matchInstance(message_t* m1, message_t* m2) { return (call CtpPacket.getOrigin(m1) == call CtpPacket.getOrigin(m2) && call CtpPacket.getSequenceNumber(m1) == call CtpPacket.getSequenceNumber(m2) && @@ -916,6 +839,19 @@ implementation { call CtpPacket.getType(m1) == call CtpPacket.getType(m2)); } + + void clearState(uint8_t state) { + forwardingState = forwardingState & ~state; + } + bool hasState(uint8_t state) { + return forwardingState & state; + } + void setState(uint8_t state) { + forwardingState = forwardingState | state; + } + + /******** Defaults. **************/ + default event void Send.sendDone[uint8_t client](message_t *msg, error_t error) { } @@ -941,64 +877,26 @@ implementation { default command collection_id_t CollectionId.fetch[uint8_t client]() { return 0; } - - static void startRetxmitTimer(uint16_t mask, uint16_t offset) { - uint16_t r = call Random.rand16(); - r &= mask; - r += offset; - call RetxmitTimer.startOneShot(r); - dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r); - } - - static void startCongestionTimer(uint16_t mask, uint16_t offset) { - uint16_t r = call Random.rand16(); - r &= mask; - r += offset; - call CongestionTimer.startOneShot(r); - dbg("Forwarder", "Congestion timer will fire in %hu ms\n", r); - } - - /* signalled when this neighbor is evicted from the neighbor table */ - event void LinkEstimator.evicted(am_addr_t neighbor) { - } - - + /* Default implementations for CollectionDebug calls. * These allow CollectionDebug not to be wired to anything if debugging * is not desired. */ - - default command error_t CollectionDebug.logEvent(uint8_t type) { - return SUCCESS; - } - default command error_t CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) { - return SUCCESS; - } - default command error_t CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) { - return SUCCESS; - } - default command error_t CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) { - return SUCCESS; - } - default command error_t CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) { - return SUCCESS; - } + + default command error_t CollectionDebug.logEvent(uint8_t type) { + return SUCCESS; + } + default command error_t CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) { + return SUCCESS; + } + default command error_t CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) { + return SUCCESS; + } + default command error_t CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) { + return SUCCESS; + } + default command error_t CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) { + return SUCCESS; + } } -/* Rodrigo. This is an alternative - event void CtpInfo.ParentCongested(bool congested) { - if (congested) { - // We've overheard our parent's ECN bit set. - startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET); - parentCongested = TRUE; - call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN); - } else { - // We've overheard our parent's ECN bit cleared. - call CongestionTimer.stop(); - parentCongested = FALSE; - call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 1); - post sendTask(); - } - } -*/ -