]> oss.titaniummirror.com Git - tinyos-2.x.git/blobdiff - tos/lib/net/ctp/CtpForwardingEngineP.nc
Updated files.
[tinyos-2.x.git] / tos / lib / net / ctp / CtpForwardingEngineP.nc
diff --git a/tos/lib/net/ctp/CtpForwardingEngineP.nc b/tos/lib/net/ctp/CtpForwardingEngineP.nc
new file mode 100644 (file)
index 0000000..f62ac97
--- /dev/null
@@ -0,0 +1,1014 @@
+/* $Id$ */
+/*
+ * Copyright (c) 2006 Stanford University.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the
+ *   distribution.
+ * - Neither the name of the Stanford University nor the names of
+ *   its contributors may be used to endorse or promote products derived
+ *   from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL STANFORD
+ * UNIVERSITY OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ *  The ForwardingEngine is responsible for queueing and scheduling outgoing
+ *  packets in a collection protocol. It maintains a pool of forwarding messages 
+ *  and a packet send 
+ *  queue. A ForwardingEngine with a forwarding message pool of size <i>F</i> 
+ *  and <i>C</i> CollectionSenderC clients has a send queue of size
+ *  <i>F + C</i>. This implementation has a large number of configuration
+ *  constants, which can be found in <code>ForwardingEngine.h</code>.
+ *
+ *  <p>Packets in the send queue are sent in FIFO order, with head-of-line
+ *  blocking. Because this is a tree collection protocol, all packets are going
+ *  to the same destination, and so the ForwardingEngine does not distinguish
+ *  packets from one another: packets from CollectionSenderC clients are
+ *  treated identically to forwarded packets.</p>
+ *
+ *  <p>If ForwardingEngine is on top of a link layer that supports
+ *  synchronous acknowledgments, it enables them and retransmits packets
+ *  when they are not acked. It transmits a packet up to MAX_RETRIES times
+ *  before giving up and dropping the packet.</p> 
+ *
+ *  <p>The ForwardingEngine detects routing loops and tries to correct
+ *  them. It assumes that the collection tree is based on a gradient,
+ *  such as hop count or estimated transmissions. When the ForwardingEngine
+ *  sends a packet to the next hop, it puts the local gradient value in
+ *  the packet header. If a node receives a packet to forward whose
+ *  gradient value is less than its own, then the gradient is not monotonically
+ *  decreasing and there may be a routing loop. When the ForwardingEngine
+ *  receives such a packet, it tells the RoutingEngine to advertise its
+ *  gradient value soon, with the hope that the advertisement will update
+ *  the node who just sent a packet and break the loop.
+ *  
+ *  <p>ForwardingEngine times its packet transmissions. It differentiates
+ *  between four transmission cases: forwarding, success, ack failure, 
+ *  and loop detection. In each case, the
+ *  ForwardingEngine waits a randomized period of time before sending the next
+ *  packet. This approach assumes that the network is operating at low
+ *  utilization; its goal is to prevent correlated traffic -- such as 
+ *  nodes along a route forwarding packets -- from interfering with itself.
+ *
+ *  <table>
+ *    <tr>
+ *      <td><b>Case</b></td>
+ *      <td><b>CC2420 Wait (ms)</b></td>
+ *      <td><b>Other Wait (ms)</b></td>
+ *      <td><b>Description</b></td>
+ *    </tr>
+ *    <tr>
+ *      <td>Forwarding</td>
+ *      <td>Immediate</td>
+ *      <td>Immediate</td>
+ *      <td>When the ForwardingEngine receives a packet to forward and it is not
+ *          already sending a packet (queue is empty). In this case, it immediately
+ *          forwards the packet.</td>
+ *    </tr>
+ *    <tr>
+ *      <td>Success</td>
+ *      <td>16-31</td>
+ *      <td>128-255</td>
+ *      <td>When the ForwardingEngine successfully sends a packet to the next
+ *          hop, it waits this long before sending the next packet in the queue.
+ *          </td>
+ *    </tr>
+ *    <tr>
+ *      <td>Ack Failure</td>
+ *      <td>8-15</td>
+ *      <td>128-255</td>
+ *      <td>If the link layer supports acks and the ForwardingEngine did not
+ *          receive an acknowledgment from the next hop, it waits this long before
+ *          trying a retransmission. If the packet has exceeded the retransmission
+ *          count, ForwardingEngine drops the packet and uses the Success timer instead. </td>
+ *    </tr>
+ *    <tr>
+ *      <td>Loop Detection</td>
+ *      <td>32-63</td>
+ *      <td>512-1023</td>
+ *      <td>If the ForwardingEngine is asked to forward a packet from a node that
+ *          believes it is closer to the root, the ForwardingEngine pauses its
+ *          transmissions for this interval and triggers the RoutingEngine to 
+ *          send an update. The goal is to let the gradient become consistent before
+ *          sending packets, in order to prevent routing loops from consuming
+ *          bandwidth and energy.</td>
+ *    </tr>
+ *  </table>  
+ *
+ *  <p>The times above are all for CC2420-based platforms. The timings for
+ *  other platforms depend on their bit rates, as they are based on packet
+ *  transmission times.</p>
+
+ *  @author Philip Levis
+ *  @author Kyle Jamieson
+ *  @date   $Date$
+ */
+
+#include <CtpForwardingEngine.h>
+#include <CtpDebugMsg.h>
+   
+generic module CtpForwardingEngineP() {
+  provides {
+    interface Init;
+    interface StdControl;
+    interface Send[uint8_t client];
+    interface Receive[collection_id_t id];
+    interface Receive as Snoop[collection_id_t id];
+    interface Intercept[collection_id_t id];
+    interface Packet;
+    interface CollectionPacket;
+    interface CtpPacket;
+    interface CtpCongestion;
+  }
+  uses {
+    interface AMSend as SubSend;
+    interface Receive as SubReceive;
+    interface Receive as SubSnoop;
+    interface Packet as SubPacket;
+    interface UnicastNameFreeRouting;
+    interface SplitControl as RadioControl;
+    interface Queue<fe_queue_entry_t*> as SendQueue;
+    interface Pool<fe_queue_entry_t> as QEntryPool;
+    interface Pool<message_t> as MessagePool;
+    interface Timer<TMilli> as RetxmitTimer;
+
+    interface LinkEstimator;
+
+    // Counts down from the last time we heard from our parent; used
+    // to expire local state about parent congestion.
+    interface Timer<TMilli> as CongestionTimer;
+
+    interface Cache<message_t*> as SentCache;
+    interface CtpInfo;
+    interface PacketAcknowledgements;
+    interface Random;
+    interface RootControl;
+    interface CollectionId[uint8_t client];
+    interface AMPacket;
+    interface CollectionDebug;
+    interface Leds;
+  }
+}
+implementation {
+  /* Helper functions to start the given timer with a random number
+   * masked by the given mask and added to the given offset.
+   */
+  static void startRetxmitTimer(uint16_t mask, uint16_t offset);
+  static void startCongestionTimer(uint16_t mask, uint16_t offset);
+
+  /* Indicates whether our client is congested */
+  bool clientCongested = FALSE;
+
+  /* Tracks our parent's congestion state. */
+  bool parentCongested = FALSE;
+
+  /* Keeps track of whether the routing layer is running; if not,
+   * it will not send packets. */
+  bool running = FALSE;
+
+  /* Keeps track of whether the radio is on; no sense sending packets
+   * if the radio is off. */
+  bool radioOn = FALSE;
+
+  /* Keeps track of whether an ack is pending on an outgoing packet,
+   * so that the engine can work unreliably when the data-link layer
+   * does not support acks. */
+  bool ackPending = FALSE;
+
+  /* Keeps track of whether the packet on the head of the queue
+   * is being used, and control access to the data-link layer.*/
+  bool sending = FALSE;
+
+  /* Keep track of the last parent address we sent to, so that
+     unacked packets to an old parent are not incorrectly attributed
+     to a new parent. */
+  am_addr_t lastParent;
+  
+  /* Network-level sequence number, so that receivers
+   * can distinguish retransmissions from different packets. */
+  uint8_t seqno;
+
+  enum {
+    CLIENT_COUNT = uniqueCount(UQ_CTP_CLIENT)
+  };
+
+  /* Each sending client has its own reserved queue entry.
+     If the client has a packet pending, its queue entry is in the 
+     queue, and its clientPtr is NULL. If the client is idle,
+     its queue entry is pointed to by clientPtrs. */
+
+  fe_queue_entry_t clientEntries[CLIENT_COUNT];
+  fe_queue_entry_t* clientPtrs[CLIENT_COUNT];
+
+  /* The loopback message is for when a collection roots calls
+     Send.send. Since Send passes a pointer but Receive allows
+     buffer swaps, the forwarder copies the sent packet into 
+     the loopbackMsgPtr and performs a buffer swap with it.
+     See sendTask(). */
+     
+  message_t loopbackMsg;
+  message_t* loopbackMsgPtr;
+
+  command error_t Init.init() {
+    int i;
+    for (i = 0; i < CLIENT_COUNT; i++) {
+      clientPtrs[i] = clientEntries + i;
+      dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]);
+    }
+    loopbackMsgPtr = &loopbackMsg;
+    lastParent = call AMPacket.address();
+    seqno = 0;
+    return SUCCESS;
+  }
+
+  command error_t StdControl.start() {
+    running = TRUE;
+    return SUCCESS;
+  }
+
+  command error_t StdControl.stop() {
+    running = FALSE;
+    return SUCCESS;
+  }
+
+  /* sendTask is where the first phase of all send logic
+   * exists (the second phase is in SubSend.sendDone()). */
+  task void sendTask();
+  
+  /* ForwardingEngine keeps track of whether the underlying
+     radio is powered on. If not, it enqueues packets;
+     when it turns on, it then starts sending packets. */ 
+  event void RadioControl.startDone(error_t err) {
+    if (err == SUCCESS) {
+      radioOn = TRUE;
+      if (!call SendQueue.empty()) {
+        post sendTask();
+      }
+    }
+  }
+  /* 
+   * If the ForwardingEngine has stopped sending packets because
+   * these has been no route, then as soon as one is found, start
+   * sending packets.
+   */ 
+  event void UnicastNameFreeRouting.routeFound() {
+    post sendTask();
+  }
+
+  event void UnicastNameFreeRouting.noRoute() {
+    // Depend on the sendTask to take care of this case;
+    // if there is no route the component will just resume
+    // operation on the routeFound event
+  }
+  
+  event void RadioControl.stopDone(error_t err) {
+    if (err == SUCCESS) {
+      radioOn = FALSE;
+    }
+  }
+
+  ctp_data_header_t* getHeader(message_t* m) {
+    return (ctp_data_header_t*)call SubPacket.getPayload(m, NULL);
+  }
+  /*
+   * The send call from a client. Return EBUSY if the client is busy
+   * (clientPtrs is NULL), otherwise configure its queue entry
+   * and put it in the send queue. If the ForwardingEngine is not
+   * already sending packets (the RetxmitTimer isn't running), post
+   * sendTask. It could be that the engine is running and sendTask
+   * has already been posted, but the post-once semantics make this
+   * not matter.
+   */ 
+  command error_t Send.send[uint8_t client](message_t* msg, uint8_t len) {
+    ctp_data_header_t* hdr;
+    fe_queue_entry_t *qe;
+    dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len);
+    if (!running) {return EOFF;}
+    if (len > call Send.maxPayloadLength[client]()) {return ESIZE;}
+    
+    call Packet.setPayloadLength(msg, len);
+    hdr = getHeader(msg);
+    hdr->origin = TOS_NODE_ID;
+    hdr->originSeqNo  = seqno++;
+    hdr->type = call CollectionId.fetch[client]();
+    hdr->thl = 0;
+
+    if (clientPtrs[client] == NULL) {
+      dbg("Forwarder", "%s: send failed as client is busy.\n", __FUNCTION__);
+      return EBUSY;
+    }
+
+    qe = clientPtrs[client];
+    qe->msg = msg;
+    qe->client = client;
+    qe->retries = MAX_RETRIES;
+    dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size());
+    if (call SendQueue.enqueue(qe) == SUCCESS) {
+      if (radioOn && !call RetxmitTimer.isRunning()) {
+        post sendTask();
+      }
+      clientPtrs[client] = NULL;
+      return SUCCESS;
+    }
+    else {
+      dbg("Forwarder", 
+          "%s: send failed as packet could not be enqueued.\n", 
+          __FUNCTION__);
+      
+      // send a debug message to the uart
+      call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL);
+
+      // Return the pool entry, as it's not for me...
+      return FAIL;
+    }
+  }
+
+  command error_t Send.cancel[uint8_t client](message_t* msg) {
+    // cancel not implemented. will require being able
+    // to pull entries out of the queue.
+    return FAIL;
+  }
+
+  command uint8_t Send.maxPayloadLength[uint8_t client]() {
+    return call Packet.maxPayloadLength();
+  }
+
+  command void* Send.getPayload[uint8_t client](message_t* msg) {
+    return call Packet.getPayload(msg, NULL);
+  }
+
+  /*
+   * These is where all of the send logic is. When the ForwardingEngine
+   * wants to send a packet, it posts this task. The send logic is
+   * independent of whether it is a forwarded packet or a packet from
+   * a send client. 
+   *
+   * The task first checks that there is a packet to send and that
+   * there is a valid route. It then marshals the relevant arguments
+   * and prepares the packet for sending. If the node is a collection
+   * root, it signals Receive with the loopback message. Otherwise,
+   * it sets the packet to be acknowledged and sends it. It does not
+   * remove the packet from the send queue: while sending, the 
+   * packet being sent is at the head of the queue; a packet is dequeued
+   * in the sendDone handler, either due to retransmission failure
+   * or to a successful send.
+   */
+
+  task void sendTask() {
+    dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size());
+    if (sending) {
+      dbg("Forwarder", "%s: busy, don't send\n", __FUNCTION__);
+      call CollectionDebug.logEvent(NET_C_FE_SEND_BUSY);
+      return;
+    }
+    else if (call SendQueue.empty()) {
+      dbg("Forwarder", "%s: queue empty, don't send\n", __FUNCTION__);
+      call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY);
+      return;
+    }
+    else if (!call RootControl.isRoot() && 
+             !call UnicastNameFreeRouting.hasRoute()) {
+      dbg("Forwarder", "%s: no route, don't send, start retry timer\n", __FUNCTION__);
+      call RetxmitTimer.startOneShot(10000);
+
+      // send a debug message to the uart
+      call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE);
+
+      return;
+    }
+    /*
+    else if (parentCongested) {
+      // Do nothing; the congestion timer is necessarily set which
+      // will clear parentCongested and repost sendTask().
+      dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
+          __FUNCTION__);
+      call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
+    }
+    */
+    else {
+      error_t subsendResult;
+      fe_queue_entry_t* qe = call SendQueue.head();
+      uint8_t payloadLen = call SubPacket.payloadLength(qe->msg);
+      am_addr_t dest = call UnicastNameFreeRouting.nextHop();
+      uint16_t gradient;
+
+      if (call CtpInfo.isNeighborCongested(dest)) {
+        // Our parent is congested. We should wait.
+        // Don't repost the task, CongestionTimer will do the job
+        if (! parentCongested ) {
+          parentCongested = TRUE;
+          call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
+        }
+        if (! call CongestionTimer.isRunning()) {
+          startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
+        } 
+        dbg("Forwarder", "%s: sendTask deferring for congested parent\n",
+            __FUNCTION__);
+        //call CollectionDebug.logEvent(NET_C_FE_CONGESTION_SENDWAIT);
+        return;
+      } 
+      if (parentCongested) {
+        parentCongested = FALSE;
+        call CollectionDebug.logEvent(NET_C_FE_CONGESTION_END);
+      } 
+      // Once we are here, we have decided to send the packet.
+      if (call SentCache.lookup(qe->msg)) {
+        call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE_AT_SEND);
+        call SendQueue.dequeue();
+        post sendTask();
+        return;
+      }
+      /* If our current parent is not the same as the last parent
+         we sent do, then reset the count of unacked packets: don't
+         penalize a new parent for the failures of a prior one.*/
+      if (dest != lastParent) {
+        qe->retries = MAX_RETRIES;
+        lastParent = dest;
+      }
+      dbg("Forwarder", "Sending queue entry %p\n", qe);
+      if (call RootControl.isRoot()) {
+        collection_id_t collectid = getHeader(qe->msg)->type;
+        memcpy(loopbackMsgPtr, qe->msg, sizeof(message_t));
+        ackPending = FALSE;
+       
+        dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__);
+        loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr,
+                                                         call Packet.getPayload(loopbackMsgPtr, NULL), 
+                                                         call Packet.payloadLength(loopbackMsgPtr));
+        signal SubSend.sendDone(qe->msg, SUCCESS);
+        return;
+      }
+      
+      // Loop-detection functionality:
+      if (call CtpInfo.getEtx(&gradient) != SUCCESS) {
+        // If we have no metric, set our gradient conservatively so
+        // that other nodes don't automatically drop our packets.
+        gradient = 0;
+      }
+      call CtpPacket.setEtx(qe->msg, gradient);
+      
+      ackPending = (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS);
+
+      // Set or clear the congestion bit on *outgoing* packets.
+      if (call CtpCongestion.isCongested())
+        call CtpPacket.setOption(qe->msg, CTP_OPT_ECN);
+      else
+        call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN);
+      
+      subsendResult = call SubSend.send(dest, qe->msg, payloadLen);
+      if (subsendResult == SUCCESS) {
+        // Successfully submitted to the data-link layer.
+        sending = TRUE;
+        dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg);
+        if (qe->client < CLIENT_COUNT) {
+               dbg("Forwarder", "%s: client packet.\n", __FUNCTION__);
+        }
+        else {
+               dbg("Forwarder", "%s: forwarded packet.\n", __FUNCTION__);
+        }
+        return;
+      }
+      else if (subsendResult == EOFF) {
+       // The radio has been turned off underneath us. Assume that
+       // this is for the best. When the radio is turned back on, we'll
+       // handle a startDone event and resume sending.
+        radioOn = FALSE;
+       dbg("Forwarder", "%s: subsend failed from EOFF.\n", __FUNCTION__);
+        // send a debug message to the uart
+       call CollectionDebug.logEvent(NET_C_FE_SUBSEND_OFF);
+      }
+      else if (subsendResult == EBUSY) {
+       // This shouldn't happen, as we sit on top of a client and
+        // control our own output; it means we're trying to
+        // double-send (bug). This means we expect a sendDone, so just
+        // wait for that: when the sendDone comes in, // we'll try
+        // sending this packet again.  
+       dbg("Forwarder", "%s: subsend failed from EBUSY.\n", __FUNCTION__);
+        // send a debug message to the uart
+        call CollectionDebug.logEvent(NET_C_FE_SUBSEND_BUSY);
+      }
+      else if (subsendResult == ESIZE) {
+       dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__);
+       call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength());
+       post sendTask();
+       call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE);
+      }
+    }
+  }
+
+  void sendDoneBug() {
+    // send a debug message to the uart
+    call CollectionDebug.logEvent(NET_C_FE_BAD_SENDDONE);
+  }
+
+  /*
+   * The second phase of a send operation; based on whether the transmission was
+   * successful, the ForwardingEngine either stops sending or starts the
+   * RetxmitTimer with an interval based on what has occured. If the send was
+   * successful or the maximum number of retransmissions has been reached, then
+   * the ForwardingEngine dequeues the current packet. If the packet is from a
+   * client it signals Send.sendDone(); if it is a forwarded packet it returns
+   * the packet and queue entry to their respective pools.
+   * 
+   */
+
+  event void SubSend.sendDone(message_t* msg, error_t error) {
+    fe_queue_entry_t *qe = call SendQueue.head();
+    dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error);
+    if (qe == NULL || qe->msg != msg) {
+      dbg("Forwarder", "%s: BUG: not our packet (%p != %p)!\n", __FUNCTION__, msg, qe->msg);
+      sendDoneBug();      // Not our packet, something is very wrong...
+      return;
+    }
+    else if (error != SUCCESS) {
+      // Immediate retransmission is the worst thing to do.
+      dbg("Forwarder", "%s: send failed\n", __FUNCTION__);
+      call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+      startRetxmitTimer(SENDDONE_FAIL_WINDOW, SENDDONE_FAIL_OFFSET);
+    }
+    else if (ackPending && !call PacketAcknowledgements.wasAcked(msg)) {
+      // AckPending is for case when DL cannot support acks.
+      call LinkEstimator.txNoAck(call AMPacket.destination(msg));
+      call CtpInfo.recomputeRoutes();
+      if (--qe->retries) { 
+        dbg("Forwarder", "%s: not acked\n", __FUNCTION__);
+        call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+        startRetxmitTimer(SENDDONE_NOACK_WINDOW, SENDDONE_NOACK_OFFSET);
+      } else {
+        //max retries, dropping packet
+        if (qe->client < CLIENT_COUNT) {
+            clientPtrs[qe->client] = qe;
+            signal Send.sendDone[qe->client](msg, FAIL);
+            call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+        } else {
+           if (call MessagePool.put(qe->msg) != SUCCESS)
+             call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
+           if (call QEntryPool.put(qe) != SUCCESS)
+             call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+           call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+        }
+        call SendQueue.dequeue();
+        sending = FALSE;
+        startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
+      }
+    }
+    else if (qe->client < CLIENT_COUNT) {
+      ctp_data_header_t* hdr;
+      uint8_t client = qe->client;
+      dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n", 
+          __FUNCTION__, client, qe);
+      call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+      call LinkEstimator.txAck(call AMPacket.destination(msg));
+      clientPtrs[client] = qe;
+      hdr = getHeader(qe->msg);
+      call SendQueue.dequeue();
+      signal Send.sendDone[client](msg, SUCCESS);
+      sending = FALSE;
+      startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
+    }
+    else if (call MessagePool.size() < call MessagePool.maxSize()) {
+      // A successfully forwarded packet.
+      dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize());
+      call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, 
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+      call LinkEstimator.txAck(call AMPacket.destination(msg));
+      call SentCache.insert(qe->msg);
+      call SendQueue.dequeue();
+      if (call MessagePool.put(qe->msg) != SUCCESS)
+        call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
+      if (call QEntryPool.put(qe) != SUCCESS)
+        call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+      sending = FALSE;
+      startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET);
+      call Leds.led2Toggle();
+    }
+    else {
+      dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client);
+      sendDoneBug();    // It's a forwarded packet, but there's no room the pool;
+      // someone has double-stored a pointer somewhere and we have nowhere
+      // to put this, so we have to leak it...
+    }
+  }
+
+  /*
+   * Function for preparing a packet for forwarding. Performs
+   * a buffer swap from the message pool. If there are no free
+   * message in the pool, it returns the passed message and does not
+   * put it on the send queue.
+   */
+  message_t* forward(message_t* m) {
+    if (call MessagePool.empty()) {
+      dbg("Route", "%s cannot forward, message pool empty.\n", __FUNCTION__);
+      // send a debug message to the uart
+      call CollectionDebug.logEvent(NET_C_FE_MSG_POOL_EMPTY);
+    }
+    else if (call QEntryPool.empty()) {
+      dbg("Route", "%s cannot forward, queue entry pool empty.\n", 
+          __FUNCTION__);
+      // send a debug message to the uart
+      call CollectionDebug.logEvent(NET_C_FE_QENTRY_POOL_EMPTY);
+    }
+    else {
+      message_t* newMsg;
+      fe_queue_entry_t *qe;
+      uint16_t gradient;
+      
+      qe = call QEntryPool.get();
+      if (qe == NULL) {
+        call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR);
+        return m;
+      }
+
+      newMsg = call MessagePool.get();
+      if (newMsg == NULL) {
+        call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR);
+        return m;
+      }
+
+      memset(newMsg, 0, sizeof(message_t));
+      memset(m->metadata, 0, sizeof(message_metadata_t));
+      
+      qe->msg = m;
+      qe->client = 0xff;
+      qe->retries = MAX_RETRIES;
+
+      
+      if (call SendQueue.enqueue(qe) == SUCCESS) {
+        dbg("Forwarder,Route", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size());
+        // Loop-detection code:
+        if (call CtpInfo.getEtx(&gradient) == SUCCESS) {
+          // We only check for loops if we know our own metric
+          if (call CtpPacket.getEtx(m) < gradient) {
+            // The incoming packet's metric (gradient) is less than our
+            // own gradient.  Trigger a route update and backoff.
+            call CtpInfo.triggerImmediateRouteUpdate();
+            startRetxmitTimer(LOOPY_WINDOW, LOOPY_OFFSET);
+            call CollectionDebug.logEventMsg(NET_C_FE_LOOP_DETECTED,
+                                        call CollectionPacket.getSequenceNumber(m), 
+                                        call CollectionPacket.getOrigin(m), 
+                                         call AMPacket.destination(m));
+          }
+        }
+
+        if (!call RetxmitTimer.isRunning()) {
+          // sendTask is only immediately posted if we don't detect a
+          // loop.
+          post sendTask();
+        }
+        
+        // Successful function exit point:
+        return newMsg;
+      } else {
+        // There was a problem enqueuing to the send queue.
+        if (call MessagePool.put(newMsg) != SUCCESS)
+          call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR);
+        if (call QEntryPool.put(qe) != SUCCESS)
+          call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR);
+      }
+    }
+
+    // NB: at this point, we have a resource acquistion problem.
+    // Trigger an immediate route update, log the event, and drop the
+    // packet on the floor.
+    call CtpInfo.triggerImmediateRouteUpdate();
+    call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL);
+    return m;
+  }
+  /*
+   * Received a message to forward. Check whether it is a duplicate by
+   * checking the packets currently in the queue as well as the 
+   * send history cache (in case we recently forwarded this packet).
+   * The cache is important as nodes immediately forward packets
+   * but wait a period before retransmitting after an ack failure.
+   * If this node is a root, signal receive.
+   */ 
+  event message_t* 
+  SubReceive.receive(message_t* msg, void* payload, uint8_t len) {
+    uint8_t netlen;
+    collection_id_t collectid;
+    bool duplicate = FALSE;
+    fe_queue_entry_t* qe;
+    uint8_t i, thl;
+
+
+    collectid = call CtpPacket.getType(msg);
+
+    // Update the THL here, since it has lived another hop, and so
+    // that the root sees the correct THL.
+    thl = call CtpPacket.getThl(msg);
+    thl++;
+    call CtpPacket.setThl(msg, thl);
+
+    call CollectionDebug.logEventMsg(NET_C_FE_RCV_MSG,
+                                        call CollectionPacket.getSequenceNumber(msg), 
+                                        call CollectionPacket.getOrigin(msg), 
+                                         call AMPacket.destination(msg));
+    if (len > call SubSend.maxPayloadLength()) {
+      return msg;
+    }
+
+    //See if we remember having seen this packet
+    //We look in the sent cache ...
+    if (call SentCache.lookup(msg)) {
+        call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE);
+        return msg;
+    }
+    //... and in the queue for duplicates
+    if (call SendQueue.size() > 0) {
+      for (i = call SendQueue.size(); --i;) {
+       qe = call SendQueue.element(i);
+       if (call CtpPacket.matchInstance(qe->msg, msg)) {
+         duplicate = TRUE;
+         break;
+       }
+      }
+    }
+    
+    if (duplicate) {
+        call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE);
+        return msg;
+    }
+
+    // If I'm the root, signal receive. 
+    else if (call RootControl.isRoot())
+      return signal Receive.receive[collectid](msg, 
+                        call Packet.getPayload(msg, &netlen), 
+                        call Packet.payloadLength(msg));
+    // I'm on the routing path and Intercept indicates that I
+    // should not forward the packet.
+    else if (!signal Intercept.forward[collectid](msg, 
+                        call Packet.getPayload(msg, &netlen), 
+                        call Packet.payloadLength(msg)))
+      return msg;
+    else {
+      dbg("Route", "Forwarding packet from %hu.\n", getHeader(msg)->origin);
+      return forward(msg);
+    }
+  }
+
+  command void* 
+  Receive.getPayload[collection_id_t id](message_t* msg, uint8_t* len) {
+    return call Packet.getPayload(msg, NULL);
+  }
+
+  command uint8_t
+  Receive.payloadLength[collection_id_t id](message_t *msg) {
+    return call Packet.payloadLength(msg);
+  }
+
+  command void *
+  Snoop.getPayload[collection_id_t id](message_t *msg, uint8_t *len) {
+    return call Packet.getPayload(msg, NULL);
+  }
+
+  command uint8_t Snoop.payloadLength[collection_id_t id](message_t *msg) {
+    return call Packet.payloadLength(msg);
+  }
+
+  event message_t* 
+  SubSnoop.receive(message_t* msg, void *payload, uint8_t len) {
+    //am_addr_t parent = call UnicastNameFreeRouting.nextHop();
+    am_addr_t proximalSrc = call AMPacket.source(msg);
+
+    // Check for the pull bit (P) [TEP123] and act accordingly.  This
+    // check is made for all packets, not just ones addressed to us.
+    if (call CtpPacket.option(msg, CTP_OPT_PULL))
+      call CtpInfo.triggerRouteUpdate();
+
+    call CtpInfo.setNeighborCongested(proximalSrc, call CtpPacket.option(msg, CTP_OPT_ECN));
+    return signal Snoop.receive[call CtpPacket.getType(msg)] 
+      (msg, payload + sizeof(ctp_data_header_t), 
+       len - sizeof(ctp_data_header_t));
+  }
+  
+  event void RetxmitTimer.fired() {
+    sending = FALSE;
+    post sendTask();
+  }
+
+  event void CongestionTimer.fired() {
+    //parentCongested = FALSE;
+    //call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 0);
+    post sendTask();
+  }
+  
+
+  command bool CtpCongestion.isCongested() {
+    // A simple predicate for now to determine congestion state of
+    // this node.
+    bool congested = (call SendQueue.size() + 2 >= call SendQueue.maxSize()) ? 
+      TRUE : FALSE;
+    return ((congested || clientCongested)?TRUE:FALSE);
+  }
+
+  command void CtpCongestion.setClientCongested(bool congested) {
+    bool wasCongested = call CtpCongestion.isCongested();
+    clientCongested = congested;
+    if (!wasCongested && congested) {
+      call CtpInfo.triggerImmediateRouteUpdate();
+    } else if (wasCongested && ! (call CtpCongestion.isCongested())) {
+      call CtpInfo.triggerRouteUpdate();
+    }
+  }
+
+  command void Packet.clear(message_t* msg) {
+    call SubPacket.clear(msg);
+  }
+  
+  command uint8_t Packet.payloadLength(message_t* msg) {
+    return call SubPacket.payloadLength(msg) - sizeof(ctp_data_header_t);
+  }
+
+  command void Packet.setPayloadLength(message_t* msg, uint8_t len) {
+    call SubPacket.setPayloadLength(msg, len + sizeof(ctp_data_header_t));
+  }
+  
+  command uint8_t Packet.maxPayloadLength() {
+    return call SubPacket.maxPayloadLength() - sizeof(ctp_data_header_t);
+  }
+
+  command void* Packet.getPayload(message_t* msg, uint8_t* len) {
+    uint8_t* payload = call SubPacket.getPayload(msg, len);
+    if (len != NULL) {
+      *len -= sizeof(ctp_data_header_t);
+    }
+    return payload + sizeof(ctp_data_header_t);
+  }
+
+  command am_addr_t       CollectionPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
+
+  command collection_id_t CollectionPacket.getType(message_t* msg) {return getHeader(msg)->type;}
+  command uint8_t         CollectionPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
+  command void CollectionPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;}
+  command void CollectionPacket.setType(message_t* msg, collection_id_t id) {getHeader(msg)->type = id;}
+  command void CollectionPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}
+  
+  //command ctp_options_t CtpPacket.getOptions(message_t* msg) {return getHeader(msg)->options;}
+
+  command uint8_t       CtpPacket.getType(message_t* msg) {return getHeader(msg)->type;}
+  command am_addr_t     CtpPacket.getOrigin(message_t* msg) {return getHeader(msg)->origin;}
+  command uint16_t      CtpPacket.getEtx(message_t* msg) {return getHeader(msg)->etx;}
+  command uint8_t       CtpPacket.getSequenceNumber(message_t* msg) {return getHeader(msg)->originSeqNo;}
+  command uint8_t       CtpPacket.getThl(message_t* msg) {return getHeader(msg)->thl;}
+  
+  command void CtpPacket.setThl(message_t* msg, uint8_t thl) {getHeader(msg)->thl = thl;}
+  command void CtpPacket.setOrigin(message_t* msg, am_addr_t addr) {getHeader(msg)->origin = addr;}
+  command void CtpPacket.setType(message_t* msg, uint8_t id) {getHeader(msg)->type = id;}
+
+  command bool CtpPacket.option(message_t* msg, ctp_options_t opt) {
+    return ((getHeader(msg)->options & opt) == opt) ? TRUE : FALSE;
+  }
+
+  command void CtpPacket.setOption(message_t* msg, ctp_options_t opt) {
+    getHeader(msg)->options |= opt;
+  }
+
+  command void CtpPacket.clearOption(message_t* msg, ctp_options_t opt) {
+    getHeader(msg)->options &= ~opt;
+  }
+
+  command void CtpPacket.setEtx(message_t* msg, uint16_t e) {getHeader(msg)->etx = e;}
+  command void CtpPacket.setSequenceNumber(message_t* msg, uint8_t _seqno) {getHeader(msg)->originSeqNo = _seqno;}
+
+  // A CTP packet ID is based on the origin and the THL field, to
+  // implement duplicate suppression as described in TEP 123.
+
+  command bool CtpPacket.matchInstance(message_t* m1, message_t* m2) {
+    return (call CtpPacket.getOrigin(m1) == call CtpPacket.getOrigin(m2) &&
+           call CtpPacket.getSequenceNumber(m1) == call CtpPacket.getSequenceNumber(m2) &&
+           call CtpPacket.getThl(m1) == call CtpPacket.getThl(m2) &&
+           call CtpPacket.getType(m1) == call CtpPacket.getType(m2));
+  }
+
+  command bool CtpPacket.matchPacket(message_t* m1, message_t* m2) {
+    return (call CtpPacket.getOrigin(m1) == call CtpPacket.getOrigin(m2) &&
+           call CtpPacket.getSequenceNumber(m1) == call CtpPacket.getSequenceNumber(m2) &&
+           call CtpPacket.getType(m1) == call CtpPacket.getType(m2));
+  }
+
+  default event void
+  Send.sendDone[uint8_t client](message_t *msg, error_t error) {
+  }
+
+  default event bool
+  Intercept.forward[collection_id_t collectid](message_t* msg, void* payload, 
+                                               uint16_t len) {
+    return TRUE;
+  }
+
+  default event message_t *
+  Receive.receive[collection_id_t collectid](message_t *msg, void *payload,
+                                             uint8_t len) {
+    return msg;
+  }
+
+  default event message_t *
+  Snoop.receive[collection_id_t collectid](message_t *msg, void *payload,
+                                           uint8_t len) {
+    return msg;
+  }
+
+  default command collection_id_t CollectionId.fetch[uint8_t client]() {
+    return 0;
+  }
+
+  static void startRetxmitTimer(uint16_t mask, uint16_t offset) {
+    uint16_t r = call Random.rand16();
+    r &= mask;
+    r += offset;
+    call RetxmitTimer.startOneShot(r);
+    dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r);
+  }
+
+  static void startCongestionTimer(uint16_t mask, uint16_t offset) {
+    uint16_t r = call Random.rand16();
+    r &= mask;
+    r += offset;
+    call CongestionTimer.startOneShot(r);
+    dbg("Forwarder", "Congestion timer will fire in %hu ms\n", r);
+  }
+
+  /* signalled when this neighbor is evicted from the neighbor table */
+  event void LinkEstimator.evicted(am_addr_t neighbor) {
+  }
+
+
+  /* Default implementations for CollectionDebug calls.
+   * These allow CollectionDebug not to be wired to anything if debugging
+   * is not desired. */
+
+    default command error_t CollectionDebug.logEvent(uint8_t type) {
+        return SUCCESS;
+    }
+    default command error_t CollectionDebug.logEventSimple(uint8_t type, uint16_t arg) {
+        return SUCCESS;
+    }
+    default command error_t CollectionDebug.logEventDbg(uint8_t type, uint16_t arg1, uint16_t arg2, uint16_t arg3) {
+        return SUCCESS;
+    }
+    default command error_t CollectionDebug.logEventMsg(uint8_t type, uint16_t msg, am_addr_t origin, am_addr_t node) {
+        return SUCCESS;
+    }
+    default command error_t CollectionDebug.logEventRoute(uint8_t type, am_addr_t parent, uint8_t hopcount, uint16_t metric) {
+        return SUCCESS;
+    }
+   
+}
+
+/* Rodrigo. This is an alternative
+  event void CtpInfo.ParentCongested(bool congested) {
+    if (congested) {
+      // We've overheard our parent's ECN bit set.
+      startCongestionTimer(CONGESTED_WAIT_WINDOW, CONGESTED_WAIT_OFFSET);
+      parentCongested = TRUE;
+      call CollectionDebug.logEvent(NET_C_FE_CONGESTION_BEGIN);
+    } else {
+      // We've overheard our parent's ECN bit cleared.
+      call CongestionTimer.stop();
+      parentCongested = FALSE;
+      call CollectionDebug.logEventSimple(NET_C_FE_CONGESTION_END, 1);
+      post sendTask();
+    }
+  }
+*/
+