From 4987f5d3bccf7f7931bc1f78b6b2f3363995fedf Mon Sep 17 00:00:00 2001 From: gnawali Date: Sat, 15 Aug 2009 18:11:30 +0000 Subject: [PATCH] reverting to 1.21 until we fix a bug introduced in 1.22 --- tos/lib/net/ctp/CtpForwardingEngineP.nc | 222 +++++++++++------------- 1 file changed, 105 insertions(+), 117 deletions(-) diff --git a/tos/lib/net/ctp/CtpForwardingEngineP.nc b/tos/lib/net/ctp/CtpForwardingEngineP.nc index d6f6dc9c..964e35a5 100644 --- a/tos/lib/net/ctp/CtpForwardingEngineP.nc +++ b/tos/lib/net/ctp/CtpForwardingEngineP.nc @@ -183,6 +183,11 @@ implementation { // Start with all states false uint8_t forwardingState = 0; + /* Keep track of the last parent address we sent to, so that + unacked packets to an old parent are not incorrectly attributed + to a new parent. */ + am_addr_t lastParent; + /* Network-level sequence number, so that receivers * can distinguish retransmissions from different packets. */ uint8_t seqno; @@ -212,9 +217,10 @@ implementation { int i; for (i = 0; i < CLIENT_COUNT; i++) { clientPtrs[i] = clientEntries + i; - dbg("CtpForwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); + dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); } loopbackMsgPtr = &loopbackMsg; + lastParent = call AMPacket.address(); seqno = 0; return SUCCESS; } @@ -232,12 +238,6 @@ implementation { /* sendTask is where the first phase of all send logic * exists (the second phase is in SubSend.sendDone()). */ task void sendTask(); - - /* sendComplete is called by sendDone when it is done - * with a packet (either due to too many retransmissions or - * an acknowledgment). It frees mmemory appropriately and - * cleans up the sending state */ - void sendComplete(fe_queue_entry_t* qe, message_t* msg, bool success); /* ForwardingEngine keeps track of whether the underlying radio is powered on. If not, it enqueues packets; @@ -246,6 +246,7 @@ implementation { if (err == SUCCESS) { setState(RADIO_ON); if (!call SendQueue.empty()) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } } @@ -256,7 +257,7 @@ implementation { r %= window; r += offset; call RetxmitTimer.startOneShot(r); - dbg("CtpForwarder", "Rexmit timer will fire in %hu ms\n", r); + dbg("Forwarder", "Rexmit timer will fire in %hu ms\n", r); } /* @@ -265,6 +266,7 @@ implementation { * sending packets. */ event void UnicastNameFreeRouting.routeFound() { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -298,7 +300,7 @@ implementation { command error_t Send.send[uint8_t client](message_t* msg, uint8_t len) { ctp_data_header_t* hdr; fe_queue_entry_t *qe; - dbg("CtpForwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); + dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); if (!hasState(ROUTING_ON)) {return EOFF;} if (len > call Send.maxPayloadLength[client]()) {return ESIZE;} @@ -310,7 +312,7 @@ implementation { hdr->thl = 0; if (clientPtrs[client] == NULL) { - dbg("CtpForwarder", "%s: send failed as client is busy.\n", __FUNCTION__); + dbg("Forwarder", "%s: send failed as client is busy.\n", __FUNCTION__); return EBUSY; } @@ -318,19 +320,21 @@ implementation { qe->msg = msg; qe->client = client; qe->retries = MAX_RETRIES; - dbg("CtpForwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); + dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); if (call SendQueue.enqueue(qe) == SUCCESS) { if (hasState(RADIO_ON) && !hasState(SENDING)) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } clientPtrs[client] = NULL; return SUCCESS; } else { - dbg("CtpForwarder", + dbg("Forwarder", "%s: send failed as packet could not be enqueued.\n", __FUNCTION__); + // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); // Return the pool entry, as it's not for me... @@ -372,7 +376,7 @@ implementation { task void sendTask() { uint16_t gradient; - dbg("CtpForwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); + dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); if (hasState(SENDING) || call SendQueue.empty()) { call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY); return; @@ -388,7 +392,7 @@ implementation { * is lost (e.g., a bug in the routing engine), we retry. * Otherwise the forwarder might hang indefinitely. As this test * doesn't require radio activity, the energy cost is minimal. */ - dbg("CtpForwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); + dbg("Forwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); call RetxmitTimer.startOneShot(NO_ROUTE_RETRY); call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE); return; @@ -419,7 +423,7 @@ implementation { } // Not a duplicate: we've decided we're going to send. - dbg("CtpForwarder", "Sending queue entry %p\n", qe); + dbg("Forwarder", "Sending queue entry %p\n", qe); if (call RootControl.isRoot()) { /* Code path for roots: copy the packet and signal receive. */ @@ -431,7 +435,7 @@ implementation { payload = call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)); payloadLength = call Packet.payloadLength(loopbackMsgPtr); - dbg("CtpForwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); + dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr, payload, payloadLength); @@ -453,18 +457,18 @@ implementation { if (subsendResult == SUCCESS) { // Successfully submitted to the data-link layer. setState(SENDING); - dbg("CtpForwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); + dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); return; } // The packet is too big: truncate it and retry. else if (subsendResult == ESIZE) { - dbg("CtpForwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); + dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); post sendTask(); call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); } else { - dbg("CtpForwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); + dbg("Forwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); } } } @@ -482,13 +486,57 @@ implementation { * */ + void packetComplete(fe_queue_entry_t* qe, message_t* msg, bool success) { + // Four cases: + // Local packet: success or failure + // Forwarded packet: success or failure + if (qe->client < CLIENT_COUNT) { + clientPtrs[qe->client] = qe; + signal Send.sendDone[qe->client](msg, SUCCESS); + if (success) { + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } else { + dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + } + else { + if (success) { + call SentCache.insert(qe->msg); + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + else { + dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); + call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, + call CollectionPacket.getSequenceNumber(msg), + call CollectionPacket.getOrigin(msg), + call AMPacket.destination(msg)); + } + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + } + } + event void SubSend.sendDone(message_t* msg, error_t error) { fe_queue_entry_t *qe = call SendQueue.head(); - dbg("CtpForwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); + dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); if (error != SUCCESS) { /* The radio wasn't able to send the packet: retransmit it. */ - dbg("CtpForwarder", "%s: send failed\n", __FUNCTION__); + dbg("Forwarder", "%s: send failed\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), @@ -500,7 +548,7 @@ implementation { call LinkEstimator.txNoAck(call AMPacket.destination(msg)); call CtpInfo.recomputeRoutes(); if (--qe->retries) { - dbg("CtpForwarder", "%s: not acked, retransmit\n", __FUNCTION__); + dbg("Forwarder", "%s: not acked, retransmit\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_WAITACK, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), @@ -512,7 +560,7 @@ implementation { clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - sendComplete(qe, msg, FALSE); + packetComplete(qe, msg, FALSE); } } else { @@ -523,7 +571,7 @@ implementation { clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); call LinkEstimator.txAck(call AMPacket.destination(msg)); - sendComplete(qe, msg, TRUE); + packetComplete(qe, msg, TRUE); } } @@ -535,11 +583,14 @@ implementation { */ message_t* ONE forward(message_t* ONE m) { if (call MessagePool.empty()) { - dbg("CtpForwarder", "%s cannot forward, message pool empty.\n", __FUNCTION__); + dbg("Route", "%s cannot forward, message pool empty.\n", __FUNCTION__); + // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_MSG_POOL_EMPTY); } else if (call QEntryPool.empty()) { - dbg("CtpForwarder", "%s cannot forward, queue entry pool empty.\n", __FUNCTION__); + dbg("Route", "%s cannot forward, queue entry pool empty.\n", + __FUNCTION__); + // send a debug message to the uart call CollectionDebug.logEvent(NET_C_FE_QENTRY_POOL_EMPTY); } else { @@ -549,14 +600,13 @@ implementation { qe = call QEntryPool.get(); if (qe == NULL) { - call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR); + call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR); return m; } newMsg = call MessagePool.get(); if (newMsg == NULL) { - call QEntryPool.put(qe); - call CollectionDebug.logEvent(NET_C_FE_GET_MSGPOOL_ERR); + call CollectionDebug.logEvent(NET_C_FE_GET_QEPOOL_ERR); return m; } @@ -568,26 +618,15 @@ implementation { qe->retries = MAX_RETRIES; - if (call SendQueue.enqueue(qe) != SUCCESS) { - // There was a problem enqueuing: drop packet and free memory - if (call MessagePool.put(newMsg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - - call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); - // Fall to base case of just returning received buffer - } - else { - // Put the packet on the queue to send - dbg("CtpForwarder", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size()); + if (call SendQueue.enqueue(qe) == SUCCESS) { + dbg("Forwarder,Route", "%s forwarding packet %p with queue size %hhu\n", __FUNCTION__, m, call SendQueue.size()); // Loop-detection code: if (call CtpInfo.getEtx(&gradient) == SUCCESS) { // We only check for loops if we know our own metric if (call CtpPacket.getEtx(m) <= gradient) { // If our etx metric is less than or equal to the etx value // on the packet (etx of the previous hop node), then we believe - // we may be in a loop. + // we are in a loop. // Trigger a route update and backoff. call CtpInfo.triggerImmediateRouteUpdate(); startRetxmitTimer(LOOPY_WINDOW, LOOPY_OFFSET); @@ -601,16 +640,26 @@ implementation { if (!call RetxmitTimer.isRunning()) { // sendTask is only immediately posted if we don't detect a // loop. + dbg("FHangBug", "%s: posted sendTask.\n", __FUNCTION__); post sendTask(); } // Successful function exit point: return newMsg; + } else { + // There was a problem enqueuing to the send queue. + if (call MessagePool.put(newMsg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); } } - // Only reach this code if we weren't able to enqueue: - // return packet to link layer, "dropping" it. + // NB: at this point, we have a resource acquistion problem. + // Log the event, and drop the + // packet on the floor. + + call CollectionDebug.logEvent(NET_C_FE_SEND_QUEUE_FULL); return m; } @@ -620,21 +669,12 @@ implementation { * send history cache (in case we recently forwarded this packet). * The cache is important as nodes immediately forward packets * but wait a period before retransmitting after an ack failure. - * If this node is a root, signal receive. If not, call - * forward(). - * - * There are two standard forwarding call paths, which depend on - * whether the forwarding engine is already sending packets. If not, - * the forwarder can send immediately. If it is, then it just puts - * the packet on the queue and waits for the the forwarder to pull - * it off in time: - * - * sending: receive -> forward -> sendTask -> sendDone - * !sending: receive -> forwarder ... RetxtmitTimer -> sendTask -> sendDone + * If this node is a root, signal receive. */ event message_t* SubReceive.receive(message_t* msg, void* payload, uint8_t len) { collection_id_t collectid; + bool duplicate = FALSE; fe_queue_entry_t* qe; uint8_t i, thl; @@ -666,11 +706,16 @@ implementation { for (i = call SendQueue.size(); --i;) { qe = call SendQueue.element(i); if (call CtpPacket.matchInstance(qe->msg, msg)) { - call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE); - return msg; + duplicate = TRUE; + break; } } } + + if (duplicate) { + call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_QUEUE); + return msg; + } // If I'm the root, signal receive. else if (call RootControl.isRoot()) @@ -684,7 +729,7 @@ implementation { call Packet.payloadLength(msg))) return msg; else { - dbg("CtpForwarder", "Forwarding packet from %hu.\n", getHeader(msg)->origin); + dbg("Route", "Forwarding packet from %hu.\n", getHeader(msg)->origin); return forward(msg); } } @@ -705,67 +750,10 @@ implementation { (msg, payload + sizeof(ctp_data_header_t), len - sizeof(ctp_data_header_t)); } - - - /* - * sendComplete is called by sendDone when it is done - * with a packet (either due to too many retransmissions or - * an acknowledgment). It frees memory appropriately and - * cleans up the sending state. For local packets, this - * means freeing the client's state and signaling sendDone; - * for forwarded packets it means returning the queue entry - * and packet to their respective pools. - * - * - */ - void sendComplete(fe_queue_entry_t* qe, message_t* msg, bool success) { - // Four cases: local success, local failure, forwarded successes, forwarded failure - if (qe->client < CLIENT_COUNT) { - if (success) { // Local success - dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu acknowledged.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); - call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } else { // Local failure - dbg("CtpForwarder", "%s: packet %hu.%hhu for client %hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg), qe->client); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_SEND, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - // Client memory cleanup - clientPtrs[qe->client] = qe; - signal Send.sendDone[qe->client](msg, SUCCESS); - } - else { - if (success) { // Forwarded success - call SentCache.insert(qe->msg); - dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu acknowledged: insert in transmit queue.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); - call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - else { // Forwarded failure - dbg("CtpForwarder", "%s: forwarded packet %hu.%hhu dropped.\n", __FUNCTION__, call CollectionPacket.getOrigin(msg), call CollectionPacket.getSequenceNumber(msg)); - call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL_ACK_FWD, - call CollectionPacket.getSequenceNumber(msg), - call CollectionPacket.getOrigin(msg), - call AMPacket.destination(msg)); - } - // Forwarding memory cleanup - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - } - } - - event void RetxmitTimer.fired() { clearState(SENDING); + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } -- 2.39.2