From 48e7ce9859df7a0878f5a97f39f1942636bd7cb2 Mon Sep 17 00:00:00 2001 From: andreaskoepke Date: Thu, 15 Mar 2007 13:03:06 +0000 Subject: [PATCH] implemented AMSend.cancel, sendDone(ECANCEL) will be signalled after the command returned. updated logic of nextPacket --- tos/system/AMQueueImplP.nc | 313 +++++++++++++++++++------------------ 1 file changed, 164 insertions(+), 149 deletions(-) diff --git a/tos/system/AMQueueImplP.nc b/tos/system/AMQueueImplP.nc index aa75dafa..8ea5c7aa 100644 --- a/tos/system/AMQueueImplP.nc +++ b/tos/system/AMQueueImplP.nc @@ -1,26 +1,26 @@ // $Id$ /* - * "Copyright (c) 2005 Stanford University. All rights reserved. - * - * Permission to use, copy, modify, and distribute this software and - * its documentation for any purpose, without fee, and without written - * agreement is hereby granted, provided that the above copyright - * notice, the following two paragraphs and the author appear in all - * copies of this software. - * - * IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE TO ANY PARTY FOR - * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES - * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN - * IF STANFORD UNIVERSITY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH - * DAMAGE. - * - * STANFORD UNIVERSITY SPECIFICALLY DISCLAIMS ANY WARRANTIES, - * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE - * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND STANFORD UNIVERSITY - * HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, - * ENHANCEMENTS, OR MODIFICATIONS." - */ +* "Copyright (c) 2005 Stanford University. All rights reserved. +* +* Permission to use, copy, modify, and distribute this software and +* its documentation for any purpose, without fee, and without written +* agreement is hereby granted, provided that the above copyright +* notice, the following two paragraphs and the author appear in all +* copies of this software. +* +* IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE TO ANY PARTY FOR +* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES +* ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN +* IF STANFORD UNIVERSITY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +* DAMAGE. +* +* STANFORD UNIVERSITY SPECIFICALLY DISCLAIMS ANY WARRANTIES, +* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE +* PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND STANFORD UNIVERSITY +* HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, +* ENHANCEMENTS, OR MODIFICATIONS." +*/ /** * An AM send queue that provides a Service Instance pattern for @@ -35,153 +35,168 @@ #include "AM.h" generic module AMQueueImplP(int numClients) { - provides interface Send[uint8_t client]; - uses{ - interface AMSend[am_id_t id]; - interface AMPacket; - interface Packet; - } + provides interface Send[uint8_t client]; + uses{ + interface AMSend[am_id_t id]; + interface AMPacket; + interface Packet; + } } implementation { - + typedef struct { + message_t* msg; + } queue_entry_t; - enum { - QUEUE_EMPTY = 255, - }; - - typedef struct { - message_t* msg; - } queue_entry_t; - - uint8_t current = QUEUE_EMPTY; - queue_entry_t queue[numClients]; - + uint8_t current = numClients; // mark as empty + queue_entry_t queue[numClients]; + uint8_t cancelMask[numClients/8 + 1]; - void tryToSend(); + void tryToSend(); - void nextPacket() { - uint16_t i; - uint8_t initial = current; - if (initial == QUEUE_EMPTY) { - initial = 0; + void nextPacket() { + uint8_t i; + current = (current + 1) % numClients; + for(i = 0; i < numClients; i++) { + if((queue[current].msg == NULL) || + (cancelMask[current/8] & (1 << current%8))) + { + current = (current + 1) % numClients; + } + else { + break; + } + } + if(i >= numClients) current = numClients; } - i = initial; - for (; i < (initial + numClients); i++) { - uint8_t client = (uint8_t)i % numClients; - if (queue[client].msg != NULL) { - current = client; - return; - } - } - current = QUEUE_EMPTY; - } - - /** - * Accepts a properly formatted AM packet for later sending. - * Assumes that someone has filled in the AM packet fields - * (destination, AM type). - * - * @param msg - the message to send - * @param len - the length of the payload - * - */ - - command error_t Send.send[uint8_t clientId](message_t* msg, - uint8_t len) { - if (clientId > numClients) {return FAIL;} - if (queue[clientId].msg != NULL) {return EBUSY;} - dbg("AMQueue", "AMQueue: request to send from %hhu (%p): passed checks\n", clientId, msg); - - queue[clientId].msg = msg; - call Packet.setPayloadLength(msg, len); + /** + * Accepts a properly formatted AM packet for later sending. + * Assumes that someone has filled in the AM packet fields + * (destination, AM type). + * + * @param msg - the message to send + * @param len - the length of the payload + * + */ + command error_t Send.send[uint8_t clientId](message_t* msg, + uint8_t len) { + if (clientId >= numClients) { + return FAIL; + } + if (queue[clientId].msg != NULL) { + return EBUSY; + } + dbg("AMQueue", "AMQueue: request to send from %hhu (%p): passed checks\n", clientId, msg); + + queue[clientId].msg = msg; + call Packet.setPayloadLength(msg, len); - if (current == QUEUE_EMPTY) { - error_t err; - am_id_t amId = call AMPacket.type(msg); - am_addr_t dest = call AMPacket.destination(msg); - - dbg("AMQueue", "%s: request to send from %hhu (%p): queue empty\n", __FUNCTION__, clientId, msg); - current = clientId; - - err = call AMSend.send[amId](dest, msg, len); - if (err != SUCCESS) { - dbg("AMQueue", "%s: underlying send failed.\n", __FUNCTION__); - current = QUEUE_EMPTY; - queue[clientId].msg = NULL; - } - return err; - } - else { - dbg("AMQueue", "AMQueue: request to send from %hhu (%p): queue not empty\n", clientId, msg); + if (current >= numClients) { // queue empty + error_t err; + am_id_t amId = call AMPacket.type(msg); + am_addr_t dest = call AMPacket.destination(msg); + + dbg("AMQueue", "%s: request to send from %hhu (%p): queue empty\n", __FUNCTION__, clientId, msg); + current = clientId; + + err = call AMSend.send[amId](dest, msg, len); + if (err != SUCCESS) { + dbg("AMQueue", "%s: underlying send failed.\n", __FUNCTION__); + current = numClients; + queue[clientId].msg = NULL; + + } + return err; + } + else { + dbg("AMQueue", "AMQueue: request to send from %hhu (%p): queue not empty\n", clientId, msg); + } + return SUCCESS; } - return SUCCESS; - } - command error_t Send.cancel[uint8_t clientId](message_t* msg) { - if (clientId > numClients || // Not a valid client - queue[clientId].msg == NULL || // No packet pending - queue[clientId].msg != msg) { // Not the right packet - return FAIL; + task void CancelTask() { + uint8_t i,j,mask,last; + message_t *msg; + for(i = 0; i < numClients/8 + 1; i++) { + if(cancelMask[i]) { + for(mask = 1, j = 0; j < 8; j++) { + if(cancelMask[i] & mask) { + last = i*8 + j; + msg = queue[last].msg; + queue[last].msg = NULL; + cancelMask[i] &= ~mask; + signal Send.sendDone[last](msg, ECANCEL); + } + mask <<= 1; + } + } + } } - if (current == clientId) { - am_id_t amId = call AMPacket.type(msg); - error_t err = call AMSend.cancel[amId](msg); - if (err == SUCCESS) { - // remove it from the queue - nextPacket(); - } - return err; + + command error_t Send.cancel[uint8_t clientId](message_t* msg) { + if (clientId >= numClients || // Not a valid client + queue[clientId].msg == NULL || // No packet pending + queue[clientId].msg != msg) { // Not the right packet + return FAIL; + } + if(current == clientId) { + am_id_t amId = call AMPacket.type(msg); + error_t err = call AMSend.cancel[amId](msg); + return err; + } + else { + cancelMask[clientId/8] |= 1 << clientId % 8; + post CancelTask(); + return SUCCESS; + } } - else { - queue[clientId].msg = NULL; - return SUCCESS; + + void sendDone(uint8_t last, message_t *msg, error_t err) { + queue[last].msg = NULL; + tryToSend(); + signal Send.sendDone[last](msg, err); } - } - task void errorTask() { - message_t* msg = queue[current].msg; - queue[current].msg = NULL; - signal Send.sendDone[current](msg, FAIL); - tryToSend(); - } + task void errorTask() { + sendDone(current, queue[current].msg, FAIL); + } - // NOTE: Increments current! - void tryToSend() { - nextPacket(); - if (current != QUEUE_EMPTY) { - error_t nextErr; - message_t* nextMsg = queue[current].msg; - am_id_t nextId = call AMPacket.type(nextMsg); - am_addr_t nextDest = call AMPacket.destination(nextMsg); - uint8_t len = call Packet.payloadLength(nextMsg); - nextErr = call AMSend.send[nextId](nextDest, nextMsg, len); - if (nextErr != SUCCESS) { - post errorTask(); - } + // NOTE: Increments current! + void tryToSend() { + nextPacket(); + if (current < numClients) { // queue not empty + error_t nextErr; + message_t* nextMsg = queue[current].msg; + am_id_t nextId = call AMPacket.type(nextMsg); + am_addr_t nextDest = call AMPacket.destination(nextMsg); + uint8_t len = call Packet.payloadLength(nextMsg); + nextErr = call AMSend.send[nextId](nextDest, nextMsg, len); + if(nextErr != SUCCESS) { + post errorTask(); + } + } } - } - event void AMSend.sendDone[am_id_t id](message_t* msg, error_t err) { - if (queue[current].msg == msg) { - uint8_t last = current; - dbg("PointerBug", "%s received send done for %p, signaling for %p.\n", __FUNCTION__, msg, queue[current].msg); - queue[last].msg = NULL; - tryToSend(); - signal Send.sendDone[last](msg, err); + event void AMSend.sendDone[am_id_t id](message_t* msg, error_t err) { + if(queue[current].msg == msg) { + sendDone(current, msg, err); + } + else { + dbg("PointerBug", "%s received send done for %p, signaling for %p.\n", + __FUNCTION__, msg, queue[current].msg); + } + } + + command uint8_t Send.maxPayloadLength[uint8_t id]() { + return call AMSend.maxPayloadLength[0](); } - } - - command uint8_t Send.maxPayloadLength[uint8_t id]() { - return call AMSend.maxPayloadLength[0](); - } - command void* Send.getPayload[uint8_t id](message_t* m) { - return call AMSend.getPayload[0](m); - } + command void* Send.getPayload[uint8_t id](message_t* m) { + return call AMSend.getPayload[0](m); + } - default event void Send.sendDone[uint8_t id](message_t* msg, error_t err) { - // Do nothing - } + default event void Send.sendDone[uint8_t id](message_t* msg, error_t err) { + // Do nothing + } } -- 2.39.2