X-Git-Url: https://oss.titaniummirror.com/gitweb/?a=blobdiff_plain;f=support%2Fsdk%2Fcpp%2Fsf%2Fserialcomm.cpp;h=a5869f1c6a0cd99dfedaf5076fdab60d02f96c07;hb=e9bfab607e051bae6afb47b44892ce37541d1b44;hp=ea4e4bfa142401c7a3d3735206aee5f680bc18bc;hpb=79eb6b24acb56f21deaf15f0235a20a3d20455bd;p=tinyos-2.x.git diff --git a/support/sdk/cpp/sf/serialcomm.cpp b/support/sdk/cpp/sf/serialcomm.cpp index ea4e4bfa..a5869f1c 100644 --- a/support/sdk/cpp/sf/serialcomm.cpp +++ b/support/sdk/cpp/sf/serialcomm.cpp @@ -35,6 +35,7 @@ #include #include +#include #include #include #include @@ -42,6 +43,7 @@ #include #include #include +#include using namespace std; @@ -241,7 +243,7 @@ tcflag_t SerialComm::parseBaudrate(int requested) return baudrate; } -SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), errorReported(false), errorMsg(""), control(pControl) +SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), serialReadFD(-1), serialWriteFD(-1), errorReported(false), errorMsg(""), control(pControl) { writerThreadRunning = false; readerThreadRunning = false; @@ -275,11 +277,11 @@ SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBu newtio.c_oflag = 0; if ((tcflush(serialReadFD, TCIFLUSH) >= 0 && tcsetattr(serialReadFD, TCSANOW, &newtio) >= 0) - && (tcflush(serialWriteFD, TCIFLUSH) >= 0 && tcsetattr(serialWriteFD, TCSANOW, &newtio) >= 0) - && !errorReported) + && (tcflush(serialWriteFD, TCIFLUSH) >= 0 && tcsetattr(serialWriteFD, TCSANOW, &newtio) >= 0) + && !errorReported) { DEBUG("SerialComm::SerialComm : opened device "<< pDevice << " with baudrate = " << pBaudrate) - } + } else { close(serialReadFD); @@ -314,8 +316,8 @@ SerialComm::~SerialComm() pthread_mutex_destroy(&ack.lock); pthread_cond_destroy(&ack.received); - close(serialReadFD); - close(serialWriteFD); + if(serialReadFD > 2) close(serialReadFD); + if(serialWriteFD > 2) close(serialWriteFD); } int SerialComm::hdlcEncode(int count, const char* from, char *to) { @@ -333,18 +335,19 @@ int SerialComm::hdlcEncode(int count, const char* from, char *to) { return offset; } -int SerialComm::writeFD(int fd, const char *buffer, int count) +int SerialComm::writeFD(int fd, const char *buffer, int count, int *err) { int cnt = 0; /* - FD_SET(serialWriteFD, &wfds); - if(select(serialWriteFD + 1, NULL, &wfds, NULL, NULL) < 0) { - return -1; - } - FD_CLR(serialWriteFD, &wfds); - */ - int tmpCnt = BaseComm::writeFD(fd, buffer, count); + FD_SET(serialWriteFD, &wfds); + if(select(serialWriteFD + 1, NULL, &wfds, NULL, NULL) < 0) { + return -1; + } + FD_CLR(serialWriteFD, &wfds); + */ + int tmpCnt = BaseComm::writeFD(fd, buffer, count, err); if (tmpCnt < 0) { + *err = errno; return tmpCnt; } else { @@ -356,7 +359,7 @@ int SerialComm::writeFD(int fd, const char *buffer, int count) /* Work around buggy usb serial driver (returns 0 when no data is available, independent of the blocking/non-blocking mode) */ -int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) +int SerialComm::readFD(int fd, char *buffer, int count, int maxCount, int *err) { int cnt = 0; timeval tvold; @@ -376,6 +379,7 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) select(0, NULL, NULL, NULL, &tv); int tmpCnt = read(fd, buffer, maxCount); if (tmpCnt < 0) { + *err = errno; return tmpCnt; } else { @@ -387,14 +391,23 @@ int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) char SerialComm::nextRaw() { char nextByte = 0; + int err = 0; if(rawFifo.tail < rawFifo.head) { nextByte = rawFifo.queue[rawFifo.tail++]; } else { // fifo empty -- need to get some bytes rawFifo.tail = 0; + rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1, &err); + if(rawFifo.head < 0) { + close(serialReadFD); + close(serialWriteFD); + serialReadFD = -1; + serialWriteFD = -1; + errno = err; + } reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)", - rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)); + rawFifo.head); nextByte = rawFifo.queue[rawFifo.tail++]; } return nextByte; @@ -403,153 +416,101 @@ char SerialComm::nextRaw() { /* reads packet */ bool SerialComm::readPacket(SFPacket &pPacket) { - bool sync = false; - bool escape = false; - bool completePacket = false; + uint8_t buffer[maxMTU + 10]; int count = 0; - uint16_t crc = 0; - char buffer[maxMTU]; - while(!completePacket) - { - buffer[count] = nextRaw(); - - if(sync && (count == 1) && (buffer[count] == SYNC_BYTE)) { - DEBUG("SerialComm::readPacket double sync byte"); - sync = false; - escape = false; - count = 1; - crc = 0; - buffer[0] = SYNC_BYTE; - } - - if (!sync) - { - // wait for sync - if (buffer[0] == SYNC_BYTE) - { - sync = true; - escape = false; - count = 1; - crc = 0; + rx_states_t state = WAIT_FOR_SYNC; + for(;;) { + uint8_t nextByte = nextRaw(); + if(state == WAIT_FOR_SYNC) { + if(nextByte == SYNC_BYTE) { + count = 0; + state = IN_SYNC; } } - else if (count >= maxMTU) - { - DEBUG("SerialComm::readPacket : frame too long - size = " << count << " : resynchronising") - sync = false; - escape = false; - count = crc = 0; - badPacketCount++; - } - else if (escape) - { - if (buffer[count] == SYNC_BYTE) - { - DEBUG("SerialComm::readPacket : resynchronising") - sync = false; - escape = false; - count = crc = 0; - badPacketCount++; - } - else - { - buffer[count] ^= 0x20; - if (count > 3) - { - crc = SerialComm::byteCRC(buffer[count-3], crc); + else if(state == IN_SYNC) { + if(nextByte == SYNC_BYTE) { + if(count < minMTU) { + DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising "); + badPacketCount++; + count = 0; + } + else { + bool dobreak = true; + DEBUG("SerialComm::readPacket : frame size = " << count); + if(checkCrc(buffer, count)) { + pPacket.setType(buffer[typeOffset]); + pPacket.setSeqno(buffer[seqnoOffset]); + switch (buffer[typeOffset]) { + case SF_ACK: + break; + case SF_PACKET_NO_ACK: + pPacket.setPayload((char *)(&buffer[payloadOffset]-1), count+1+1 - serialHeaderBytes); + break; + case SF_PACKET_ACK: + pPacket.setPayload((char *)(&buffer[payloadOffset]), count+1 - serialHeaderBytes); + break; + default: + dobreak = false; + DEBUG("SerialComm::readPacket : unknown packet type = " \ + << static_cast(buffer[typeOffset] & 0xff)); + break; + } + if(dobreak) break; // leave loop + } + else { + DEBUG("SerialComm::readPacket : bad crc"); + count = 0; + badPacketCount++; + } } - ++count; - escape = false; } - } - else if (buffer[count] == ESCAPE_BYTE) - { - // next byte is escaped - escape = true; - } - else if (buffer[count] == SYNC_BYTE) - { - // calculate last crc byte - if (count > 3) - { - crc = SerialComm::byteCRC(buffer[count-3], crc); + else if(nextByte == ESCAPE_BYTE) { + state = ESCAPED; } - uint16_t packetCRC = (buffer[count - 2] & 0xff) | ((buffer[count - 1] << 8) & 0xff00); - if (count < minMTU) - { - DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ") - sync = false; - escape = false; - count = crc = 0; - badPacketCount++; + else { + buffer[count++] = nextByte; + if(count >= maxMTU) { + DEBUG("SerialComm::readPacket : packet too long, resynchronizing"); + count = 0; + badPacketCount++; + state = WAIT_FOR_SYNC; + } } - else if (crc != packetCRC) - { - DEBUG("SerialComm::readPacket : bad crc - calculated crc = " << crc << " packet crc = " << packetCRC << " : resynchronising " ) - sync = false; - escape = false; - count = crc = 0; - badPacketCount++; + } + else if(state == ESCAPED) { + if(nextByte == SYNC_BYTE) { + DEBUG("SerialComm::readPacket : state ESCAPED, packet got sync byte, resynchronizing"); + count = 0; + badPacketCount++; + state = IN_SYNC; } - else - { - pPacket.setType(buffer[typeOffset]); - pPacket.setSeqno(buffer[seqnoOffset]); - switch (buffer[typeOffset]) - { - case SF_ACK: - break; - case SF_PACKET_NO_ACK: - case SF_PACKET_ACK: - // buffer / payload - // FIXME: strange packet format!? because seqno is not really defined - missing :( - pPacket.setPayload(&buffer[payloadOffset]-1, count+1+1 - serialHeaderBytes); - break; - default: - DEBUG("SerialComm::readPacket : unknown packet type = " << static_cast(buffer[typeOffset] & 0xff)) - ; - } - completePacket = true; -#ifdef DEBUG_RAW_SERIALCOMM - - DEBUG("SerialComm::readPacket : raw data >>") - for (int j=0; j <= count; j++) - { - cout << std::hex << static_cast(buffer[j] & 0xff) << " " << std::dec; + else { + buffer[count++] = nextByte ^ 0x20; + if(count >= maxMTU) { + DEBUG("SerialComm::readPacket : state ESCAPED, packet too long, resynchronizing"); + count = 0; + badPacketCount++; + state = WAIT_FOR_SYNC; } - cout << endl; - cout << "as payload >> " << endl; - const char* ptr = pPacket.getPayload(); - for (int j=0; j < pPacket.getLength(); j++) - { - cout << std::hex << static_cast(ptr[j] & 0xff) << " " << std::dec; + else { + state = IN_SYNC; } - cout << endl; -#endif - - } - } - else - { - if (count > 3) - { - crc = SerialComm::byteCRC(buffer[count-3], crc); } - ++count; } } return true; } - /* writes packet */ bool SerialComm::writePacket(SFPacket &pPacket) { - char type, byte; + char type, byte = 0; uint16_t crc = 0; char buffer[2*pPacket.getLength() + 20]; int offset = 0; - + int err = 0; + int written = 0; + // put SFD into buffer buffer[offset++] = SYNC_BYTE; @@ -562,7 +523,6 @@ bool SerialComm::writePacket(SFPacket &pPacket) byte = pPacket.getSeqno(); crc = byteCRC(byte, crc); offset += hdlcEncode(1, &byte, buffer + offset); - switch (type) { case SF_ACK: @@ -587,7 +547,19 @@ bool SerialComm::writePacket(SFPacket &pPacket) // put SFD into buffer buffer[offset++] = SYNC_BYTE; - if(writeFD(serialWriteFD, buffer, offset) < offset) { + written = writeFD(serialWriteFD, buffer, offset, &err); + if(written < 0) { + if(err != EINTR) { + close(serialReadFD); + serialReadFD = -1; + close(serialWriteFD); + serialWriteFD = -1; + errno = err; + reportError("SerialComm::writePacket failed",-1); + return false; + } + } + else if(written < offset) { DEBUG("SerialComm::writePacket failed"); return false; } @@ -619,33 +591,37 @@ void SerialComm::readSerial() SFPacket packet; readPacket(packet); switch (packet.getType()) - { - case SF_ACK: + { + case SF_ACK: // successful delivery // FIXME: seqnos are not implemented on the node ! pthread_cond_signal(&ack.received); break; - case SF_PACKET_ACK: - { - // put ack in front of queue - SFPacket ack(SF_ACK, packet.getSeqno()); - writeBuffer.enqueueFront(ack); - } - case SF_PACKET_NO_ACK: + case SF_PACKET_ACK: + { + // put ack in front of queue + SFPacket ack(SF_ACK, packet.getSeqno()); + writeBuffer.enqueueFront(ack); + } + case SF_PACKET_NO_ACK: // do nothing - fall through - default: + default: if (!readBuffer.isFull()) - { + { ++readPacketCount; // put silently into buffer... readBuffer.enqueueBack(packet); - } + } else - { - ++droppedReadPacketCount; + { + while(readBuffer.isFull()) { + readBuffer.dequeue(); + ++droppedReadPacketCount; + } + readBuffer.enqueueBack(packet); // DEBUG("SerialComm::readSerial : dropped packet") - } - } + } + } } } @@ -667,33 +643,34 @@ void SerialComm::writeSerial() while (true) { if (!retry) - { + { + cerr << " serial deqeue packet, empty: " << writeBuffer.isEmpty() << endl; packet = writeBuffer.dequeue(); - } + } switch (packet.getType()) - { - case SF_ACK: + { + case SF_ACK: // successful delivery if (!writePacket(packet)) - { + { DEBUG("SerialComm::writeSerial : writePacket failed (SF_ACK)") - reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1); - } + reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1); + } break; - case SF_PACKET_ACK: + case SF_PACKET_ACK: // do nothing - fall through - case SF_PACKET_NO_ACK: + case SF_PACKET_NO_ACK: // do nothing - fall through - default: + default: if (!retry) ++writtenPacketCount; // FIXME: this is the only currently supported type by the mote packet.setType(SF_PACKET_ACK); if (!writePacket(packet)) - { + { DEBUG("SerialComm::writeSerial : writePacket failed (SF_PACKET)") - reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1); - } + reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1); + } // wait for ack... struct timeval currentTime; struct timespec ackTime; @@ -706,24 +683,24 @@ void SerialComm::writeSerial() ackTime.tv_sec = currentTime.tv_sec; ackTime.tv_nsec = currentTime.tv_usec * 1000; - ackTime.tv_sec += timeout / (1000*1000*1000); - ackTime.tv_nsec += timeout % (1000*1000*1000); + ackTime.tv_sec += timeout / (1000*1000*1000); + ackTime.tv_nsec += timeout % (1000*1000*1000); pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &ack.lock); int retval = pthread_cond_timedwait(&ack.received, &ack.lock, &ackTime); if (!((retryCount < maxRetries) && (retval == ETIMEDOUT))) - { - if (retryCount >= maxRetries) ++droppedWritePacketCount; + { + if (retryCount >= maxRetries) ++droppedWritePacketCount; retry = false; retryCount = 0; - } + } else - { + { ++retryCount; retry = true; DEBUG("SerialComm::writeSerial : packet retryCount = " << retryCount); - ++sumRetries; - } + ++sumRetries; + } // removes the cleanup handler and executes it (unlock mutex) pthread_cleanup_pop(1); } } @@ -733,54 +710,54 @@ void SerialComm::writeSerial() void SerialComm::cancel() { pthread_t callingThread = pthread_self(); - if (pthread_equal(callingThread, readerThread)) + if(readerThreadRunning && pthread_equal(callingThread, readerThread)) { DEBUG("SerialComm::cancel : by readerThread") - pthread_detach(readerThread); + pthread_detach(readerThread); if (writerThreadRunning) - { + { pthread_cancel(writerThread); DEBUG("SerialComm::cancel : writerThread canceled, joining") - pthread_join(writerThread, NULL); + pthread_join(writerThread, NULL); writerThreadRunning = false; - } + } readerThreadRunning = false; - pthread_cond_signal(&control.cancel); + pthread_cond_signal(&control.cancel); pthread_exit(NULL); } - else if ((pthread_equal(callingThread, writerThread))) + else if(writerThreadRunning && pthread_equal(callingThread, writerThread)) { DEBUG("SerialComm::cancel : by writerThread") - pthread_detach(writerThread); + pthread_detach(writerThread); if (readerThreadRunning) - { + { pthread_cancel(readerThread); DEBUG("SerialComm::cancel : readerThread canceled, joining") - pthread_join(readerThread, NULL); + pthread_join(readerThread, NULL); readerThreadRunning = false; - } + } writerThreadRunning = false; - pthread_cond_signal(&control.cancel); + pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else { DEBUG("SerialComm::cancel : by other thread") - if (readerThreadRunning) - { - pthread_cancel(readerThread); - DEBUG("SerialComm::cancel : readerThread canceled, joining") - pthread_join(readerThread, NULL); - readerThreadRunning = false; - } + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("SerialComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } if (writerThreadRunning) - { + { pthread_cancel(writerThread); DEBUG("SerialComm::cancel : writerThread canceled, joining") - pthread_join(writerThread, NULL); + pthread_join(writerThread, NULL); writerThreadRunning = false; - } - pthread_cond_signal(&control.cancel); + } + pthread_cond_signal(&control.cancel); } } @@ -790,10 +767,10 @@ int SerialComm::reportError(const char *msg, int result) if ((result < 0) && (!errorReported)) { errorMsg << "error : SF-Server ( SerialComm on device = " << device << " ) : " - << msg << " ( result = " << result << " )" << endl - << "error-description : " << strerror(errno) << endl; + << msg << " ( result = " << result << " )" << endl + << "error-description : " << strerror(errno) << endl; - cerr << errorMsg.str(); + cerr << errorMsg.str(); errorReported = true; cancel(); }