summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
ab3fe89)
- do not cancel threads that are not running
- exit on write failures (just as we do for read failures)
}
/* all count bytes must be read before returning - blocking in that way... */
}
/* all count bytes must be read before returning - blocking in that way... */
-int BaseComm::readFD(int fd, char *buffer, int count)
+int BaseComm::readFD(int fd, char *buffer, int count, int *err)
{
int actual = 0;
while (count > 0)
{
int actual = 0;
while (count > 0)
int n = read(fd, buffer, count);
if (n == -1)
{
int n = read(fd, buffer, count);
if (n == -1)
{
}
/* all count bytes must be written before returning - blocking in that way... */
}
/* all count bytes must be written before returning - blocking in that way... */
-int BaseComm::writeFD(int fd, const char *buffer, int count)
+int BaseComm::writeFD(int fd, const char *buffer, int count, int *err)
{
int actual = 0;
while (count > 0)
{
int actual = 0;
while (count > 0)
if(n == -1)
{
if(errno != 0) {
if(n == -1)
{
if(errno != 0) {
virtual ~BaseComm();
protected:
/* performs blocking read on fd */
virtual ~BaseComm();
protected:
/* performs blocking read on fd */
- virtual int readFD(int fd, char *buffer, int count);
+ virtual int readFD(int fd, char *buffer, int count, int *err);
/* performs blocking write on fd */
/* performs blocking write on fd */
- virtual int writeFD(int fd, const char *buffer, int count);
+ virtual int writeFD(int fd, const char *buffer, int count, int *err);
pthread_mutex_init(&buffer.lock, NULL);
pthread_cond_init(&buffer.notempty, NULL);
pthread_cond_init(&buffer.notfull, NULL);
pthread_mutex_init(&buffer.lock, NULL);
pthread_cond_init(&buffer.notempty, NULL);
pthread_cond_init(&buffer.notfull, NULL);
pthread_mutex_lock(&buffer.lock);
// clear
buffer.container.clear();
pthread_mutex_lock(&buffer.lock);
// clear
buffer.container.clear();
DEBUG("PacketBuffer::clear : cleared buffer and signal <notfull>")
pthread_cond_signal(&buffer.notfull);
pthread_mutex_unlock(&buffer.lock);
DEBUG("PacketBuffer::clear : cleared buffer and signal <notfull>")
pthread_cond_signal(&buffer.notfull);
pthread_mutex_unlock(&buffer.lock);
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ empty
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ empty
- while(buffer.size == 0)
+ while(buffer.container.size() == 0)
{
DEBUG("PacketBuffer::dequeue : waiting until buffer is <notempty>")
pthread_cond_wait(&buffer.notempty, &buffer.lock);
{
DEBUG("PacketBuffer::dequeue : waiting until buffer is <notempty>")
pthread_cond_wait(&buffer.notempty, &buffer.lock);
// dequeue
packet = buffer.container.front();
buffer.container.pop_front();
// dequeue
packet = buffer.container.front();
buffer.container.pop_front();
DEBUG("PacketBuffer::dequeue : get from buffer and signal <notfull>")
pthread_cond_signal(&buffer.notfull);
pthread_cleanup_pop(1);
DEBUG("PacketBuffer::dequeue : get from buffer and signal <notfull>")
pthread_cond_signal(&buffer.notfull);
pthread_cleanup_pop(1);
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ full
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ full
- while(buffer.size >= cMaxBufferSize)
+ while(buffer.container.size() >= cMaxBufferSize)
{
DEBUG("PacketBuffer::enqueueFront : waiting until buffer is <notfull>")
pthread_cond_wait(&buffer.notfull, &buffer.lock);
}
// enqueue
{
DEBUG("PacketBuffer::enqueueFront : waiting until buffer is <notfull>")
pthread_cond_wait(&buffer.notfull, &buffer.lock);
}
// enqueue
buffer.container.push_front(pPacket);
DEBUG("PacketBuffer::enqueueFront : put in buffer and signal <notempty>")
// signal that buffer is now not empty
buffer.container.push_front(pPacket);
DEBUG("PacketBuffer::enqueueFront : put in buffer and signal <notempty>")
// signal that buffer is now not empty
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ full
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock);
pthread_mutex_lock(&buffer.lock);
// wait until buffer is _not_ full
- while(buffer.size >= cMaxBufferSize)
+ while(buffer.container.size() >= cMaxBufferSize)
{
DEBUG("PacketBuffer::enqueueBack : waiting until buffer is <notfull>")
pthread_cond_wait(&buffer.notfull, &buffer.lock);
}
// enqueue
{
DEBUG("PacketBuffer::enqueueBack : waiting until buffer is <notfull>")
pthread_cond_wait(&buffer.notfull, &buffer.lock);
}
// enqueue
buffer.container.push_back(pPacket);
DEBUG("PacketBuffer::enqueueBack : put in buffer and signal <notempty>")
// signal that buffer is now not empty
buffer.container.push_back(pPacket);
DEBUG("PacketBuffer::enqueueBack : put in buffer and signal <notempty>")
// signal that buffer is now not empty
bool isFull = true;
pthread_testcancel();
pthread_mutex_lock(&buffer.lock);
bool isFull = true;
pthread_testcancel();
pthread_mutex_lock(&buffer.lock);
- if (buffer.size < cMaxBufferSize) {
- isFull = false;
+ if (buffer.container.size() < cMaxBufferSize) {
+ isFull = false;
}
pthread_mutex_unlock(&buffer.lock);
return isFull;
}
pthread_mutex_unlock(&buffer.lock);
return isFull;
bool isEmpty = true;
pthread_testcancel();
pthread_mutex_lock(&buffer.lock);
bool isEmpty = true;
pthread_testcancel();
pthread_mutex_lock(&buffer.lock);
- if (buffer.size > 0) {
- isEmpty = false;
+ if (buffer.container.size() > 0) {
+ isEmpty = false;
}
pthread_mutex_unlock(&buffer.lock);
return isEmpty;
}
pthread_mutex_unlock(&buffer.lock);
return isEmpty;
#include <list>
#include "sfpacket.h"
#include <list>
#include "sfpacket.h"
-//#define DEBUG_PACKETBUFFER
+// #define DEBUG_PACKETBUFFER
#undef DEBUG
#ifdef DEBUG_PACKETBUFFER
#undef DEBUG
#ifdef DEBUG_PACKETBUFFER
- static const int cMaxBufferSize = 25;
+ static const unsigned cMaxBufferSize = 25;
typedef std::list<SFPacket> container_t;
typedef std::list<SFPacket> container_t;
pthread_cond_t notfull;
// actual buffer
container_t container;
pthread_cond_t notfull;
// actual buffer
container_t container;
- // number of packets in buffer
- int size;
} sharedBuffer_t;
sharedBuffer_t buffer;
} sharedBuffer_t;
sharedBuffer_t buffer;
-SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), errorReported(false), errorMsg(""), control(pControl)
+SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), serialReadFD(-1), serialWriteFD(-1), errorReported(false), errorMsg(""), control(pControl)
{
writerThreadRunning = false;
readerThreadRunning = false;
{
writerThreadRunning = false;
readerThreadRunning = false;
pthread_mutex_destroy(&ack.lock);
pthread_cond_destroy(&ack.received);
pthread_mutex_destroy(&ack.lock);
pthread_cond_destroy(&ack.received);
- close(serialReadFD);
- close(serialWriteFD);
+ if(serialReadFD > 2) close(serialReadFD);
+ if(serialWriteFD > 2) close(serialWriteFD);
}
int SerialComm::hdlcEncode(int count, const char* from, char *to) {
}
int SerialComm::hdlcEncode(int count, const char* from, char *to) {
-int SerialComm::writeFD(int fd, const char *buffer, int count)
+int SerialComm::writeFD(int fd, const char *buffer, int count, int *err)
}
FD_CLR(serialWriteFD, &wfds);
*/
}
FD_CLR(serialWriteFD, &wfds);
*/
- int tmpCnt = BaseComm::writeFD(fd, buffer, count);
+ int tmpCnt = BaseComm::writeFD(fd, buffer, count, err);
/* Work around buggy usb serial driver (returns 0 when no data is
available, independent of the blocking/non-blocking mode) */
/* Work around buggy usb serial driver (returns 0 when no data is
available, independent of the blocking/non-blocking mode) */
-int SerialComm::readFD(int fd, char *buffer, int count, int maxCount)
+int SerialComm::readFD(int fd, char *buffer, int count, int maxCount, int *err)
{
int cnt = 0;
timeval tvold;
{
int cnt = 0;
timeval tvold;
select(0, NULL, NULL, NULL, &tv);
int tmpCnt = read(fd, buffer, maxCount);
if (tmpCnt < 0) {
select(0, NULL, NULL, NULL, &tv);
int tmpCnt = read(fd, buffer, maxCount);
if (tmpCnt < 0) {
char SerialComm::nextRaw() {
char nextByte = 0;
char SerialComm::nextRaw() {
char nextByte = 0;
if(rawFifo.tail < rawFifo.head) {
nextByte = rawFifo.queue[rawFifo.tail++];
}
else {
// fifo empty -- need to get some bytes
rawFifo.tail = 0;
if(rawFifo.tail < rawFifo.head) {
nextByte = rawFifo.queue[rawFifo.tail++];
}
else {
// fifo empty -- need to get some bytes
rawFifo.tail = 0;
+ rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1, &err);
+ if(rawFifo.head < 0) {
+ close(serialReadFD);
+ close(serialWriteFD);
+ serialReadFD = -1;
+ serialWriteFD = -1;
+ errno = err;
+ }
reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)",
reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)",
- rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1));
nextByte = rawFifo.queue[rawFifo.tail++];
}
return nextByte;
nextByte = rawFifo.queue[rawFifo.tail++];
}
return nextByte;
/* writes packet */
bool SerialComm::writePacket(SFPacket &pPacket)
{
/* writes packet */
bool SerialComm::writePacket(SFPacket &pPacket)
{
uint16_t crc = 0;
char buffer[2*pPacket.getLength() + 20];
int offset = 0;
uint16_t crc = 0;
char buffer[2*pPacket.getLength() + 20];
int offset = 0;
+ int err = 0;
+ int written = 0;
+
// put SFD into buffer
buffer[offset++] = SYNC_BYTE;
// put SFD into buffer
buffer[offset++] = SYNC_BYTE;
byte = pPacket.getSeqno();
crc = byteCRC(byte, crc);
offset += hdlcEncode(1, &byte, buffer + offset);
byte = pPacket.getSeqno();
crc = byteCRC(byte, crc);
offset += hdlcEncode(1, &byte, buffer + offset);
switch (type)
{
case SF_ACK:
switch (type)
{
case SF_ACK:
// put SFD into buffer
buffer[offset++] = SYNC_BYTE;
// put SFD into buffer
buffer[offset++] = SYNC_BYTE;
- if(writeFD(serialWriteFD, buffer, offset) < offset) {
+ written = writeFD(serialWriteFD, buffer, offset, &err);
+ if(written < 0) {
+ if(err != EINTR) {
+ close(serialReadFD);
+ serialReadFD = -1;
+ close(serialWriteFD);
+ serialWriteFD = -1;
+ errno = err;
+ reportError("SerialComm::writePacket failed",-1);
+ return false;
+ }
+ }
+ else if(written < offset) {
DEBUG("SerialComm::writePacket failed");
return false;
}
DEBUG("SerialComm::writePacket failed");
return false;
}
void SerialComm::cancel()
{
pthread_t callingThread = pthread_self();
void SerialComm::cancel()
{
pthread_t callingThread = pthread_self();
- if (pthread_equal(callingThread, readerThread))
+ if(readerThreadRunning && pthread_equal(callingThread, readerThread))
{
DEBUG("SerialComm::cancel : by readerThread")
pthread_detach(readerThread);
{
DEBUG("SerialComm::cancel : by readerThread")
pthread_detach(readerThread);
pthread_cond_signal(&control.cancel);
pthread_exit(NULL);
}
pthread_cond_signal(&control.cancel);
pthread_exit(NULL);
}
- else if ((pthread_equal(callingThread, writerThread)))
+ else if(writerThreadRunning && pthread_equal(callingThread, writerThread))
{
DEBUG("SerialComm::cancel : by writerThread")
pthread_detach(writerThread);
{
DEBUG("SerialComm::cancel : by writerThread")
pthread_detach(writerThread);
/**
* try to read at least count bytes in one go, but may read up to maxCount bytes.
*/
/**
* try to read at least count bytes in one go, but may read up to maxCount bytes.
*/
- virtual int readFD(int fd, char *buffer, int count, int maxCount);
+ virtual int readFD(int fd, char *buffer, int count, int maxCount, int *err);
/* enables byte escaping. overwrites method from base class.*/
/* enables byte escaping. overwrites method from base class.*/
- virtual int writeFD(int fd, const char *buffer, int count);
+ virtual int writeFD(int fd, const char *buffer, int count, int *err);
/* reads a packet (blocking) */
bool readPacket(SFPacket &pPacket);
/* reads a packet (blocking) */
bool readPacket(SFPacket &pPacket);
{
length = pPacket.getLength();
type = pPacket.getType();
{
length = pPacket.getLength();
type = pPacket.getType();
+ seqno = pPacket.getSeqno();
setPayload(pPacket.getPayload(), length);
}
setPayload(pPacket.getPayload(), length);
}
/* opens tcp server port for listening and start threads*/
TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
/* opens tcp server port for listening and start threads*/
TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
// init values
writerThreadRunning = false;
readerThreadRunning = false;
// init values
writerThreadRunning = false;
readerThreadRunning = false;
readPacketCount = 0;
writtenPacketCount = 0;
port = pPort;
readPacketCount = 0;
writtenPacketCount = 0;
port = pPort;
pthread_mutex_init(&clientInfo.sleeplock, NULL);
pthread_mutex_init(&clientInfo.countlock, NULL);
pthread_cond_init(&clientInfo.wakeup, NULL);
pthread_mutex_init(&clientInfo.sleeplock, NULL);
pthread_mutex_init(&clientInfo.countlock, NULL);
pthread_cond_init(&clientInfo.wakeup, NULL);
{
char l;
char* buffer[SFPacket::getMaxPayloadLength()];
{
char l;
char* buffer[SFPacket::getMaxPayloadLength()];
-
- if (readFD(pFD, &l, 1) != 1)
+ int err;
+
+ if (readFD(pFD, &l, 1, &err) != 1)
- if (readFD(pFD, (char*) buffer, static_cast<int>(l)) != l)
+ if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l)
-int TCPComm::writeFD(int fd, const char *buffer, int count)
+int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
{
int actual = 0;
while (count > 0)
{
int actual = 0;
while (count > 0)
#else
int n = send(fd, buffer, count, MSG_NOSIGNAL);
#endif
#else
int n = send(fd, buffer, count, MSG_NOSIGNAL);
#endif
+ if (n == -1) {
+ *err = errno;
bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
{
char len = pPacket.getLength();
bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
{
char len = pPacket.getLength();
- if (writeFD(pFD, &len, 1) != 1)
+ int err;
+ if (writeFD(pFD, &len, 1, &err) != 1)
- if (writeFD(pFD, pPacket.getPayload(), len) != len)
+ if (writeFD(pFD, pPacket.getPayload(), len, &err) != len)
{
char check[2], us[2];
int version;
{
char check[2], us[2];
int version;
/* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
us[0] = 'U';
us[1] = ' ';
/* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
us[0] = 'U';
us[1] = ' ';
- if (writeFD(clientFD, us, 2) != 2)
+
+ if (writeFD(clientFD, us, 2, &err) != 2)
- if (readFD(clientFD, check, 2) != 2)
+ if (readFD(clientFD, check, 2, &err) != 2)
if (FD_ISSET(*it, &rfds))
{
SFPacket packet;
if (FD_ISSET(*it, &rfds))
{
SFPacket packet;
- if (readPacket(*it, packet))
- {
+ if(readPacket(*it, packet)) {
// this call blocks until buffer is not full
readBuffer.enqueueBack(packet);
++readPacketCount;
}
// this call blocks until buffer is not full
readBuffer.enqueueBack(packet);
++readPacketCount;
}
DEBUG("TCPComm::readClients : removeClient")
removeClient(*it);
}
DEBUG("TCPComm::readClients : removeClient")
removeClient(*it);
}
protected:
/* performs blocking write on fd */
protected:
/* performs blocking write on fd */
- virtual int writeFD(int fd, const char *buffer, int count);
+ virtual int writeFD(int fd, const char *buffer, int count, int *err);
/* checks SF client protocol version */
bool versionCheck(int clientFD);
/* checks SF client protocol version */
bool versionCheck(int clientFD);