From b218f81ffed390c2f2918bbe8e3b6c80eb8b4e39 Mon Sep 17 00:00:00 2001 From: scipio Date: Thu, 13 Aug 2009 20:50:12 +0000 Subject: [PATCH] More comments. --- tos/lib/net/ctp/CtpForwardingEngineP.nc | 222 +++++++++++++----------- 1 file changed, 117 insertions(+), 105 deletions(-) diff --git a/tos/lib/net/ctp/CtpForwardingEngineP.nc b/tos/lib/net/ctp/CtpForwardingEngineP.nc index 964e35a5..d6f6dc9c 100644 --- a/tos/lib/net/ctp/CtpForwardingEngineP.nc +++ b/tos/lib/net/ctp/CtpForwardingEngineP.nc @@ -183,11 +183,6 @@ implementation { // Start with all states false uint8_t forwardingState = 0; - /* Keep track of the last parent address we sent to, so that - unacked packets to an old parent are not incorrectly attributed - to a new parent. */ - am_addr_t lastParent; - /* Network-level sequence number, so that receivers * can distinguish retransmissions from different packets. */ uint8_t seqno; @@ -217,10 +212,9 @@ implementation { int i; for (i = 0; i < CLIENT_COUNT; i++) { clientPtrs[i] = clientEntries + i; - dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); + dbg("CtpForwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); } loopbackMsgPtr = &loopbackMsg; - lastParent = call AMPacket.address(); seqno = 0; return SUCCESS; } @@ -238,6 +232,12 @@ implementation { /* sendTask is where the first phase of all send logic * exists (the second phase is in SubSend.sendDone()). */ task void sendTask(); + + /* sendComplete is called by sendDone when it is done + * with a packet (either due to too many retransmissions or + * an acknowledgment). It frees mmemory appropriately and + * cleans up the sending state */ + void sendComplete(fe_queue_entry_t* qe, message_t* msg, bool success); /* ForwardingEngine keeps track of whether the underlying radio is powered on. If not, it enqueues packets; @@ -246,7 +246,6 @@ implementation { if (err == SUCCESS) { setState(RADIO_ON); if (!call SendQueue.empty()) { - dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } } @@ -257,7 +256,7 @@ implementation { r %= window; r += offset; call RetxmitTimer.startOneShot(r); - dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r); + dbg("CtpForwarder", "Rexmit timer will fire in %hu ms\n", r); } /* @@ -266,7 +265,6 @@ implementation { * sending packets. */ event void UnicastNameFreeRouting.routeFound() { - dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -300,7 +298,7 @@ implementation { command error_t Send.send[uint8_t client](message_t* msg, uint8_t len) { ctp_data_header_t* hdr; fe_queue_entry_t *qe; - dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); + dbg("CtpForwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); if (!hasState(ROUTING_ON)) {return EOFF;} if (len > call Send.maxPayloadLength[client]()) {return ESIZE;} @@ -312,7 +310,7 @@ implementation { hdr->thl = 0; if (clientPtrs[client] == NULL) { - dbg("Forwarder", "%s: send failed as client is busy.\n", __FUNCTION__); + dbg("CtpForwarder", "%s: send failed as client is busy.\n", __FUNCTION__); return EBUSY; } @@ -320,21 +318,19 @@ implementation { qe->msg = msg; qe->client = client; qe->retries = MAX_RETRIES; - dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); + dbg("CtpForwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); if (call SendQueue.enqueue(qe) == SUCCESS) { if (hasState(RADIO_ON) && !hasState(SENDING)) { - dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } clientPtrs[client] = NULL; return SUCCESS; } else { - dbg("Forwarder", + dbg("CtpForwarder", "%s: send failed as packet could not be enqueued.\n", __FUNCTION__); - // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); // Return the pool entry, as it's not for me... @@ -376,7 +372,7 @@ implementation { task void sendTask() { uint16_t gradient; - dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); + dbg("CtpForwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); if (hasState(SENDING) || call SendQueue.empty()) { call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY); return; @@ -392,7 +388,7 @@ implementation { * is lost (e.g., a bug in the routing engine), we retry. * Otherwise the forwarder might hang indefinitely. As this test * doesn't require radio activity, the energy cost is minimal. */ - dbg("Forwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); + dbg("CtpForwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); call RetxmitTimer.startOneShot(NO_ROUTE_RETRY); call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE); return; @@ -423,7 +419,7 @@ implementation { } // Not a duplicate: we've decided we're going to send. - dbg("Forwarder", "Sending queue entry %p\n", qe); + dbg("CtpForwarder", "Sending queue entry %p\n", qe); if (call RootControl.isRoot()) { /* Code path for roots: copy the packet and signal receive. */ @@ -435,7 +431,7 @@ implementation { payload = call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)); payloadLength = call Packet.payloadLength(loopbackMsgPtr); - dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); + dbg("CtpForwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr, payload, payloadLength); @@ -457,18 +453,18 @@ implementation { if (subsendResult == SUCCESS) { // Successfully submitted to the data-link layer. setState(SENDING); - dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); + dbg("CtpForwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); return; } // The packet is too big: truncate it and retry. else if (subsendResult == ESIZE) { - dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); + dbg("CtpForwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); post sendTask(); call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); } else { - dbg("Forwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); + dbg("CtpForwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); } } } @@ -486,57 +482,13 @@ implementation { * */ - void packetComplete(fe_queue_entry_t* qe, message_t* msg, bool success) { - // Four cases: - // Local packet: success or failure - // Forwarded packet: success or failure - if (qe->client < CLIENT_COUNT) { - clientPtrs[qe->client] = qe; - signal Send.sendDone[qe->client](msg, SUCCESS); - if (success) { - dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); - call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } else { - dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - } - else { - if (success) { - call SentCache.insert(qe->msg); - dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); - call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - else { - dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - } - } - event void SubSend.sendDone(message_t* msg, error_t error) { fe_queue_entry_t *qe = call SendQueue.head(); - dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); + dbg("CtpForwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); if (error != SUCCESS) { /* The radio wasn't able to send the packet: retransmit it. */ - dbg("Forwarder", "%s: send failed\n", __FUNCTION__); + dbg("CtpForwarder", "%s: send failed\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), @@ -548,7 +500,7 @@ implementation { call LinkEstimator.txNoAck(call AMPacket.destination(msg)); call CtpInfo.recomputeRoutes(); if (--qe->retries) { - dbg("Forwarder", "%s: not acked, retransmit\n", __FUNCTION__); + dbg("CtpForwarder", "%s: not acked, retransmit\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), @@ -560,7 +512,7 @@ implementation { clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - packetComplete(qe, msg, FALSE); + sendComplete(qe, msg, FALSE); } } else { @@ -571,7 +523,7 @@ implementation { clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); call LinkEstimator.txAck(call AMPacket.destination(msg)); - packetComplete(qe, msg, TRUE); + sendComplete(qe, msg, TRUE); } } @@ -583,14 +535,11 @@ implementation { */ message_t* ONE forward(message_t* ONE m) { if (call MessagePool.empty()) { - dbg("Route", "%s cannot forward, message pool empty.\n", __FUNCTION__); - // send a debug message to the uart + dbg("CtpForwarder", "%s cannot forward, message pool empty.\n", __FUNCTION__); call CollectionDebug.logEvent(NET_C_FE_MSG_POOL_EMPTY); } else if (call QEntryPool.empty()) { - dbg("Route", "%s cannot forward, queue entry pool empty.\n", - __FUNCTION__); - // send a debug message to the uart + dbg("CtpForwarder", "%s cannot forward, queue entry pool empty.\n", __FUNCTION__); call CollectionDebug.logEvent(NET_C_FE_QENTRY_POOL_EMPTY); } else { @@ -600,13 +549,14 @@ implementation { qe = call QEntryPool.get(); if (qe == NULL) { - call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR); + call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR); return m; } newMsg = call MessagePool.get(); if (newMsg == NULL) { - call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR); + call QEntryPool.put(qe); + call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR); return m; } @@ -618,15 +568,26 @@ implementation { qe->retries = MAX_RETRIES; - if (call SendQueue.enqueue(qe) == SUCCESS) { - dbg("Forwarder,Route", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size()); + if (call SendQueue.enqueue(qe) != SUCCESS) { + // There was a problem enqueuing: drop packet and free memory + if (call MessagePool.put(newMsg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + + call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); + // Fall to base case of just returning received buffer + } + else { + // Put the packet on the queue to send + dbg("CtpForwarder", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size()); // Loop-detection code: if (call CtpInfo.getEtx(&gradient) == SUCCESS) { // We only check for loops if we know our own metric if (call CtpPacket.getEtx(m) <= gradient) { // If our etx metric is less than or equal to the etx value // on the packet (etx of the previous hop node), then we believe - // we are in a loop. + // we may be in a loop. // Trigger a route update and backoff. call CtpInfo.triggerImmediateRouteUpdate(); startRetxmitTimer(LOOPY_WINDOW, LOOPY_OFFSET); @@ -640,26 +601,16 @@ implementation { if (!call RetxmitTimer.isRunning()) { // sendTask is only immediately posted if we don't detect a // loop. - dbg("FHangBug", "%s: posted sendTask.\n", __FUNCTION__); post sendTask(); } // Successful function exit point: return newMsg; - } else { - // There was a problem enqueuing to the send queue. - if (call MessagePool.put(newMsg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); } } - // NB: at this point, we have a resource acquistion problem. - // Log the event, and drop the - // packet on the floor. - - call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); + // Only reach this code if we weren't able to enqueue: + // return packet to link layer, "dropping" it. return m; } @@ -669,12 +620,21 @@ implementation { * send history cache (in case we recently forwarded this packet). * The cache is important as nodes immediately forward packets * but wait a period before retransmitting after an ack failure. - * If this node is a root, signal receive. + * If this node is a root, signal receive. If not, call + * forward(). + * + * There are two standard forwarding call paths, which depend on + * whether the forwarding engine is already sending packets. If not, + * the forwarder can send immediately. If it is, then it just puts + * the packet on the queue and waits for the the forwarder to pull + * it off in time: + * + * sending: receive -> forward -> sendTask -> sendDone + * !sending: receive -> forwarder ... RetxtmitTimer -> sendTask -> sendDone */ event message_t* SubReceive.receive(message_t* msg, void* payload, uint8_t len) { collection_id_t collectid; - bool duplicate = FALSE; fe_queue_entry_t* qe; uint8_t i, thl; @@ -706,16 +666,11 @@ implementation { for (i = call SendQueue.size(); --i;) { qe = call SendQueue.element(i); if (call CtpPacket.matchInstance(qe->msg, msg)) { - duplicate = TRUE; - break; + call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE); + return msg; } } } - - if (duplicate) { - call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE); - return msg; - } // If I'm the root, signal receive. else if (call RootControl.isRoot()) @@ -729,7 +684,7 @@ implementation { call Packet.payloadLength(msg))) return msg; else { - dbg("Route", "Forwarding packet from %hu.\n", getHeader(msg)->origin); + dbg("CtpForwarder", "Forwarding packet from %hu.\n", getHeader(msg)->origin); return forward(msg); } } @@ -750,10 +705,67 @@ implementation { (msg, payload + sizeof(ctp_data_header_t), len - sizeof(ctp_data_header_t)); } + + + /* + * sendComplete is called by sendDone when it is done + * with a packet (either due to too many retransmissions or + * an acknowledgment). It frees memory appropriately and + * cleans up the sending state. For local packets, this + * means freeing the client's state and signaling sendDone; + * for forwarded packets it means returning the queue entry + * and packet to their respective pools. + * + * + */ + void sendComplete(fe_queue_entry_t* qe, message_t* msg, bool success) { + // Four cases: local success, local failure, forwarded successes, forwarded failure + if (qe->client < CLIENT_COUNT) { + if (success) { // Local success + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } else { // Local failure + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + // Client memory cleanup + clientPtrs[qe->client] = qe; + signal Send.sendDone[qe->client](msg, SUCCESS); + } + else { + if (success) { // Forwarded success + call SentCache.insert(qe->msg); + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + else { // Forwarded failure + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + // Forwarding memory cleanup + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + } + } + + event void RetxmitTimer.fired() { clearState(SENDING); - dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } -- 2.39.2