#include <ctime>
#include <cstdlib>
+#include <cstring>
#include <iostream>
#include <fcntl.h>
#include <termios.h>
#include <sstream>
#include <sys/time.h>
#include <errno.h>
+#include <stdint.h>
using namespace std;
newtio.c_oflag = 0;
if ((tcflush(serialReadFD, TCIFLUSH) >= 0 && tcsetattr(serialReadFD, TCSANOW, &newtio) >= 0)
- && (tcflush(serialWriteFD, TCIFLUSH) >= 0 && tcsetattr(serialWriteFD, TCSANOW, &newtio) >= 0)
- && !errorReported)
+ && (tcflush(serialWriteFD, TCIFLUSH) >= 0 && tcsetattr(serialWriteFD, TCSANOW, &newtio) >= 0)
+ && !errorReported)
{
DEBUG("SerialComm::SerialComm : opened device "<< pDevice << " with baudrate = " << pBaudrate)
- }
+ }
else
{
close(serialReadFD);
{
int cnt = 0;
/*
- FD_SET(serialWriteFD, &wfds);
- if(select(serialWriteFD + 1, NULL, &wfds, NULL, NULL) < 0) {
- return -1;
- }
- FD_CLR(serialWriteFD, &wfds);
- */
+ FD_SET(serialWriteFD, &wfds);
+ if(select(serialWriteFD + 1, NULL, &wfds, NULL, NULL) < 0) {
+ return -1;
+ }
+ FD_CLR(serialWriteFD, &wfds);
+ */
int tmpCnt = BaseComm::writeFD(fd, buffer, count, err);
if (tmpCnt < 0) {
*err = errno;
/* reads packet */
bool SerialComm::readPacket(SFPacket &pPacket)
{
- bool sync = false;
- bool escape = false;
- bool completePacket = false;
+ uint8_t buffer[maxMTU + 10];
int count = 0;
- uint16_t crc = 0;
- char buffer[maxMTU];
- while(!completePacket)
- {
- buffer[count] = nextRaw();
-
- if(sync && (count == 1) && (buffer[count] == SYNC_BYTE)) {
- DEBUG("SerialComm::readPacket double sync byte");
- sync = false;
- escape = false;
- count = 1;
- crc = 0;
- buffer[0] = SYNC_BYTE;
- }
-
- if (!sync)
- {
- // wait for sync
- if (buffer[0] == SYNC_BYTE)
- {
- sync = true;
- escape = false;
- count = 1;
- crc = 0;
+ rx_states_t state = WAIT_FOR_SYNC;
+ for(;;) {
+ uint8_t nextByte = nextRaw();
+ if(state == WAIT_FOR_SYNC) {
+ if(nextByte == SYNC_BYTE) {
+ count = 0;
+ state = IN_SYNC;
}
}
- else if (count >= maxMTU)
- {
- DEBUG("SerialComm::readPacket : frame too long - size = " << count << " : resynchronising")
- sync = false;
- escape = false;
- count = crc = 0;
- badPacketCount++;
- }
- else if (escape)
- {
- if (buffer[count] == SYNC_BYTE)
- {
- DEBUG("SerialComm::readPacket : resynchronising")
- sync = false;
- escape = false;
- count = crc = 0;
- badPacketCount++;
- }
- else
- {
- buffer[count] ^= 0x20;
- if (count > 3)
- {
- crc = SerialComm::byteCRC(buffer[count-3], crc);
+ else if(state == IN_SYNC) {
+ if(nextByte == SYNC_BYTE) {
+ if(count < minMTU) {
+ DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ");
+ badPacketCount++;
+ count = 0;
+ }
+ else {
+ if(checkCrc(buffer, count)) {
+ pPacket.setType(buffer[typeOffset]);
+ pPacket.setSeqno(buffer[seqnoOffset]);
+ switch (buffer[typeOffset]) {
+ case SF_ACK:
+ break;
+ case SF_PACKET_NO_ACK:
+ case SF_PACKET_ACK:
+ pPacket.setPayload((char *)(&buffer[payloadOffset]-1), count+1+1 - serialHeaderBytes);
+ break;
+ default:
+ DEBUG("SerialComm::readPacket : unknown packet type = " \
+ << static_cast<uint16_t>(buffer[typeOffset] & 0xff));
+ break;
+ }
+ break; // leave loop
+ }
+ else {
+ DEBUG("SerialComm::readPacket : bad crc");
+ count = 0;
+ badPacketCount++;
+ }
}
- ++count;
- escape = false;
}
- }
- else if (buffer[count] == ESCAPE_BYTE)
- {
- // next byte is escaped
- escape = true;
- }
- else if (buffer[count] == SYNC_BYTE)
- {
- // calculate last crc byte
- if (count > 3)
- {
- crc = SerialComm::byteCRC(buffer[count-3], crc);
+ else if(nextByte == ESCAPE_BYTE) {
+ state = ESCAPED;
}
- uint16_t packetCRC = (buffer[count - 2] & 0xff) | ((buffer[count - 1] << 8) & 0xff00);
- if (count < minMTU)
- {
- DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ")
- sync = false;
- escape = false;
- count = crc = 0;
- badPacketCount++;
+ else {
+ buffer[count++] = nextByte;
+ if(count >= maxMTU) {
+ DEBUG("SerialComm::readPacket : packet too long, resynchronizing");
+ count = 0;
+ badPacketCount++;
+ state = WAIT_FOR_SYNC;
+ }
}
- else if (crc != packetCRC)
- {
- DEBUG("SerialComm::readPacket : bad crc - calculated crc = " << crc << " packet crc = " << packetCRC << " : resynchronising " )
- sync = false;
- escape = false;
- count = crc = 0;
- badPacketCount++;
+ }
+ else if(state == ESCAPED) {
+ if(nextByte == SYNC_BYTE) {
+ DEBUG("SerialComm::readPacket : state ESCAPED, packet got sync byte, resynchronizing");
+ count = 0;
+ badPacketCount++;
+ state = IN_SYNC;
}
- else
- {
- pPacket.setType(buffer[typeOffset]);
- pPacket.setSeqno(buffer[seqnoOffset]);
- switch (buffer[typeOffset])
- {
- case SF_ACK:
- break;
- case SF_PACKET_NO_ACK:
- case SF_PACKET_ACK:
- // buffer / payload
- // FIXME: strange packet format!? because seqno is not really defined - missing :(
- pPacket.setPayload(&buffer[payloadOffset]-1, count+1+1 - serialHeaderBytes);
- break;
- default:
- DEBUG("SerialComm::readPacket : unknown packet type = " << static_cast<uint16_t>(buffer[typeOffset] & 0xff))
- ;
+ else {
+ buffer[count++] = nextByte ^ 0x20;
+ if(count >= maxMTU) {
+ DEBUG("SerialComm::readPacket : state ESCAPED, packet too long, resynchronizing");
+ count = 0;
+ badPacketCount++;
+ state = WAIT_FOR_SYNC;
}
- completePacket = true;
-#ifdef DEBUG_RAW_SERIALCOMM
-
- DEBUG("SerialComm::readPacket : raw data >>")
- for (int j=0; j <= count; j++)
- {
- cout << std::hex << static_cast<uint16_t>(buffer[j] & 0xff) << " " << std::dec;
- }
- cout << endl;
- cout << "as payload >> " << endl;
- const char* ptr = pPacket.getPayload();
- for (int j=0; j < pPacket.getLength(); j++)
- {
- cout << std::hex << static_cast<uint16_t>(ptr[j] & 0xff) << " " << std::dec;
+ else {
+ state = IN_SYNC;
}
- cout << endl;
-#endif
-
}
}
- else
- {
- if (count > 3)
- {
- crc = SerialComm::byteCRC(buffer[count-3], crc);
- }
- ++count;
- }
}
return true;
}
-
/* writes packet */
bool SerialComm::writePacket(SFPacket &pPacket)
{
SFPacket packet;
readPacket(packet);
switch (packet.getType())
- {
- case SF_ACK:
+ {
+ case SF_ACK:
// successful delivery
// FIXME: seqnos are not implemented on the node !
pthread_cond_signal(&ack.received);
break;
- case SF_PACKET_ACK:
- {
- // put ack in front of queue
- SFPacket ack(SF_ACK, packet.getSeqno());
- writeBuffer.enqueueFront(ack);
- }
- case SF_PACKET_NO_ACK:
+ case SF_PACKET_ACK:
+ {
+ // put ack in front of queue
+ SFPacket ack(SF_ACK, packet.getSeqno());
+ writeBuffer.enqueueFront(ack);
+ }
+ case SF_PACKET_NO_ACK:
// do nothing - fall through
- default:
+ default:
if (!readBuffer.isFull())
- {
+ {
++readPacketCount;
// put silently into buffer...
readBuffer.enqueueBack(packet);
- }
+ }
else
- {
+ {
while(readBuffer.isFull()) {
readBuffer.dequeue();
++droppedReadPacketCount;
}
readBuffer.enqueueBack(packet);
// DEBUG("SerialComm::readSerial : dropped packet")
- }
- }
+ }
+ }
}
}
while (true)
{
if (!retry)
- {
+ {
+ cerr << " serial deqeue packet, empty: " << writeBuffer.isEmpty() << endl;
packet = writeBuffer.dequeue();
- }
+ }
switch (packet.getType())
- {
- case SF_ACK:
+ {
+ case SF_ACK:
// successful delivery
if (!writePacket(packet))
- {
+ {
DEBUG("SerialComm::writeSerial : writePacket failed (SF_ACK)")
- reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1);
- }
+ reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1);
+ }
break;
- case SF_PACKET_ACK:
+ case SF_PACKET_ACK:
// do nothing - fall through
- case SF_PACKET_NO_ACK:
+ case SF_PACKET_NO_ACK:
// do nothing - fall through
- default:
+ default:
if (!retry)
++writtenPacketCount;
// FIXME: this is the only currently supported type by the mote
packet.setType(SF_PACKET_ACK);
if (!writePacket(packet))
- {
+ {
DEBUG("SerialComm::writeSerial : writePacket failed (SF_PACKET)")
- reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1);
- }
+ reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1);
+ }
// wait for ack...
struct timeval currentTime;
struct timespec ackTime;
ackTime.tv_sec = currentTime.tv_sec;
ackTime.tv_nsec = currentTime.tv_usec * 1000;
- ackTime.tv_sec += timeout / (1000*1000*1000);
- ackTime.tv_nsec += timeout % (1000*1000*1000);
+ ackTime.tv_sec += timeout / (1000*1000*1000);
+ ackTime.tv_nsec += timeout % (1000*1000*1000);
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &ack.lock);
int retval = pthread_cond_timedwait(&ack.received, &ack.lock, &ackTime);
if (!((retryCount < maxRetries) && (retval == ETIMEDOUT)))
- {
- if (retryCount >= maxRetries) ++droppedWritePacketCount;
+ {
+ if (retryCount >= maxRetries) ++droppedWritePacketCount;
retry = false;
retryCount = 0;
- }
+ }
else
- {
+ {
++retryCount;
retry = true;
DEBUG("SerialComm::writeSerial : packet retryCount = " << retryCount);
- ++sumRetries;
- }
+ ++sumRetries;
+ }
// removes the cleanup handler and executes it (unlock mutex)
pthread_cleanup_pop(1); }
}
if(readerThreadRunning && pthread_equal(callingThread, readerThread))
{
DEBUG("SerialComm::cancel : by readerThread")
- pthread_detach(readerThread);
+ pthread_detach(readerThread);
if (writerThreadRunning)
- {
+ {
pthread_cancel(writerThread);
DEBUG("SerialComm::cancel : writerThread canceled, joining")
- pthread_join(writerThread, NULL);
+ pthread_join(writerThread, NULL);
writerThreadRunning = false;
- }
+ }
readerThreadRunning = false;
- pthread_cond_signal(&control.cancel);
+ pthread_cond_signal(&control.cancel);
pthread_exit(NULL);
}
else if(writerThreadRunning && pthread_equal(callingThread, writerThread))
{
DEBUG("SerialComm::cancel : by writerThread")
- pthread_detach(writerThread);
+ pthread_detach(writerThread);
if (readerThreadRunning)
- {
+ {
pthread_cancel(readerThread);
DEBUG("SerialComm::cancel : readerThread canceled, joining")
- pthread_join(readerThread, NULL);
+ pthread_join(readerThread, NULL);
readerThreadRunning = false;
- }
+ }
writerThreadRunning = false;
- pthread_cond_signal(&control.cancel);
+ pthread_cond_signal(&control.cancel);
pthread_exit(NULL);
}
else
{
DEBUG("SerialComm::cancel : by other thread")
- if (readerThreadRunning)
- {
- pthread_cancel(readerThread);
- DEBUG("SerialComm::cancel : readerThread canceled, joining")
- pthread_join(readerThread, NULL);
- readerThreadRunning = false;
- }
+ if (readerThreadRunning)
+ {
+ pthread_cancel(readerThread);
+ DEBUG("SerialComm::cancel : readerThread canceled, joining")
+ pthread_join(readerThread, NULL);
+ readerThreadRunning = false;
+ }
if (writerThreadRunning)
- {
+ {
pthread_cancel(writerThread);
DEBUG("SerialComm::cancel : writerThread canceled, joining")
- pthread_join(writerThread, NULL);
+ pthread_join(writerThread, NULL);
writerThreadRunning = false;
- }
- pthread_cond_signal(&control.cancel);
+ }
+ pthread_cond_signal(&control.cancel);
}
}
if ((result < 0) && (!errorReported))
{
errorMsg << "error : SF-Server ( SerialComm on device = " << device << " ) : "
- << msg << " ( result = " << result << " )" << endl
- << "error-description : " << strerror(errno) << endl;
+ << msg << " ( result = " << result << " )" << endl
+ << "error-description : " << strerror(errno) << endl;
- cerr << errorMsg.str();
+ cerr << errorMsg.str();
errorReported = true;
cancel();
}