From f2ec0361a8796350bde71e0611e16be06ba22be0 Mon Sep 17 00:00:00 2001 From: andreaskoepke Date: Mon, 21 Apr 2008 19:57:46 +0000 Subject: [PATCH] - pass errno across interface - do not cancel threads that are not running - exit on write failures (just as we do for read failures) --- support/sdk/cpp/sf/basecomm.cpp | 6 ++-- support/sdk/cpp/sf/basecomm.h | 4 +-- support/sdk/cpp/sf/packetbuffer.cpp | 19 ++++------- support/sdk/cpp/sf/packetbuffer.h | 6 ++-- support/sdk/cpp/sf/serialcomm.cpp | 50 +++++++++++++++++++++-------- support/sdk/cpp/sf/serialcomm.h | 4 +-- support/sdk/cpp/sf/sfpacket.cpp | 1 + support/sdk/cpp/sf/tcpcomm.cpp | 34 +++++++++++--------- support/sdk/cpp/sf/tcpcomm.h | 2 +- 9 files changed, 74 insertions(+), 52 deletions(-) diff --git a/support/sdk/cpp/sf/basecomm.cpp b/support/sdk/cpp/sf/basecomm.cpp index bad3e50f..788349d6 100644 --- a/support/sdk/cpp/sf/basecomm.cpp +++ b/support/sdk/cpp/sf/basecomm.cpp @@ -46,7 +46,7 @@ BaseComm::~BaseComm() } /* all count bytes must be read before returning - blocking in that way... */ -int BaseComm::readFD(int fd, char *buffer, int count) +int BaseComm::readFD(int fd, char *buffer, int count, int *err) { int actual = 0; while (count > 0) @@ -54,6 +54,7 @@ int BaseComm::readFD(int fd, char *buffer, int count) int n = read(fd, buffer, count); if (n == -1) { + *err = errno; return -1; } if (n == 0) @@ -68,7 +69,7 @@ int BaseComm::readFD(int fd, char *buffer, int count) } /* all count bytes must be written before returning - blocking in that way... */ -int BaseComm::writeFD(int fd, const char *buffer, int count) +int BaseComm::writeFD(int fd, const char *buffer, int count, int *err) { int actual = 0; while (count > 0) @@ -77,6 +78,7 @@ int BaseComm::writeFD(int fd, const char *buffer, int count) if(n == -1) { if(errno != 0) { + *err = errno; return -1; } else { diff --git a/support/sdk/cpp/sf/basecomm.h b/support/sdk/cpp/sf/basecomm.h index 3b43bda0..be45e222 100644 --- a/support/sdk/cpp/sf/basecomm.h +++ b/support/sdk/cpp/sf/basecomm.h @@ -41,10 +41,10 @@ public: virtual ~BaseComm(); protected: /* performs blocking read on fd */ - virtual int readFD(int fd, char *buffer, int count); + virtual int readFD(int fd, char *buffer, int count, int *err); /* performs blocking write on fd */ - virtual int writeFD(int fd, const char *buffer, int count); + virtual int writeFD(int fd, const char *buffer, int count, int *err); }; #endif diff --git a/support/sdk/cpp/sf/packetbuffer.cpp b/support/sdk/cpp/sf/packetbuffer.cpp index 639835a4..635f434e 100644 --- a/support/sdk/cpp/sf/packetbuffer.cpp +++ b/support/sdk/cpp/sf/packetbuffer.cpp @@ -40,7 +40,6 @@ PacketBuffer::PacketBuffer() pthread_mutex_init(&buffer.lock, NULL); pthread_cond_init(&buffer.notempty, NULL); pthread_cond_init(&buffer.notfull, NULL); - buffer.size = 0; } @@ -57,7 +56,6 @@ void PacketBuffer::clear() { pthread_mutex_lock(&buffer.lock); // clear buffer.container.clear(); - buffer.size = 0; DEBUG("PacketBuffer::clear : cleared buffer and signal ") pthread_cond_signal(&buffer.notfull); pthread_mutex_unlock(&buffer.lock); @@ -71,7 +69,7 @@ SFPacket PacketBuffer::dequeue() pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); pthread_mutex_lock(&buffer.lock); // wait until buffer is _not_ empty - while(buffer.size == 0) + while(buffer.container.size() == 0) { DEBUG("PacketBuffer::dequeue : waiting until buffer is ") pthread_cond_wait(&buffer.notempty, &buffer.lock); @@ -79,7 +77,6 @@ SFPacket PacketBuffer::dequeue() // dequeue packet = buffer.container.front(); buffer.container.pop_front(); - --buffer.size; DEBUG("PacketBuffer::dequeue : get from buffer and signal ") pthread_cond_signal(&buffer.notfull); pthread_cleanup_pop(1); @@ -93,13 +90,12 @@ bool PacketBuffer::enqueueFront(SFPacket &pPacket) pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); pthread_mutex_lock(&buffer.lock); // wait until buffer is _not_ full - while(buffer.size >= cMaxBufferSize) + while(buffer.container.size() >= cMaxBufferSize) { DEBUG("PacketBuffer::enqueueFront : waiting until buffer is ") pthread_cond_wait(&buffer.notfull, &buffer.lock); } // enqueue - ++buffer.size; buffer.container.push_front(pPacket); DEBUG("PacketBuffer::enqueueFront : put in buffer and signal ") // signal that buffer is now not empty @@ -115,13 +111,12 @@ bool PacketBuffer::enqueueBack(SFPacket &pPacket) pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); pthread_mutex_lock(&buffer.lock); // wait until buffer is _not_ full - while(buffer.size >= cMaxBufferSize) + while(buffer.container.size() >= cMaxBufferSize) { DEBUG("PacketBuffer::enqueueBack : waiting until buffer is ") pthread_cond_wait(&buffer.notfull, &buffer.lock); } // enqueue - ++buffer.size; buffer.container.push_back(pPacket); DEBUG("PacketBuffer::enqueueBack : put in buffer and signal ") // signal that buffer is now not empty @@ -135,8 +130,8 @@ bool PacketBuffer::isFull() { bool isFull = true; pthread_testcancel(); pthread_mutex_lock(&buffer.lock); - if (buffer.size < cMaxBufferSize) { - isFull = false; + if (buffer.container.size() < cMaxBufferSize) { + isFull = false; } pthread_mutex_unlock(&buffer.lock); return isFull; @@ -147,8 +142,8 @@ bool PacketBuffer::isEmpty() { bool isEmpty = true; pthread_testcancel(); pthread_mutex_lock(&buffer.lock); - if (buffer.size > 0) { - isEmpty = false; + if (buffer.container.size() > 0) { + isEmpty = false; } pthread_mutex_unlock(&buffer.lock); return isEmpty; diff --git a/support/sdk/cpp/sf/packetbuffer.h b/support/sdk/cpp/sf/packetbuffer.h index 8fef2696..38385945 100644 --- a/support/sdk/cpp/sf/packetbuffer.h +++ b/support/sdk/cpp/sf/packetbuffer.h @@ -37,7 +37,7 @@ #include #include "sfpacket.h" -//#define DEBUG_PACKETBUFFER +// #define DEBUG_PACKETBUFFER #undef DEBUG #ifdef DEBUG_PACKETBUFFER @@ -51,7 +51,7 @@ class PacketBuffer { protected: - static const int cMaxBufferSize = 25; + static const unsigned cMaxBufferSize = 25; typedef std::list container_t; @@ -66,8 +66,6 @@ protected: pthread_cond_t notfull; // actual buffer container_t container; - // number of packets in buffer - int size; } sharedBuffer_t; sharedBuffer_t buffer; diff --git a/support/sdk/cpp/sf/serialcomm.cpp b/support/sdk/cpp/sf/serialcomm.cpp index 5920a724..4479e18f 100644 --- a/support/sdk/cpp/sf/serialcomm.cpp +++ b/support/sdk/cpp/sf/serialcomm.cpp @@ -241,7 +241,7 @@ tcflag_t SerialComm::parseBaudrate(int requested) return baudrate; } -SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), errorReported(false), errorMsg(""), control(pControl) +SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), serialReadFD(-1), serialWriteFD(-1), errorReported(false), errorMsg(""), control(pControl) { writerThreadRunning = false; readerThreadRunning = false; @@ -314,8 +314,8 @@ SerialComm::~SerialComm() pthread_mutex_destroy(&ack.lock); pthread_cond_destroy(&ack.received); - close(serialReadFD); - close(serialWriteFD); + if(serialReadFD > 2) close(serialReadFD); + if(serialWriteFD > 2) close(serialWriteFD); } int SerialComm::hdlcEncode(int count, const char* from, char *to) { @@ -333,7 +333,7 @@ int SerialComm::hdlcEncode(int count, const char* from, char *to) { return offset; } -int SerialComm::writeFD(int fd, const char *buffer, int count) +int SerialComm::writeFD(int fd, const char *buffer, int count, int *err) { int cnt = 0; /* @@ -343,8 +343,9 @@ int SerialComm::writeFD(int fd, const char *buffer, int count) } FD_CLR(serialWriteFD, &wfds); */ - int tmpCnt = BaseComm::writeFD(fd, buffer, count); + int tmpCnt = BaseComm::writeFD(fd, buffer, count, err); if (tmpCnt < 0) { + *err = errno; return tmpCnt; } else { @@ -356,7 +357,7 @@ int SerialComm::writeFD(int fd, const char *buffer, int count) /* Work around buggy usb serial driver (returns 0 when no data is available, independent of the blocking/non-blocking mode) */ -int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) +int SerialComm::readFD(int fd, char *buffer, int count, int maxCount, int *err) { int cnt = 0; timeval tvold; @@ -376,6 +377,7 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) select(0, NULL, NULL, NULL, &tv); int tmpCnt = read(fd, buffer, maxCount); if (tmpCnt < 0) { + *err = errno; return tmpCnt; } else { @@ -387,14 +389,23 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) char SerialComm::nextRaw() { char nextByte = 0; + int err = 0; if(rawFifo.tail < rawFifo.head) { nextByte = rawFifo.queue[rawFifo.tail++]; } else { // fifo empty -- need to get some bytes rawFifo.tail = 0; + rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1, &err); + if(rawFifo.head < 0) { + close(serialReadFD); + close(serialWriteFD); + serialReadFD = -1; + serialWriteFD = -1; + errno = err; + } reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)", - rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)); + rawFifo.head); nextByte = rawFifo.queue[rawFifo.tail++]; } return nextByte; @@ -545,11 +556,13 @@ bool SerialComm::readPacket(SFPacket &pPacket) /* writes packet */ bool SerialComm::writePacket(SFPacket &pPacket) { - char type, byte; + char type, byte = 0; uint16_t crc = 0; char buffer[2*pPacket.getLength() + 20]; int offset = 0; - + int err = 0; + int written = 0; + // put SFD into buffer buffer[offset++] = SYNC_BYTE; @@ -562,7 +575,6 @@ bool SerialComm::writePacket(SFPacket &pPacket) byte = pPacket.getSeqno(); crc = byteCRC(byte, crc); offset += hdlcEncode(1, &byte, buffer + offset); - switch (type) { case SF_ACK: @@ -587,7 +599,19 @@ bool SerialComm::writePacket(SFPacket &pPacket) // put SFD into buffer buffer[offset++] = SYNC_BYTE; - if(writeFD(serialWriteFD, buffer, offset) < offset) { + written = writeFD(serialWriteFD, buffer, offset, &err); + if(written < 0) { + if(err != EINTR) { + close(serialReadFD); + serialReadFD = -1; + close(serialWriteFD); + serialWriteFD = -1; + errno = err; + reportError("SerialComm::writePacket failed",-1); + return false; + } + } + else if(written < offset) { DEBUG("SerialComm::writePacket failed"); return false; } @@ -737,7 +761,7 @@ void SerialComm::writeSerial() void SerialComm::cancel() { pthread_t callingThread = pthread_self(); - if (pthread_equal(callingThread, readerThread)) + if(readerThreadRunning && pthread_equal(callingThread, readerThread)) { DEBUG("SerialComm::cancel : by readerThread") pthread_detach(readerThread); @@ -752,7 +776,7 @@ void SerialComm::cancel() pthread_cond_signal(&control.cancel); pthread_exit(NULL); } - else if ((pthread_equal(callingThread, writerThread))) + else if(writerThreadRunning && pthread_equal(callingThread, writerThread)) { DEBUG("SerialComm::cancel : by writerThread") pthread_detach(writerThread); diff --git a/support/sdk/cpp/sf/serialcomm.h b/support/sdk/cpp/sf/serialcomm.h index e12409c4..97498477 100644 --- a/support/sdk/cpp/sf/serialcomm.h +++ b/support/sdk/cpp/sf/serialcomm.h @@ -199,10 +199,10 @@ protected: /** * try to read at least count bytes in one go, but may read up to maxCount bytes. */ - virtual int readFD(int fd, char *buffer, int count, int maxCount); + virtual int readFD(int fd, char *buffer, int count, int maxCount, int *err); /* enables byte escaping. overwrites method from base class.*/ - virtual int writeFD(int fd, const char *buffer, int count); + virtual int writeFD(int fd, const char *buffer, int count, int *err); /* reads a packet (blocking) */ bool readPacket(SFPacket &pPacket); diff --git a/support/sdk/cpp/sf/sfpacket.cpp b/support/sdk/cpp/sf/sfpacket.cpp index 8fc555fa..d3a6555a 100644 --- a/support/sdk/cpp/sf/sfpacket.cpp +++ b/support/sdk/cpp/sf/sfpacket.cpp @@ -44,6 +44,7 @@ SFPacket::SFPacket(const SFPacket &pPacket) { length = pPacket.getLength(); type = pPacket.getType(); + seqno = pPacket.getSeqno(); setPayload(pPacket.getPayload(), length); } diff --git a/support/sdk/cpp/sf/tcpcomm.cpp b/support/sdk/cpp/sf/tcpcomm.cpp index 85b221a9..8e5efee5 100644 --- a/support/sdk/cpp/sf/tcpcomm.cpp +++ b/support/sdk/cpp/sf/tcpcomm.cpp @@ -57,7 +57,7 @@ void* writeClientsThread(void*); /* opens tcp server port for listening and start threads*/ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl) -{ +{ // init values writerThreadRunning = false; readerThreadRunning = false; @@ -67,6 +67,7 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe readPacketCount = 0; writtenPacketCount = 0; port = pPort; + pthread_mutex_init(&clientInfo.sleeplock, NULL); pthread_mutex_init(&clientInfo.countlock, NULL); pthread_cond_init(&clientInfo.wakeup, NULL); @@ -161,8 +162,9 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) { char l; char* buffer[SFPacket::getMaxPayloadLength()]; - - if (readFD(pFD, &l, 1) != 1) + int err; + + if (readFD(pFD, &l, 1, &err) != 1) { return false; } @@ -170,7 +172,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) { return false; } - if (readFD(pFD, (char*) buffer, static_cast(l)) != l) + if (readFD(pFD, (char*) buffer, static_cast(l), &err) != l) { return false; } @@ -184,7 +186,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) } } -int TCPComm::writeFD(int fd, const char *buffer, int count) +int TCPComm::writeFD(int fd, const char *buffer, int count, int *err) { int actual = 0; while (count > 0) @@ -194,8 +196,8 @@ int TCPComm::writeFD(int fd, const char *buffer, int count) #else int n = send(fd, buffer, count, MSG_NOSIGNAL); #endif - if (n == -1) - { + if (n == -1) { + *err = errno; return -1; } count -= n; @@ -209,11 +211,12 @@ int TCPComm::writeFD(int fd, const char *buffer, int count) bool TCPComm::writePacket(int pFD, SFPacket &pPacket) { char len = pPacket.getLength(); - if (writeFD(pFD, &len, 1) != 1) + int err; + if (writeFD(pFD, &len, 1, &err) != 1) { return false; } - if (writeFD(pFD, pPacket.getPayload(), len) != len) + if (writeFD(pFD, pPacket.getPayload(), len, &err) != len) { return false; } @@ -225,15 +228,16 @@ bool TCPComm::versionCheck(int clientFD) { char check[2], us[2]; int version; - + int err = 0; /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */ us[0] = 'U'; us[1] = ' '; - if (writeFD(clientFD, us, 2) != 2) + + if (writeFD(clientFD, us, 2, &err) != 2) { return false; } - if (readFD(clientFD, check, 2) != 2) + if (readFD(clientFD, check, 2, &err) != 2) { return false; } @@ -395,14 +399,12 @@ void TCPComm::readClients() if (FD_ISSET(*it, &rfds)) { SFPacket packet; - if (readPacket(*it, packet)) - { + if(readPacket(*it, packet)) { // this call blocks until buffer is not full readBuffer.enqueueBack(packet); ++readPacketCount; } - else - { + else { DEBUG("TCPComm::readClients : removeClient") removeClient(*it); } diff --git a/support/sdk/cpp/sf/tcpcomm.h b/support/sdk/cpp/sf/tcpcomm.h index 35458587..54cff803 100644 --- a/support/sdk/cpp/sf/tcpcomm.h +++ b/support/sdk/cpp/sf/tcpcomm.h @@ -138,7 +138,7 @@ private: protected: /* performs blocking write on fd */ - virtual int writeFD(int fd, const char *buffer, int count); + virtual int writeFD(int fd, const char *buffer, int count, int *err); /* checks SF client protocol version */ bool versionCheck(int clientFD); -- 2.39.2