From 0b4b913ee813feda2c6a2efd8a3e8353d9ccbc9d Mon Sep 17 00:00:00 2001 From: scipio Date: Mon, 10 Aug 2009 23:50:06 +0000 Subject: [PATCH] Cleaning up forwarder code. --- tos/lib/net/ctp/Ctp.h | 1 + tos/lib/net/ctp/CtpForwardingEngineP.nc | 318 ++++++++++-------------- 2 files changed, 138 insertions(+), 181 deletions(-) diff --git a/tos/lib/net/ctp/Ctp.h b/tos/lib/net/ctp/Ctp.h index 7d3b19b5..fe5ad7e6 100644 --- a/tos/lib/net/ctp/Ctp.h +++ b/tos/lib/net/ctp/Ctp.h @@ -57,6 +57,7 @@ enum { // CTP Options: CTP_OPT_PULL = 0x80, // TEP 123: P field CTP_OPT_ECN = 0x40, // TEP 123: C field + CTP_OPT_ALL = 0xff }; typedef nx_uint8_t nx_ctp_options_t; diff --git a/tos/lib/net/ctp/CtpForwardingEngineP.nc b/tos/lib/net/ctp/CtpForwardingEngineP.nc index ebfae93b..63c6e8e4 100644 --- a/tos/lib/net/ctp/CtpForwardingEngineP.nc +++ b/tos/lib/net/ctp/CtpForwardingEngineP.nc @@ -162,36 +162,22 @@ implementation { * masked by the given mask and added to the given offset. */ static void startRetxmitTimer(uint16_t mask, uint16_t offset); + void clearState(uint8_t state); + bool hasState(uint8_t state); + void setState(uint8_t state); - /* Indicates whether our client is congested */ - bool clientCongested = FALSE; - - /* Tracks our parent's congestion state. */ - bool parentCongested = FALSE; - - /* Threshold for congestion */ - uint8_t congestionThreshold; - - /* Keeps track of whether the routing layer is running; if not, - * it will not send packets. */ - bool running = FALSE; - - /* Keeps track of whether the radio is on; no sense sending packets - * if the radio is off. */ - bool radioOn = FALSE; - - /* Keeps track of whether an ack is pending on an outgoing packet, - * so that the engine can work unreliably when the data-link layer - * does not support acks. */ - bool ackPending = FALSE; - - /* Keeps track of whether the packet on the head of the queue - * is being used, and control access to the data-link layer. Note - * that CTP may be busy sending but there might be no transmission - * scheduled to the link layer, because CTP is using its own layer 3 - * timers to prevent self-interference.*/ - bool sending = FALSE; + // CTP state variables. + enum { + QUEUE_CONGESTED = 0x1, // Need to set C bit? + ROUTING_ON = 0x2, // Forwarding running? + RADIO_ON = 0x4, // Radio is on? + ACK_PENDING = 0x8, // Have an ACK pending? + SENDING = 0x10 // Am sending a packet? + }; + // Start with all states false + uint8_t forwardingState = 0; + /* Keep track of the last parent address we sent to, so that unacked packets to an old parent are not incorrectly attributed to a new parent. */ @@ -228,7 +214,6 @@ implementation { clientPtrs[i] = clientEntries + i; dbg("Forwarder", "clientPtrs[%hhu] = %p\n", i, clientPtrs[i]); } - congestionThreshold = (call SendQueue.maxSize()) >> 1; loopbackMsgPtr = &loopbackMsg; lastParent = call AMPacket.address(); seqno = 0; @@ -236,12 +221,12 @@ implementation { } command error_t StdControl.start() { - running = TRUE; + setState(ROUTING_ON); return SUCCESS; } command error_t StdControl.stop() { - running = FALSE; + clearState(ROUTING_ON); return SUCCESS; } @@ -254,8 +239,9 @@ implementation { when it turns on, it then starts sending packets. */ event void RadioControl.startDone(error_t err) { if (err == SUCCESS) { - radioOn = TRUE; + setState(RADIO_ON); if (!call SendQueue.empty()) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } } @@ -275,6 +261,7 @@ implementation { * sending packets. */ event void UnicastNameFreeRouting.routeFound() { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -286,7 +273,7 @@ implementation { event void RadioControl.stopDone(error_t err) { if (err == SUCCESS) { - radioOn = FALSE; + clearState(RADIO_ON); } } @@ -309,7 +296,7 @@ implementation { ctp_data_header_t* hdr; fe_queue_entry_t *qe; dbg("Forwarder", "%s: sending packet from client %hhu: %x, len %hhu\n", __FUNCTION__, client, msg, len); - if (!running) {return EOFF;} + if (!hasState(ROUTING_ON)) {return EOFF;} if (len > call Send.maxPayloadLength[client]()) {return ESIZE;} call Packet.setPayloadLength(msg, len); @@ -330,7 +317,8 @@ implementation { qe->retries = MAX_RETRIES; dbg("Forwarder", "%s: queue entry for %hhu is %hhu deep\n", __FUNCTION__, client, call SendQueue.size()); if (call SendQueue.enqueue(qe) == SUCCESS) { - if (radioOn && !call RetxmitTimer.isRunning()) { + if (hasState(RADIO_ON) && !hasState(SENDING)) { + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } clientPtrs[client] = NULL; @@ -382,141 +370,105 @@ implementation { */ task void sendTask() { + uint16_t gradient; dbg("Forwarder", "%s: Trying to send a packet. Queue size is %hhu.\n", __FUNCTION__, call SendQueue.size()); - if (sending) { - dbg("Forwarder", "%s: busy, don't send.\n", __FUNCTION__); - call CollectionDebug.logEvent(NET_C_FE_SEND_BUSY); - return; - } - else if (call SendQueue.empty()) { - dbg("Forwarder", "%s: queue empty, nothing to send.\n", __FUNCTION__); + if (hasState(SENDING) || call SendQueue.empty()) { call CollectionDebug.logEvent(NET_C_FE_SENDQUEUE_EMPTY); return; } - else if (!call RootControl.isRoot() && - !call UnicastNameFreeRouting.hasRoute()) { - // Technically, this retry isn't necessary, as if a route - // is found we'll get an event. But just in case such an event - // is lost (e.g., a bug in the routing engine), we retry. - // Otherwise the forwarder might hang indefinitely. As this test - // doesn't require radio activity, the energy cost is minimal. + else if ((!call RootControl.isRoot() && + !call UnicastNameFreeRouting.hasRoute()) || + (call CtpInfo.getEtx(&gradient) != SUCCESS)) { + /* This code path is for when we don't have a valid next + * hop. We set a retry timer. + * + * Technically, this timer isn't necessary, as if a route + * is found we'll get an event. But just in case such an event + * is lost (e.g., a bug in the routing engine), we retry. + * Otherwise the forwarder might hang indefinitely. As this test + * doesn't require radio activity, the energy cost is minimal. */ dbg("Forwarder", "%s: no route, don't send, try again in %i.\n", __FUNCTION__, NO_ROUTE_RETRY); call RetxmitTimer.startOneShot(NO_ROUTE_RETRY); call CollectionDebug.logEvent(NET_C_FE_NO_ROUTE); return; } else { - // We can send a packet. + /* We can send a packet. + First check if it's a duplicate; + if not, try to send/forward. */ error_t subsendResult; fe_queue_entry_t* qe = call SendQueue.head(); uint8_t payloadLen = call SubPacket.payloadLength(qe->msg); am_addr_t dest = call UnicastNameFreeRouting.nextHop(); - uint16_t gradient; - // Make sure we haven't sent this packet before with the same THL. - // Note that this implies it's a forwarded packet, so we can - // circumvent the client or forwarded branch for freeing - // the buffer. if (call SentCache.lookup(qe->msg)) { + /* This packet is a duplicate, so suppress it: free memory and + * send next packet. Duplicates are only possible for + * forwarded packets, so we can circumvent the client or + * forwarded branch for freeing the buffer. */ call CollectionDebug.logEvent(NET_C_FE_DUPLICATE_CACHE_AT_SEND); call SendQueue.dequeue(); - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + post sendTask(); return; } - /* If our current parent is not the same as the last parent - we sent do, then reset the count of unacked packets: don't - penalize a new parent for the failures of a prior one.*/ - // Give the high retry count, keeping this seems like a bad idea. - // If you've reached MAX_RETRIES, you've cycled through a bunch of - // parents. -pal - /* - if (dest != lastParent) { - qe->retries = MAX_RETRIES; - lastParent = dest; - } - */ - - // We've decided we're going to send. + + // Not a duplicate: we've decided we're going to send. dbg("Forwarder", "Sending queue entry %p\n", qe); - // If we're a root, copy the packet to a receive buffer and signal - // receive. We have to copy because send expects the buffer back, - // but receive might do a buffer swap. + if (call RootControl.isRoot()) { + /* Code path for roots: copy the packet and signal receive. */ collection_id_t collectid = getHeader(qe->msg)->type; + uint8_t* payload; + uint8_t payloadLength; + memcpy(loopbackMsgPtr, qe->msg, sizeof(message_t)); - ackPending = FALSE; - + + payload = call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)); + payloadLength = call Packet.payloadLength(loopbackMsgPtr); dbg("Forwarder", "%s: I'm a root, so loopback and signal receive.\n", __FUNCTION__); loopbackMsgPtr = signal Receive.receive[collectid](loopbackMsgPtr, - call Packet.getPayload(loopbackMsgPtr, call Packet.payloadLength(loopbackMsgPtr)), - call Packet.payloadLength(loopbackMsgPtr)); + payload, + payloadLength); signal SubSend.sendDone(qe->msg, SUCCESS); - return; - } - - // Loop-detection functionality: - if (call CtpInfo.getEtx(&gradient) != SUCCESS) { - // If we have no metric, set our gradient conservatively so - // that other nodes don't automatically drop our packets. - gradient = 0; - } - call CtpPacket.setEtx(qe->msg, gradient); - - ackPending = (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS); - - // Make sure the ECN bit is not set. - call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN); - - subsendResult = call SubSend.send(dest, qe->msg, payloadLen); - if (subsendResult == SUCCESS) { - // Successfully submitted to the data-link layer. - sending = TRUE; - dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); - if (qe->client < CLIENT_COUNT) { - dbg("Forwarder", "%s: client packet.\n", __FUNCTION__); - } - else { - dbg("Forwarder", "%s: forwarded packet.\n", __FUNCTION__); - } - return; - } - else if (subsendResult == EOFF) { - // The radio has been turned off underneath us. Assume that - // this is for the best. When the radio is turned back on, we'll - // handle a startDone event and resume sending. - radioOn = FALSE; - dbg("Forwarder", "%s: subsend failed from EOFF.\n", __FUNCTION__); - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_OFF); - } - else if (subsendResult == EBUSY) { - // This shouldn't happen, as we sit on top of a client and - // control our own output; it means we're trying to - // double-send (bug). This means we expect a sendDone, so just - // wait for that: when the sendDone comes in, we'll try - // sending this packet again. - dbg("Forwarder", "%s: subsend failed from EBUSY.\n", __FUNCTION__); - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_BUSY); } - // The packet is too big: truncate it and retry. - else if (subsendResult == ESIZE) { - dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); - call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); - post sendTask(); - call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); + else { + /* The basic forwarding/sending case. */ + call CtpPacket.setEtx(qe->msg, gradient); + call CtpPacket.clearOption(qe->msg, CTP_OPT_ECN | CTP_OPT_PULL); + if (call PacketAcknowledgements.requestAck(qe->msg) == SUCCESS) { + setState(ACK_PENDING); + } + if (hasState(QUEUE_CONGESTED)) { + call CtpPacket.setOption(qe->msg, CTP_OPT_ECN); + clearState(QUEUE_CONGESTED); + } + + subsendResult = call SubSend.send(dest, qe->msg, payloadLen); + if (subsendResult == SUCCESS) { + // Successfully submitted to the data-link layer. + setState(SENDING); + dbg("Forwarder", "%s: subsend succeeded with %p.\n", __FUNCTION__, qe->msg); + return; + } + // The packet is too big: truncate it and retry. + else if (subsendResult == ESIZE) { + dbg("Forwarder", "%s: subsend failed from ESIZE: truncate packet.\n", __FUNCTION__); + call Packet.setPayloadLength(qe->msg, call Packet.maxPayloadLength()); + post sendTask(); + call CollectionDebug.logEvent(NET_C_FE_SUBSEND_SIZE); + } + else { + dbg("Forwarder", "%s: subsend failed from %i\n", __FUNCTION__, (int)subsendResult); + } } } } - void sendDoneBug() { - // send a debug message to the uart - call CollectionDebug.logEvent(NET_C_FE_BAD_SENDDONE); - } /* * The second phase of a send operation; based on whether the transmission was @@ -532,13 +484,9 @@ implementation { event void SubSend.sendDone(message_t* msg, error_t error) { fe_queue_entry_t *qe = call SendQueue.head(); dbg("Forwarder", "%s to %hu and %hhu\n", __FUNCTION__, call AMPacket.destination(msg), error); - if (qe == NULL || qe->msg != msg) { - dbg("Forwarder", "%s: BUG: not our packet (%p != %p)!\n", __FUNCTION__, msg, qe->msg); - sendDoneBug(); // Not our packet, something is very wrong... - return; - } - else if (error != SUCCESS) { - // Immediate retransmission is the worst thing to do. + + if (error != SUCCESS) { + /* The radio wasn't able to send the packet: retransmit it. */ dbg("Forwarder", "%s: send failed\n", __FUNCTION__); call CollectionDebug.logEventMsg(NET_C_FE_SENDDONE_FAIL, call CollectionPacket.getSequenceNumber(msg), @@ -546,8 +494,8 @@ implementation { call AMPacket.destination(msg)); startRetxmitTimer(SENDDONE_FAIL_WINDOW, SENDDONE_FAIL_OFFSET); } - else if (ackPending && !call PacketAcknowledgements.wasAcked(msg)) { - // AckPending is for case when DL cannot support acks. + else if (hasState(ACK_PENDING) && !call PacketAcknowledgements.wasAcked(msg)) { + /* Retransmission for unacked packet. Might drop the packet. */ call LinkEstimator.txNoAck(call AMPacket.destination(msg)); call CtpInfo.recomputeRoutes(); if (--qe->retries) { @@ -579,49 +527,45 @@ implementation { call AMPacket.destination(msg)); } call SendQueue.dequeue(); - sending = FALSE; + clearState(SENDING); startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); } } - else if (qe->client < CLIENT_COUNT) { - ctp_data_header_t* hdr; - uint8_t client = qe->client; - dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n", - __FUNCTION__, client, qe); - call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, + else { + /* Packet was acknowledged. Updated the link estimator, + free the buffer (pool or sendDone), start timer to + send next packet. */ + call SendQueue.dequeue(); + clearState(SENDING); + startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); + call LinkEstimator.txAck(call AMPacket.destination(msg)); + + if (qe->client < CLIENT_COUNT) { + call CollectionDebug.logEventMsg(NET_C_FE_SENT_MSG, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), call AMPacket.destination(msg)); - call LinkEstimator.txAck(call AMPacket.destination(msg)); - clientPtrs[client] = qe; - hdr = getHeader(qe->msg); - call SendQueue.dequeue(); - signal Send.sendDone[client](msg, SUCCESS); - sending = FALSE; - startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - } - else if (call MessagePool.size() < call MessagePool.maxSize()) { - // A successfully forwarded packet. - dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize()); - call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, + signal Send.sendDone[qe->client](msg, SUCCESS); + dbg("Forwarder", "%s: our packet for client %hhu, remove %p from queue\n", + __FUNCTION__, client, qe); + clientPtrs[qe->client] = qe; + } + else if (call MessagePool.size() < call MessagePool.maxSize()) { + // A successfully forwarded packet. + dbg("Forwarder,Route", "%s: successfully forwarded packet (client: %hhu), message pool is %hhu/%hhu.\n", __FUNCTION__, qe->client, call MessagePool.size(), call MessagePool.maxSize()); + call CollectionDebug.logEventMsg(NET_C_FE_FWD_MSG, call CollectionPacket.getSequenceNumber(msg), call CollectionPacket.getOrigin(msg), call AMPacket.destination(msg)); - call LinkEstimator.txAck(call AMPacket.destination(msg)); - call SentCache.insert(qe->msg); - call SendQueue.dequeue(); - if (call MessagePool.put(qe->msg) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); - if (call QEntryPool.put(qe) != SUCCESS) - call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); - sending = FALSE; - startRetxmitTimer(SENDDONE_OK_WINDOW, SENDDONE_OK_OFFSET); - } - else { - dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client); - sendDoneBug(); // It's a forwarded packet, but there's no room the pool; - // someone has double-stored a pointer somewhere and we have nowhere - // to put this, so we have to leak it... + call SentCache.insert(qe->msg); + if (call MessagePool.put(qe->msg) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_MSGPOOL_ERR); + if (call QEntryPool.put(qe) != SUCCESS) + call CollectionDebug.logEvent(NET_C_FE_PUT_QEPOOL_ERR); + } + else { + dbg("Forwarder", "%s: BUG: we have a pool entry, but the pool is full, client is %hhu.\n", __FUNCTION__, qe->client); + } } } @@ -690,6 +634,7 @@ implementation { if (!call RetxmitTimer.isRunning()) { // sendTask is only immediately posted if we don't detect a // loop. + dbg("FHangBug", "%s: posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -801,7 +746,8 @@ implementation { } event void RetxmitTimer.fired() { - sending = FALSE; + clearState(SENDING); + dbg("FHangBug", "%s posted sendTask.\n", __FUNCTION__); post sendTask(); } @@ -888,6 +834,16 @@ implementation { } + void clearState(uint8_t state) { + forwardingState = forwardingState & ~state; + } + bool hasState(uint8_t state) { + return forwardingState & state; + } + void setState(uint8_t state) { + forwardingState = forwardingState | state; + } + /******** Defaults. **************/ default event void -- 2.39.2