]> oss.titaniummirror.com Git - tinyos-2.x.git/commitdiff
- pass errno across interface
authorandreaskoepke <andreaskoepke>
Mon, 21 Apr 2008 19:57:46 +0000 (19:57 +0000)
committerandreaskoepke <andreaskoepke>
Mon, 21 Apr 2008 19:57:46 +0000 (19:57 +0000)
- do not cancel threads that are not running
- exit on write failures (just as we do for read failures)

support/sdk/cpp/sf/basecomm.cpp
support/sdk/cpp/sf/basecomm.h
support/sdk/cpp/sf/packetbuffer.cpp
support/sdk/cpp/sf/packetbuffer.h
support/sdk/cpp/sf/serialcomm.cpp
support/sdk/cpp/sf/serialcomm.h
support/sdk/cpp/sf/sfpacket.cpp
support/sdk/cpp/sf/tcpcomm.cpp
support/sdk/cpp/sf/tcpcomm.h

index bad3e50f3c5b93d4e97a85f06affbc47cae13946..788349d6af8a2c8a69f3998c5cba806473fcc8b4 100644 (file)
@@ -46,7 +46,7 @@ BaseComm::~BaseComm()
 }
 
 /* all count bytes must be read before returning - blocking in that way... */
-int BaseComm::readFD(int fd, char *buffer, int count)
+int BaseComm::readFD(int fd, char *buffer, int count, int *err)
 {
     int actual = 0;
     while (count > 0)
@@ -54,6 +54,7 @@ int BaseComm::readFD(int fd, char *buffer, int count)
         int n = read(fd, buffer, count);
         if (n == -1)
         {
+            *err = errno;
             return -1;
         }
         if (n == 0)
@@ -68,7 +69,7 @@ int BaseComm::readFD(int fd, char *buffer, int count)
 }
 
 /* all count bytes must be written before returning - blocking in that way... */
-int BaseComm::writeFD(int fd, const char *buffer, int count)
+int BaseComm::writeFD(int fd, const char *buffer, int count, int *err)
 {
     int actual = 0;
     while (count > 0)
@@ -77,6 +78,7 @@ int BaseComm::writeFD(int fd, const char *buffer, int count)
         if(n == -1)
         {
             if(errno != 0) {
+                *err = errno;
                 return -1;
             }
             else {
index 3b43bda052735d120c5f8905d2f401621ec5b1ac..be45e222665d87f195e474369b6735ce8bd18d42 100644 (file)
@@ -41,10 +41,10 @@ public:
     virtual ~BaseComm();
 protected:
     /* performs blocking read on fd */
-    virtual int readFD(int fd, char *buffer, int count);
+    virtual int readFD(int fd, char *buffer, int count, int *err);
 
     /* performs blocking write on fd */
-    virtual int writeFD(int fd, const char *buffer, int count);
+    virtual int writeFD(int fd, const char *buffer, int count, int *err);
 };
 
 #endif
index 639835a4b1908c9a9e3f5ddfc0a0fb20b2cbc375..635f434e83ac7ca834776fc0707c07e0d6096624 100644 (file)
@@ -40,7 +40,6 @@ PacketBuffer::PacketBuffer()
     pthread_mutex_init(&buffer.lock, NULL);
     pthread_cond_init(&buffer.notempty, NULL);
     pthread_cond_init(&buffer.notfull, NULL);
-    buffer.size = 0;
 }
 
 
@@ -57,7 +56,6 @@ void PacketBuffer::clear() {
     pthread_mutex_lock(&buffer.lock);
     // clear
     buffer.container.clear();
-    buffer.size = 0;
     DEBUG("PacketBuffer::clear : cleared buffer and signal <notfull>")
     pthread_cond_signal(&buffer.notfull);
     pthread_mutex_unlock(&buffer.lock);
@@ -71,7 +69,7 @@ SFPacket PacketBuffer::dequeue()
     pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
     pthread_mutex_lock(&buffer.lock);
     // wait until buffer is _not_ empty
-    while(buffer.size == 0)
+    while(buffer.container.size() == 0)
     {
         DEBUG("PacketBuffer::dequeue : waiting until buffer is <notempty>")
         pthread_cond_wait(&buffer.notempty, &buffer.lock);
@@ -79,7 +77,6 @@ SFPacket PacketBuffer::dequeue()
     // dequeue
     packet = buffer.container.front();
     buffer.container.pop_front();
-    --buffer.size;
     DEBUG("PacketBuffer::dequeue : get from buffer and signal <notfull>")
     pthread_cond_signal(&buffer.notfull);
     pthread_cleanup_pop(1); 
@@ -93,13 +90,12 @@ bool PacketBuffer::enqueueFront(SFPacket &pPacket)
     pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
     pthread_mutex_lock(&buffer.lock);
     // wait until buffer is _not_ full
-    while(buffer.size >= cMaxBufferSize)
+    while(buffer.container.size() >= cMaxBufferSize)
     {
         DEBUG("PacketBuffer::enqueueFront : waiting until buffer is <notfull>")
         pthread_cond_wait(&buffer.notfull, &buffer.lock);
     }
     // enqueue
-    ++buffer.size;
     buffer.container.push_front(pPacket);
     DEBUG("PacketBuffer::enqueueFront : put in buffer and signal <notempty>")
     // signal that buffer is now not empty
@@ -115,13 +111,12 @@ bool PacketBuffer::enqueueBack(SFPacket &pPacket)
     pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
     pthread_mutex_lock(&buffer.lock);
     // wait until buffer is _not_ full
-    while(buffer.size >= cMaxBufferSize)
+    while(buffer.container.size() >= cMaxBufferSize)
     {
         DEBUG("PacketBuffer::enqueueBack : waiting until buffer is <notfull>")
         pthread_cond_wait(&buffer.notfull, &buffer.lock);
     }
     // enqueue
-    ++buffer.size;
     buffer.container.push_back(pPacket);
     DEBUG("PacketBuffer::enqueueBack : put in buffer and signal <notempty>")
     // signal that buffer is now not empty
@@ -135,8 +130,8 @@ bool PacketBuffer::isFull() {
   bool isFull = true;
   pthread_testcancel();
   pthread_mutex_lock(&buffer.lock);
-  if (buffer.size < cMaxBufferSize) {
-    isFull = false;
+  if (buffer.container.size() < cMaxBufferSize) {
+      isFull = false;
   }
   pthread_mutex_unlock(&buffer.lock);
   return isFull;
@@ -147,8 +142,8 @@ bool PacketBuffer::isEmpty() {
   bool isEmpty = true;
   pthread_testcancel();
   pthread_mutex_lock(&buffer.lock);
-  if (buffer.size > 0) {
-    isEmpty = false;
+  if (buffer.container.size() > 0) {
+      isEmpty = false;
   }
   pthread_mutex_unlock(&buffer.lock);
   return isEmpty;
index 8fef2696ea46fd16f9be2bd03e091245924be5e8..383859459a63aa47d7685199bf7ff3bd57b321f4 100644 (file)
@@ -37,7 +37,7 @@
 #include <list>
 #include "sfpacket.h"
 
-//#define DEBUG_PACKETBUFFER
+// #define DEBUG_PACKETBUFFER
 
 #undef DEBUG
 #ifdef DEBUG_PACKETBUFFER
@@ -51,7 +51,7 @@ class PacketBuffer
 {
 protected:
 
-  static const int cMaxBufferSize = 25;
+  static const unsigned cMaxBufferSize = 25;
 
   typedef std::list<SFPacket> container_t;
 
@@ -66,8 +66,6 @@ protected:
     pthread_cond_t notfull;
     // actual buffer 
     container_t container;
-    // number of packets in buffer
-    int size;
   } sharedBuffer_t;
 
   sharedBuffer_t buffer;
index 5920a7240259b0901db6c3f2a28ebf713be244b2..4479e18f6ebdb16a0c65681d9c8c2592094c94f5 100644 (file)
@@ -241,7 +241,7 @@ tcflag_t SerialComm::parseBaudrate(int requested)
     return baudrate;
 }
 
-SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), errorReported(false), errorMsg(""), control(pControl)
+SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), serialReadFD(-1), serialWriteFD(-1), errorReported(false), errorMsg(""), control(pControl)
 {
     writerThreadRunning = false;
     readerThreadRunning = false;
@@ -314,8 +314,8 @@ SerialComm::~SerialComm()
     pthread_mutex_destroy(&ack.lock);
     pthread_cond_destroy(&ack.received);
 
-    close(serialReadFD);
-    close(serialWriteFD);
+    if(serialReadFD > 2) close(serialReadFD);
+    if(serialWriteFD > 2) close(serialWriteFD);
 }
 
 int SerialComm::hdlcEncode(int count, const char* from, char *to) {
@@ -333,7 +333,7 @@ int SerialComm::hdlcEncode(int count, const char* from, char *to) {
     return offset;
 }
 
-int SerialComm::writeFD(int fd, const char *buffer, int count)
+int SerialComm::writeFD(int fd, const char *buffer, int count, int *err)
 {
     int cnt = 0;
     /*
@@ -343,8 +343,9 @@ int SerialComm::writeFD(int fd, const char *buffer, int count)
     }
     FD_CLR(serialWriteFD, &wfds);
      */
-    int tmpCnt = BaseComm::writeFD(fd, buffer, count);
+    int tmpCnt = BaseComm::writeFD(fd, buffer, count, err);
     if (tmpCnt < 0) {
+        *err = errno;
         return tmpCnt;
     }
     else {
@@ -356,7 +357,7 @@ int SerialComm::writeFD(int fd, const char *buffer, int count)
 
 /* Work around buggy usb serial driver (returns 0 when no data is
    available, independent of the blocking/non-blocking mode) */
-int SerialComm::readFD(int fd, char *buffer, int count, int maxCount)
+int SerialComm::readFD(int fd, char *buffer, int count, int maxCount, int *err)
 {
     int cnt = 0;
     timeval tvold;
@@ -376,6 +377,7 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount)
         select(0, NULL, NULL, NULL, &tv);
         int tmpCnt = read(fd, buffer, maxCount);
         if (tmpCnt < 0) {
+            *err = errno;
             return tmpCnt;
         }
         else {
@@ -387,14 +389,23 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount)
 
 char SerialComm::nextRaw() {
     char nextByte = 0;
+    int err = 0;
     if(rawFifo.tail < rawFifo.head) {
         nextByte = rawFifo.queue[rawFifo.tail++];
     }
     else {
         // fifo empty -- need to get some bytes
         rawFifo.tail = 0;
+        rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1, &err);
+        if(rawFifo.head < 0) {
+            close(serialReadFD);
+            close(serialWriteFD);
+            serialReadFD = -1;
+            serialWriteFD = -1;
+            errno = err;
+        }
         reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)",
-                    rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1));
+                    rawFifo.head);
         nextByte = rawFifo.queue[rawFifo.tail++];
     }
     return nextByte;
@@ -545,11 +556,13 @@ bool SerialComm::readPacket(SFPacket &pPacket)
 /* writes packet */
 bool SerialComm::writePacket(SFPacket &pPacket)
 {
-    char type, byte;
+    char type, byte = 0;
     uint16_t crc = 0;
     char buffer[2*pPacket.getLength() + 20];
     int offset = 0;
-
+    int err = 0;
+    int written = 0;
+    
     // put SFD into buffer 
     buffer[offset++] = SYNC_BYTE;
 
@@ -562,7 +575,6 @@ bool SerialComm::writePacket(SFPacket &pPacket)
     byte = pPacket.getSeqno();
     crc = byteCRC(byte, crc);
     offset += hdlcEncode(1, &byte, buffer + offset);
-
     switch (type)
     {
     case SF_ACK:
@@ -587,7 +599,19 @@ bool SerialComm::writePacket(SFPacket &pPacket)
     
     // put SFD into buffer
     buffer[offset++] = SYNC_BYTE;
-    if(writeFD(serialWriteFD, buffer, offset) < offset)  {
+    written = writeFD(serialWriteFD, buffer, offset, &err);
+    if(written < 0) {
+        if(err != EINTR) {
+            close(serialReadFD);
+            serialReadFD = -1;
+            close(serialWriteFD);
+            serialWriteFD = -1;
+            errno = err;
+            reportError("SerialComm::writePacket failed",-1);
+            return false;
+        }
+    }
+    else if(written < offset) {
         DEBUG("SerialComm::writePacket failed");
         return false;
     }
@@ -737,7 +761,7 @@ void SerialComm::writeSerial()
 void SerialComm::cancel()
 {
     pthread_t callingThread = pthread_self();
-    if (pthread_equal(callingThread, readerThread))
+    if(readerThreadRunning && pthread_equal(callingThread, readerThread))
     {
         DEBUG("SerialComm::cancel : by readerThread")
         pthread_detach(readerThread);
@@ -752,7 +776,7 @@ void SerialComm::cancel()
        pthread_cond_signal(&control.cancel);
         pthread_exit(NULL);
     }
-    else if ((pthread_equal(callingThread, writerThread)))
+    else if(writerThreadRunning && pthread_equal(callingThread, writerThread))
     {
         DEBUG("SerialComm::cancel : by writerThread")
         pthread_detach(writerThread);
index e12409c4fa5fc515669cde3679a0447917ae7aa9..974984775d031369031d66190970a37242c32fc9 100644 (file)
@@ -199,10 +199,10 @@ protected:
     /**
      *  try to read at least count bytes in one go, but may read up to maxCount bytes.
      */
-    virtual int readFD(int fd, char *buffer, int count, int maxCount);
+    virtual int readFD(int fd, char *buffer, int count, int maxCount, int *err);
 
     /* enables byte escaping. overwrites method from base class.*/
-    virtual int writeFD(int fd, const char *buffer, int count);
+    virtual int writeFD(int fd, const char *buffer, int count, int *err);
 
     /* reads a packet (blocking) */
     bool readPacket(SFPacket &pPacket);
index 8fc555fa291b4fa89d75a3bae3a59bd6af862d9d..d3a6555a861fa4d9bf58ae669a58dc53adffb3fe 100644 (file)
@@ -44,6 +44,7 @@ SFPacket::SFPacket(const SFPacket &pPacket)
 {
     length = pPacket.getLength();
     type = pPacket.getType();
+    seqno = pPacket.getSeqno();
     setPayload(pPacket.getPayload(), length);
 }
 
index 85b221a9db6ac7c5c66cbb55eaca81a5ae140fdb..8e5efee52a1f6c2c08b60806ca3633bb33d56746 100644 (file)
@@ -57,7 +57,7 @@ void* writeClientsThread(void*);
 
 /* opens tcp server port for listening and start threads*/
 TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
-{
+{   
     // init values
     writerThreadRunning = false;
     readerThreadRunning = false;
@@ -67,6 +67,7 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe
     readPacketCount = 0;
     writtenPacketCount = 0;
     port = pPort;
+    
     pthread_mutex_init(&clientInfo.sleeplock, NULL);
     pthread_mutex_init(&clientInfo.countlock, NULL);
     pthread_cond_init(&clientInfo.wakeup, NULL);
@@ -161,8 +162,9 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
 {
     char l;
     char* buffer[SFPacket::getMaxPayloadLength()];
-
-    if (readFD(pFD, &l, 1) != 1)
+    int err;
+    
+    if (readFD(pFD, &l, 1, &err) != 1)
     {
         return false;
     }
@@ -170,7 +172,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
     {
         return false;
     }
-    if (readFD(pFD, (char*) buffer, static_cast<int>(l)) != l)
+    if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l)
     {
         return false;
     }
@@ -184,7 +186,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
     }
 }
 
-int TCPComm::writeFD(int fd, const char *buffer, int count)
+int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
 {
     int actual = 0;
     while (count > 0)
@@ -194,8 +196,8 @@ int TCPComm::writeFD(int fd, const char *buffer, int count)
 #else
         int n = send(fd, buffer, count, MSG_NOSIGNAL);
 #endif
-        if (n == -1)
-        {
+        if (n == -1) {
+            *err = errno;
             return -1;
         }
         count -= n;
@@ -209,11 +211,12 @@ int TCPComm::writeFD(int fd, const char *buffer, int count)
 bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
 {
     char len = pPacket.getLength();
-    if (writeFD(pFD, &len, 1) != 1)
+    int err;
+    if (writeFD(pFD, &len, 1, &err) != 1)
     {
         return false;
     }
-    if (writeFD(pFD, pPacket.getPayload(), len) != len)
+    if (writeFD(pFD, pPacket.getPayload(), len, &err) != len)
     {
         return false;
     }
@@ -225,15 +228,16 @@ bool TCPComm::versionCheck(int clientFD)
 {
     char check[2], us[2];
     int version;
-
+    int err = 0;
     /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
     us[0] = 'U';
     us[1] = ' ';
-    if (writeFD(clientFD, us, 2) != 2)
+    
+    if (writeFD(clientFD, us, 2, &err) != 2)
     {
         return false;
     }
-    if (readFD(clientFD, check, 2) != 2)
+    if (readFD(clientFD, check, 2, &err) != 2)
     {
         return false;
     }
@@ -395,14 +399,12 @@ void TCPComm::readClients()
                 if (FD_ISSET(*it, &rfds))
                 {
                     SFPacket packet;
-                    if (readPacket(*it, packet))
-                    {
+                    if(readPacket(*it, packet)) {
                         // this call blocks until buffer is not full
                         readBuffer.enqueueBack(packet);
                         ++readPacketCount;
                     }
-                    else
-                    {
+                    else {
                         DEBUG("TCPComm::readClients : removeClient")
                         removeClient(*it);
                     }
index 35458587068dc25774975ac92c6de99890edeb08..54cff803def0192077d09e3a201de59e0c04266f 100644 (file)
@@ -138,7 +138,7 @@ private:
 
 protected:
     /* performs blocking write on fd */
-    virtual int writeFD(int fd, const char *buffer, int count);
+    virtual int writeFD(int fd, const char *buffer, int count, int *err);
 
     /* checks SF client protocol version */
     bool versionCheck(int clientFD);