From 79eb6b24acb56f21deaf15f0235a20a3d20455bd Mon Sep 17 00:00:00 2001 From: andreaskoepke Date: Fri, 6 Jul 2007 20:29:12 +0000 Subject: [PATCH] re-implementation of C serial forwarder in C++ --- support/sdk/cpp/sf/Makefile | 60 ++ support/sdk/cpp/sf/README.txt | 157 ++++++ support/sdk/cpp/sf/basecomm.cpp | 92 ++++ support/sdk/cpp/sf/basecomm.h | 50 ++ support/sdk/cpp/sf/packetbuffer.cpp | 172 ++++++ support/sdk/cpp/sf/packetbuffer.h | 96 ++++ support/sdk/cpp/sf/serialcomm.cpp | 815 ++++++++++++++++++++++++++++ support/sdk/cpp/sf/serialcomm.h | 242 +++++++++ support/sdk/cpp/sf/serialprotocol.h | 18 + support/sdk/cpp/sf/sf.cpp | 62 +++ support/sdk/cpp/sf/sfcontrol.cpp | 671 +++++++++++++++++++++++ support/sdk/cpp/sf/sfcontrol.h | 151 ++++++ support/sdk/cpp/sf/sfpacket.cpp | 141 +++++ support/sdk/cpp/sf/sfpacket.h | 117 ++++ support/sdk/cpp/sf/sharedinfo.h | 46 ++ support/sdk/cpp/sf/tcpcomm.cpp | 546 +++++++++++++++++++ support/sdk/cpp/sf/tcpcomm.h | 187 +++++++ 17 files changed, 3623 insertions(+) create mode 100644 support/sdk/cpp/sf/Makefile create mode 100644 support/sdk/cpp/sf/README.txt create mode 100644 support/sdk/cpp/sf/basecomm.cpp create mode 100644 support/sdk/cpp/sf/basecomm.h create mode 100644 support/sdk/cpp/sf/packetbuffer.cpp create mode 100644 support/sdk/cpp/sf/packetbuffer.h create mode 100644 support/sdk/cpp/sf/serialcomm.cpp create mode 100644 support/sdk/cpp/sf/serialcomm.h create mode 100644 support/sdk/cpp/sf/serialprotocol.h create mode 100644 support/sdk/cpp/sf/sf.cpp create mode 100644 support/sdk/cpp/sf/sfcontrol.cpp create mode 100644 support/sdk/cpp/sf/sfcontrol.h create mode 100644 support/sdk/cpp/sf/sfpacket.cpp create mode 100644 support/sdk/cpp/sf/sfpacket.h create mode 100644 support/sdk/cpp/sf/sharedinfo.h create mode 100644 support/sdk/cpp/sf/tcpcomm.cpp create mode 100644 support/sdk/cpp/sf/tcpcomm.h diff --git a/support/sdk/cpp/sf/Makefile b/support/sdk/cpp/sf/Makefile new file mode 100644 index 00000000..255ffb90 --- /dev/null +++ b/support/sdk/cpp/sf/Makefile @@ -0,0 +1,60 @@ +# +# Copyright (c) 2007, Technische Universitaet Berlin +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# - Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# - Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# - Neither the name of the Technische Universitaet Berlin nor the names +# of its contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# +# @author Philipp Huppertz +# @author Andreas Koepke +# + + +CC=g++ +CFLAGS= -Wall -O3 -pthread + +all: sf + +sf: sf.o sfcontrol.o serialcomm.o tcpcomm.o basecomm.o packetbuffer.o sfpacket.o + $(CC) $(CFLAGS) sf.o sfcontrol.o serialcomm.o tcpcomm.o basecomm.o packetbuffer.o sfpacket.o -o sf + +%.o: %.cpp + $(CC) -c $(CFLAGS) $< + +serialcomm.o: serialcomm.cpp serialcomm.h basecomm.h sfpacket.h packetbuffer.h sharedinfo.h + +tcpcomm.o: tcpcomm.cpp sharedinfo.h tcpcomm.h sfpacket.h packetbuffer.h basecomm.h + +sfpacket.o: sfpacket.cpp sfpacket.h serialprotocol.h + +basecomm.o: basecomm.cpp basecomm.h + +sfcontrol.o: sfcontrol.cpp sfcontrol.h sharedinfo.h packetbuffer.h tcpcomm.h serialcomm.h + +packetbuffer.o: packetbuffer.cpp packetbuffer.h sfpacket.h + +clean: + rm -rf *.o sf + diff --git a/support/sdk/cpp/sf/README.txt b/support/sdk/cpp/sf/README.txt new file mode 100644 index 00000000..5ad11d81 --- /dev/null +++ b/support/sdk/cpp/sf/README.txt @@ -0,0 +1,157 @@ +1. PREFACE: + + This is a re-implementation of the C serial forwarder, covering the + same functionality with some improvements. + + It maintains the features of the C version (low CPU usage, small + memory footprint), but with increased reliability: it does not loose + packets while it waits for an ACK from the mote. In addition it has + a control interface listening on a port, so if you run it as a + daemon you can still ask it for various statistics, start and stop + additional SFs for individual motes... + + C++ makes this implementation a bit more readable. + +2. INSTALLATION: + + Make sure that your environment has a C++ compiler, supports POSIX + threads and can make a select on files and on sockets. + + cd to src + + Open the Makefile and adjust the CC variable and the CFLAGS to match + your environment. Please pay attention to the -c (compile only) flag + in the stem rule. If you use a Linux and g++ you should be fine out + of the box. + + run make and wait + + Your compiler might issue a warning: + + "sfpacket.cpp: warning: comparison is always true due to limited range + of data type" + + you can safely ignore it. + + You should end up with an exectuable called sf in this directory, copy + it whereever you need it. + + TODO: some of the things can be caught if we use a automake/autoconf + environment. This is an overkill until we iron out the different + platforms. + +3. USAGE + Start it with: sf + or : sf control-port PORT_NUMBER daemon + + Arguments: + control-port PORT_NUMBER : TCP port on which commands are + accepted, to play with it: use telnet. Commands are executed + once a new line '\n' is entered. If you write your own client, + make sure that it sends a terminating '\n' after the command. + + daemon : this switch (if present) makes sf aware that it may + be running as a daemon. Currently this only means that it will + not read from stdin. + + No arguments: + If sf is started without arguments it listen on + standard input for commands (for a list type "help" when sf is running). + If it is started with a given control-port (e.g.: sf control-port 9009) + sf listen on the given TCP control port _and_ the standard + input. + + Once you have it running and accepting commands either via the TCP + port or via stdin, you can issue several commands (executed once a + new line '\n' is entered): + + start - starts a sf-server on a given port and device + stop - stops a running sf-server + list - lists all running sf-servers + info - prints out some information about a given sf-server + close - closes the TCP connection to the control-client + exit - immediatly exits and kills all running sf-servers + + By typing "help" followd by a command (e.g.: "help start" detailed + information about that command is printed. + + The parameters of start are modelled after the command line of the C + serial forwarder. + + The info command prints out some stats: + + The TCP SIDE (this is where your PC side application hooks up to the + SF) prints: + + clients: the number of clients (or PC side apps/MoteIFs) connected to + this port. + + packets read: correct packets received via TCP + + packets written: packets send vi TCP to your application + + The SERIAL LINE interface prints: + packets read: the number of packets read from the mote. + + dropped: the number of packets that could not be send via TCP + (usually because no client was connected) + + bad: number of packets with CRC or length errors, often 1: it + needs a packet to synchronize to the stream. + + packets written: packets written to the mote + + dropped: number of packets where no ACK was received + from the mote after 25 retries (with linear increasing + backoff). Go check your mote application ;-) + + total retries: total number of packets where ACKs from + the motes where not received in time. These packets are + usually ACKed on a retry, these are not in failures in + general. + +4. AUTHOR + + Philipp Huppertz + +5. MAINTAINERS + + Andreas Koepke + Jan Hauer + +6. KNOWN BUGS + + - Only one control client is allowed at one point in time. + - The daemon switch is less powerful than it promises. + - serialprotocol.h should be generated, as is done for the C + version. + - automake/autoconf build is missing + +7. LICENSE + + Copyright (c) 2007, Technische Universitaet Berlin + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + - Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + - Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + - Neither the name of the Technische Universitaet Berlin nor the names + of its contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/support/sdk/cpp/sf/basecomm.cpp b/support/sdk/cpp/sf/basecomm.cpp new file mode 100644 index 00000000..bad3e50f --- /dev/null +++ b/support/sdk/cpp/sf/basecomm.cpp @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + + +#include +#include + +#include "basecomm.h" + +BaseComm::BaseComm() +{ +} + + +BaseComm::~BaseComm() +{ +} + +/* all count bytes must be read before returning - blocking in that way... */ +int BaseComm::readFD(int fd, char *buffer, int count) +{ + int actual = 0; + while (count > 0) + { + int n = read(fd, buffer, count); + if (n == -1) + { + return -1; + } + if (n == 0) + { + return actual; + } + count -= n; + actual += n; + buffer += n; + } + return actual; +} + +/* all count bytes must be written before returning - blocking in that way... */ +int BaseComm::writeFD(int fd, const char *buffer, int count) +{ + int actual = 0; + while (count > 0) + { + int n = write(fd, buffer, count); + if(n == -1) + { + if(errno != 0) { + return -1; + } + else { + // looks like a temporary glitch + n = 0; + } + } + count -= n; + actual += n; + buffer += n; + } + return actual; +} diff --git a/support/sdk/cpp/sf/basecomm.h b/support/sdk/cpp/sf/basecomm.h new file mode 100644 index 00000000..3b43bda0 --- /dev/null +++ b/support/sdk/cpp/sf/basecomm.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef BASECOMM_H +#define BASECOMM_H + +class BaseComm +{ +public: + BaseComm(); + + virtual ~BaseComm(); +protected: + /* performs blocking read on fd */ + virtual int readFD(int fd, char *buffer, int count); + + /* performs blocking write on fd */ + virtual int writeFD(int fd, const char *buffer, int count); +}; + +#endif diff --git a/support/sdk/cpp/sf/packetbuffer.cpp b/support/sdk/cpp/sf/packetbuffer.cpp new file mode 100644 index 00000000..639835a4 --- /dev/null +++ b/support/sdk/cpp/sf/packetbuffer.cpp @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#include "packetbuffer.h" + +#include "pthread.h" +#include + +PacketBuffer::PacketBuffer() +{ + pthread_mutex_init(&buffer.lock, NULL); + pthread_cond_init(&buffer.notempty, NULL); + pthread_cond_init(&buffer.notfull, NULL); + buffer.size = 0; +} + + +PacketBuffer::~PacketBuffer() +{ + pthread_cond_destroy(&buffer.notempty); + pthread_cond_destroy(&buffer.notfull); + pthread_mutex_destroy(&buffer.lock); +} + +// clears the buffer +void PacketBuffer::clear() { + pthread_testcancel(); + pthread_mutex_lock(&buffer.lock); + // clear + buffer.container.clear(); + buffer.size = 0; + DEBUG("PacketBuffer::clear : cleared buffer and signal ") + pthread_cond_signal(&buffer.notfull); + pthread_mutex_unlock(&buffer.lock); +} + +// gets a packet from the buffer (NULL = buffer empty) +SFPacket PacketBuffer::dequeue() +{ + SFPacket packet; + pthread_testcancel(); + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); + pthread_mutex_lock(&buffer.lock); + // wait until buffer is _not_ empty + while(buffer.size == 0) + { + DEBUG("PacketBuffer::dequeue : waiting until buffer is ") + pthread_cond_wait(&buffer.notempty, &buffer.lock); + } + // dequeue + packet = buffer.container.front(); + buffer.container.pop_front(); + --buffer.size; + DEBUG("PacketBuffer::dequeue : get from buffer and signal ") + pthread_cond_signal(&buffer.notfull); + pthread_cleanup_pop(1); + return packet; +} + +// puts a packet into buffer... (SUCCESS = true) +bool PacketBuffer::enqueueFront(SFPacket &pPacket) +{ + pthread_testcancel(); + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); + pthread_mutex_lock(&buffer.lock); + // wait until buffer is _not_ full + while(buffer.size >= cMaxBufferSize) + { + DEBUG("PacketBuffer::enqueueFront : waiting until buffer is ") + pthread_cond_wait(&buffer.notfull, &buffer.lock); + } + // enqueue + ++buffer.size; + buffer.container.push_front(pPacket); + DEBUG("PacketBuffer::enqueueFront : put in buffer and signal ") + // signal that buffer is now not empty + pthread_cond_signal(&buffer.notempty); + pthread_cleanup_pop(1); + return true; +} + +// puts a packet into buffer... (SUCCESS = true) +bool PacketBuffer::enqueueBack(SFPacket &pPacket) +{ + pthread_testcancel(); + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &buffer.lock); + pthread_mutex_lock(&buffer.lock); + // wait until buffer is _not_ full + while(buffer.size >= cMaxBufferSize) + { + DEBUG("PacketBuffer::enqueueBack : waiting until buffer is ") + pthread_cond_wait(&buffer.notfull, &buffer.lock); + } + // enqueue + ++buffer.size; + buffer.container.push_back(pPacket); + DEBUG("PacketBuffer::enqueueBack : put in buffer and signal ") + // signal that buffer is now not empty + pthread_cond_signal(&buffer.notempty); + pthread_cleanup_pop(1); + return true; +} + +/* checks if packet buffer is full */ +bool PacketBuffer::isFull() { + bool isFull = true; + pthread_testcancel(); + pthread_mutex_lock(&buffer.lock); + if (buffer.size < cMaxBufferSize) { + isFull = false; + } + pthread_mutex_unlock(&buffer.lock); + return isFull; +} + +/* checks if packet buffer is empty */ +bool PacketBuffer::isEmpty() { + bool isEmpty = true; + pthread_testcancel(); + pthread_mutex_lock(&buffer.lock); + if (buffer.size > 0) { + isEmpty = false; + } + pthread_mutex_unlock(&buffer.lock); + return isEmpty; +} + +/* checks if pPacket is in queue */ +bool PacketBuffer::isInQueue(SFPacket &pPacket) +{ + bool result = false; + DEBUG("PacketBuffer::isInQueue : lock") + pthread_testcancel(); + pthread_mutex_lock(&buffer.lock); + container_t::const_iterator it = find(buffer.container.begin(), buffer.container.end(), pPacket); + if( it != buffer.container.end() ) + { + result = true; + } + pthread_mutex_unlock(&buffer.lock); + DEBUG("PacketBuffer::isInQueue : unlock") + return result; +} diff --git a/support/sdk/cpp/sf/packetbuffer.h b/support/sdk/cpp/sf/packetbuffer.h new file mode 100644 index 00000000..8fef2696 --- /dev/null +++ b/support/sdk/cpp/sf/packetbuffer.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef PACKETBUFFER_H +#define PACKETBUFFER_H + +#include +#include +#include "sfpacket.h" + +//#define DEBUG_PACKETBUFFER + +#undef DEBUG +#ifdef DEBUG_PACKETBUFFER +#include +#define DEBUG(message) std::cout << message << std::endl; +#else +#define DEBUG(message) +#endif + +class PacketBuffer +{ +protected: + + static const int cMaxBufferSize = 25; + + typedef std::list container_t; + + // thread safe buffer + typedef struct + { + // mutex lock for any of this vars + pthread_mutex_t lock; + // notempty cond + pthread_cond_t notempty; + // not full cond + pthread_cond_t notfull; + // actual buffer + container_t container; + // number of packets in buffer + int size; + } sharedBuffer_t; + + sharedBuffer_t buffer; + +public: + PacketBuffer(); + + ~PacketBuffer(); + + void clear(); + + SFPacket dequeue(); + + bool enqueueFront(SFPacket &pPacket); + + bool enqueueBack(SFPacket &pPacket); + + bool isFull(); + + bool isEmpty(); + + bool isInQueue(SFPacket &pPacket); + +}; + +#endif diff --git a/support/sdk/cpp/sf/serialcomm.cpp b/support/sdk/cpp/sf/serialcomm.cpp new file mode 100644 index 00000000..ea4e4bfa --- /dev/null +++ b/support/sdk/cpp/sf/serialcomm.cpp @@ -0,0 +1,815 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#include "serialcomm.h" +#include "sharedinfo.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +/* forward declarations of pthrad helper functions*/ +void* readSerialThread(void*); +void* writeSerialThread(void*); + +tcflag_t SerialComm::parseBaudrate(int requested) +{ + int baudrate; + + switch (requested) + { +#ifdef B50 + case 50: + baudrate = B50; + break; +#endif +#ifdef B75 + + case 75: + baudrate = B75; + break; +#endif +#ifdef B110 + + case 110: + baudrate = B110; + break; +#endif +#ifdef B134 + + case 134: + baudrate = B134; + break; +#endif +#ifdef B150 + + case 150: + baudrate = B150; + break; +#endif +#ifdef B200 + + case 200: + baudrate = B200; + break; +#endif +#ifdef B300 + + case 300: + baudrate = B300; + break; +#endif +#ifdef B600 + + case 600: + baudrate = B600; + break; +#endif +#ifdef B1200 + + case 1200: + baudrate = B1200; + break; +#endif +#ifdef B1800 + + case 1800: + baudrate = B1800; + break; +#endif +#ifdef B2400 + + case 2400: + baudrate = B2400; + break; +#endif +#ifdef B4800 + + case 4800: + baudrate = B4800; + break; +#endif +#ifdef B9600 + + case 9600: + baudrate = B9600; + break; +#endif +#ifdef B19200 + + case 19200: + baudrate = B19200; + break; +#endif +#ifdef B38400 + + case 38400: + baudrate = B38400; + break; +#endif +#ifdef B57600 + + case 57600: + baudrate = B57600; + break; +#endif +#ifdef B115200 + + case 115200: + baudrate = B115200; + break; +#endif +#ifdef B230400 + + case 230400: + baudrate = B230400; + break; +#endif +#ifdef B460800 + + case 460800: + baudrate = B460800; + break; +#endif +#ifdef B500000 + + case 500000: + baudrate = B500000; + break; +#endif +#ifdef B576000 + + case 576000: + baudrate = B576000; + break; +#endif +#ifdef B921600 + + case 921600: + baudrate = B921600; + break; +#endif +#ifdef B1000000 + + case 1000000: + baudrate = B1000000; + break; +#endif +#ifdef B1152000 + + case 1152000: + baudrate = B1152000; + break; +#endif +#ifdef B1500000 + + case 1500000: + baudrate = B1500000; + break; +#endif +#ifdef B2000000 + + case 2000000: + baudrate = B2000000; + break; +#endif +#ifdef B2500000 + + case 2500000: + baudrate = B2500000; + break; +#endif +#ifdef B3000000 + + case 3000000: + baudrate = B3000000; + break; +#endif +#ifdef B3500000 + + case 3500000: + baudrate = B3500000; + break; +#endif +#ifdef B4000000 + + case 4000000: + baudrate = B4000000; + break; +#endif + + default: + baudrate = 0; + } + return baudrate; +} + +SerialComm::SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), droppedReadPacketCount(0), droppedWritePacketCount(0), readPacketCount(0), writtenPacketCount(0), badPacketCount(0), sumRetries(0), device(pDevice), baudrate(pBaudrate), errorReported(false), errorMsg(""), control(pControl) +{ + writerThreadRunning = false; + readerThreadRunning = false; + rawFifo.head = rawFifo.tail = 0; + tcflag_t baudflag = parseBaudrate(pBaudrate); + + srand ( time(NULL) ); + seqno = rand(); + FD_ZERO(&rfds); + FD_ZERO(&wfds); + + serialReadFD = open(device.c_str(), O_RDONLY | O_NOCTTY | O_NONBLOCK); + serialWriteFD = open(device.c_str(), O_WRONLY | O_NOCTTY); + + if (((serialReadFD < 0) || (serialWriteFD < 0) || (!baudflag)) && !(errorReported == true)) + { + ostringstream msg; + msg << "could not open device = " << pDevice << " with baudrate = " << pBaudrate; + reportError(msg.str().c_str() ,-1); + } + + /* Serial port setting */ + struct termios newtio; + memset(&newtio, 0, sizeof(newtio)); + newtio.c_cflag = CS8 | CLOCAL | CREAD; + newtio.c_iflag = IGNPAR | IGNBRK; + cfsetispeed(&newtio, baudflag); + cfsetospeed(&newtio, baudflag); + + /* Raw output_file */ + newtio.c_oflag = 0; + + if ((tcflush(serialReadFD, TCIFLUSH) >= 0 && tcsetattr(serialReadFD, TCSANOW, &newtio) >= 0) + && (tcflush(serialWriteFD, TCIFLUSH) >= 0 && tcsetattr(serialWriteFD, TCSANOW, &newtio) >= 0) + && !errorReported) + { + DEBUG("SerialComm::SerialComm : opened device "<< pDevice << " with baudrate = " << pBaudrate) + } + else + { + close(serialReadFD); + close(serialWriteFD); + if (!errorReported) + { + ostringstream msg; + msg << "could not set ioflags for opened device = " << pDevice; + reportError(msg.str().c_str(),-1); + } + } + + pthread_mutex_init(&ack.lock, NULL); + pthread_cond_init(&ack.received, NULL); + + if (!errorReported) + { + // start thread for reading from serial line + if (reportError("SerialComm::SerialComm : pthread_create( &readerThread, NULL, readSerialThread, this)", pthread_create( &readerThread, NULL, readSerialThread, this)) == 0) + readerThreadRunning = true; + // start thread for writing to serial line + if (reportError("SerialComm::SerialComm : pthread_create( &writerThread, NULL, writeSerialThread, this)", pthread_create( &writerThread, NULL, writeSerialThread, this)) == 0) + writerThreadRunning = true; + } +} + + +SerialComm::~SerialComm() +{ + cancel(); + + pthread_mutex_destroy(&ack.lock); + pthread_cond_destroy(&ack.received); + + close(serialReadFD); + close(serialWriteFD); +} + +int SerialComm::hdlcEncode(int count, const char* from, char *to) { + int offset = 0; + for(int i = 0; i < count; i++) { + if (from[i] == SYNC_BYTE || from[i] == ESCAPE_BYTE) + { + to[offset++] = ESCAPE_BYTE; + to[offset++] = from[i] ^ 0x20; + } + else { + to[offset++] = from[i]; + } + } + return offset; +} + +int SerialComm::writeFD(int fd, const char *buffer, int count) +{ + int cnt = 0; + /* + FD_SET(serialWriteFD, &wfds); + if(select(serialWriteFD + 1, NULL, &wfds, NULL, NULL) < 0) { + return -1; + } + FD_CLR(serialWriteFD, &wfds); + */ + int tmpCnt = BaseComm::writeFD(fd, buffer, count); + if (tmpCnt < 0) { + return tmpCnt; + } + else { + cnt += tmpCnt; + } + return cnt; +} + + +/* Work around buggy usb serial driver (returns 0 when no data is + available, independent of the blocking/non-blocking mode) */ +int SerialComm::readFD(int fd, char *buffer, int count, int maxCount) +{ + int cnt = 0; + timeval tvold; + timeval tv; + unsigned to = (10000000 / baudrate) * count; // time out in usec + tvold.tv_sec = to / 1000000; + tvold.tv_usec = to % 1000000; + while (cnt == 0) + { + // no FD_ZERO here because of performance issues. It is done in constructor... + FD_SET(serialReadFD, &rfds); + if (select(serialReadFD + 1, &rfds, NULL, NULL, NULL) < 0) { + return -1; + } + FD_CLR(serialReadFD, &rfds); + tv = tvold; + select(0, NULL, NULL, NULL, &tv); + int tmpCnt = read(fd, buffer, maxCount); + if (tmpCnt < 0) { + return tmpCnt; + } + else { + cnt += tmpCnt; + } + } + return cnt; +} + +char SerialComm::nextRaw() { + char nextByte = 0; + if(rawFifo.tail < rawFifo.head) { + nextByte = rawFifo.queue[rawFifo.tail++]; + } + else { + // fifo empty -- need to get some bytes + rawFifo.tail = 0; + reportError("SerialComm::nextRaw: readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)", + rawFifo.head = readFD(serialReadFD, rawFifo.queue, rawReadBytes, maxMTU-1)); + nextByte = rawFifo.queue[rawFifo.tail++]; + } + return nextByte; +} + +/* reads packet */ +bool SerialComm::readPacket(SFPacket &pPacket) +{ + bool sync = false; + bool escape = false; + bool completePacket = false; + int count = 0; + uint16_t crc = 0; + char buffer[maxMTU]; + while(!completePacket) + { + buffer[count] = nextRaw(); + + if(sync && (count == 1) && (buffer[count] == SYNC_BYTE)) { + DEBUG("SerialComm::readPacket double sync byte"); + sync = false; + escape = false; + count = 1; + crc = 0; + buffer[0] = SYNC_BYTE; + } + + if (!sync) + { + // wait for sync + if (buffer[0] == SYNC_BYTE) + { + sync = true; + escape = false; + count = 1; + crc = 0; + } + } + else if (count >= maxMTU) + { + DEBUG("SerialComm::readPacket : frame too long - size = " << count << " : resynchronising") + sync = false; + escape = false; + count = crc = 0; + badPacketCount++; + } + else if (escape) + { + if (buffer[count] == SYNC_BYTE) + { + DEBUG("SerialComm::readPacket : resynchronising") + sync = false; + escape = false; + count = crc = 0; + badPacketCount++; + } + else + { + buffer[count] ^= 0x20; + if (count > 3) + { + crc = SerialComm::byteCRC(buffer[count-3], crc); + } + ++count; + escape = false; + } + } + else if (buffer[count] == ESCAPE_BYTE) + { + // next byte is escaped + escape = true; + } + else if (buffer[count] == SYNC_BYTE) + { + // calculate last crc byte + if (count > 3) + { + crc = SerialComm::byteCRC(buffer[count-3], crc); + } + uint16_t packetCRC = (buffer[count - 2] & 0xff) | ((buffer[count - 1] << 8) & 0xff00); + if (count < minMTU) + { + DEBUG("SerialComm::readPacket : frame too short - size = " << count << " : resynchronising ") + sync = false; + escape = false; + count = crc = 0; + badPacketCount++; + } + else if (crc != packetCRC) + { + DEBUG("SerialComm::readPacket : bad crc - calculated crc = " << crc << " packet crc = " << packetCRC << " : resynchronising " ) + sync = false; + escape = false; + count = crc = 0; + badPacketCount++; + } + else + { + pPacket.setType(buffer[typeOffset]); + pPacket.setSeqno(buffer[seqnoOffset]); + switch (buffer[typeOffset]) + { + case SF_ACK: + break; + case SF_PACKET_NO_ACK: + case SF_PACKET_ACK: + // buffer / payload + // FIXME: strange packet format!? because seqno is not really defined - missing :( + pPacket.setPayload(&buffer[payloadOffset]-1, count+1+1 - serialHeaderBytes); + break; + default: + DEBUG("SerialComm::readPacket : unknown packet type = " << static_cast(buffer[typeOffset] & 0xff)) + ; + } + completePacket = true; +#ifdef DEBUG_RAW_SERIALCOMM + + DEBUG("SerialComm::readPacket : raw data >>") + for (int j=0; j <= count; j++) + { + cout << std::hex << static_cast(buffer[j] & 0xff) << " " << std::dec; + } + cout << endl; + cout << "as payload >> " << endl; + const char* ptr = pPacket.getPayload(); + for (int j=0; j < pPacket.getLength(); j++) + { + cout << std::hex << static_cast(ptr[j] & 0xff) << " " << std::dec; + } + cout << endl; +#endif + + } + } + else + { + if (count > 3) + { + crc = SerialComm::byteCRC(buffer[count-3], crc); + } + ++count; + } + } + return true; +} + + +/* writes packet */ +bool SerialComm::writePacket(SFPacket &pPacket) +{ + char type, byte; + uint16_t crc = 0; + char buffer[2*pPacket.getLength() + 20]; + int offset = 0; + + // put SFD into buffer + buffer[offset++] = SYNC_BYTE; + + // packet type + byte = type = pPacket.getType(); + crc = byteCRC(byte, crc); + offset += hdlcEncode(1, &byte, buffer + offset); + + // seqno + byte = pPacket.getSeqno(); + crc = byteCRC(byte, crc); + offset += hdlcEncode(1, &byte, buffer + offset); + + switch (type) + { + case SF_ACK: + break; + case SF_PACKET_NO_ACK: + case SF_PACKET_ACK: + // compute crc + for(int i = 0; i < pPacket.getLength(); i++) { + crc = byteCRC(pPacket.getPayload()[i], crc); + } + offset += hdlcEncode(pPacket.getLength(), pPacket.getPayload(), buffer + offset); + break; + default: + return false; + } + + // crc two bytes + byte = crc & 0xff; + offset += hdlcEncode(1, &byte, buffer + offset); + byte = (crc >> 8) & 0xff; + offset += hdlcEncode(1, &byte, buffer + offset); + + // put SFD into buffer + buffer[offset++] = SYNC_BYTE; + if(writeFD(serialWriteFD, buffer, offset) < offset) { + DEBUG("SerialComm::writePacket failed"); + return false; + } + return true; +} + +string SerialComm::getDevice() const +{ + return device; +} + +int SerialComm::getBaudRate() const +{ + return baudrate; +} + +/* helper function to start serial reader pthread */ +void* readSerialThread(void* ob) +{ + static_cast(ob)->readSerial(); + return NULL; +} + +/* reads from connected clients */ +void SerialComm::readSerial() +{ + while (true) + { + SFPacket packet; + readPacket(packet); + switch (packet.getType()) + { + case SF_ACK: + // successful delivery + // FIXME: seqnos are not implemented on the node ! + pthread_cond_signal(&ack.received); + break; + case SF_PACKET_ACK: + { + // put ack in front of queue + SFPacket ack(SF_ACK, packet.getSeqno()); + writeBuffer.enqueueFront(ack); + } + case SF_PACKET_NO_ACK: + // do nothing - fall through + default: + if (!readBuffer.isFull()) + { + ++readPacketCount; + // put silently into buffer... + readBuffer.enqueueBack(packet); + } + else + { + ++droppedReadPacketCount; + // DEBUG("SerialComm::readSerial : dropped packet") + } + } + } +} + +/* helper function to start serial writer pthread */ +void* writeSerialThread(void* ob) +{ + static_cast(ob)->writeSerial(); + return NULL; +} + +/* writes to serial/node */ +void SerialComm::writeSerial() +{ + SFPacket packet; + bool retry = false; + int retryCount = 0; + long long timeout; + + while (true) + { + if (!retry) + { + packet = writeBuffer.dequeue(); + } + switch (packet.getType()) + { + case SF_ACK: + // successful delivery + if (!writePacket(packet)) + { + DEBUG("SerialComm::writeSerial : writePacket failed (SF_ACK)") + reportError("SerialComm::writeSerial : writePacket(SF_ACK)", -1); + } + break; + case SF_PACKET_ACK: + // do nothing - fall through + case SF_PACKET_NO_ACK: + // do nothing - fall through + default: + if (!retry) + ++writtenPacketCount; + // FIXME: this is the only currently supported type by the mote + packet.setType(SF_PACKET_ACK); + if (!writePacket(packet)) + { + DEBUG("SerialComm::writeSerial : writePacket failed (SF_PACKET)") + reportError("SerialComm::writeSerial : writeFD(SF_PACKET)", -1); + } + // wait for ack... + struct timeval currentTime; + struct timespec ackTime; + timeout = (long long)ackTimeout * (retryCount + 1); + + pthread_testcancel(); + pthread_mutex_lock(&ack.lock); + + gettimeofday(¤tTime, NULL); + ackTime.tv_sec = currentTime.tv_sec; + ackTime.tv_nsec = currentTime.tv_usec * 1000; + + ackTime.tv_sec += timeout / (1000*1000*1000); + ackTime.tv_nsec += timeout % (1000*1000*1000); + + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &ack.lock); + int retval = pthread_cond_timedwait(&ack.received, &ack.lock, &ackTime); + if (!((retryCount < maxRetries) && (retval == ETIMEDOUT))) + { + if (retryCount >= maxRetries) ++droppedWritePacketCount; + retry = false; + retryCount = 0; + } + else + { + ++retryCount; + retry = true; + DEBUG("SerialComm::writeSerial : packet retryCount = " << retryCount); + ++sumRetries; + } + // removes the cleanup handler and executes it (unlock mutex) + pthread_cleanup_pop(1); } + } +} + +/* cancels all running threads */ +void SerialComm::cancel() +{ + pthread_t callingThread = pthread_self(); + if (pthread_equal(callingThread, readerThread)) + { + DEBUG("SerialComm::cancel : by readerThread") + pthread_detach(readerThread); + if (writerThreadRunning) + { + pthread_cancel(writerThread); + DEBUG("SerialComm::cancel : writerThread canceled, joining") + pthread_join(writerThread, NULL); + writerThreadRunning = false; + } + readerThreadRunning = false; + pthread_cond_signal(&control.cancel); + pthread_exit(NULL); + } + else if ((pthread_equal(callingThread, writerThread))) + { + DEBUG("SerialComm::cancel : by writerThread") + pthread_detach(writerThread); + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("SerialComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } + writerThreadRunning = false; + pthread_cond_signal(&control.cancel); + pthread_exit(NULL); + } + else + { + DEBUG("SerialComm::cancel : by other thread") + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("SerialComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } + if (writerThreadRunning) + { + pthread_cancel(writerThread); + DEBUG("SerialComm::cancel : writerThread canceled, joining") + pthread_join(writerThread, NULL); + writerThreadRunning = false; + } + pthread_cond_signal(&control.cancel); + } +} + +/* reports error */ +int SerialComm::reportError(const char *msg, int result) +{ + if ((result < 0) && (!errorReported)) + { + errorMsg << "error : SF-Server ( SerialComm on device = " << device << " ) : " + << msg << " ( result = " << result << " )" << endl + << "error-description : " << strerror(errno) << endl; + + cerr << errorMsg.str(); + errorReported = true; + cancel(); + } + return result; +} + +/* prints out status */ +void SerialComm::reportStatus(ostream& os) +{ + os << "SF-Server ( SerialComm on device " << device << " ) : " + << "baudrate = " << baudrate + << " , packets read = " << readPacketCount + << " ( dropped = " << droppedReadPacketCount + << ", bad = " << badPacketCount << " )" + << " , packets written = " << writtenPacketCount + << " ( dropped = " << droppedWritePacketCount + << ", total retries: " << sumRetries << " )" + << endl; +} diff --git a/support/sdk/cpp/sf/serialcomm.h b/support/sdk/cpp/sf/serialcomm.h new file mode 100644 index 00000000..e12409c4 --- /dev/null +++ b/support/sdk/cpp/sf/serialcomm.h @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef SERIALCOMM_H +#define SERIALCOMM_H + +#include "basecomm.h" +#include "sfpacket.h" +#include "packetbuffer.h" +#include "sharedinfo.h" + +#include +#include +#include +#include +#include + +// #define DEBUG_SERIALCOMM +// #define DEBUG_RAW_SERIALCOMM + +#undef DEBUG +#ifdef DEBUG_SERIALCOMM +#include +#define DEBUG(message) std::cout << message << std::endl; +#else +#define DEBUG(message) +#endif + + +class SerialComm : public BaseComm +{ + + /** Constants **/ +protected: + // max serial MTU + static const int maxMTU = (SFPacket::cMaxPacketLength+1)*2; + // min serial MTU + static const int minMTU = 4; + // byte count of serial header + static const int serialHeaderBytes = 6; + // byte offset of type field + static const int typeOffset = 1; + // byte offset of sequence number field + static const int seqnoOffset = 2; + // byte offset of payload field + static const int payloadOffset = 3; + // timeout for acks in s + static const int ackTimeout = 1000 * 1000 * 200; + // max. reties for packets from pc to node + static const int maxRetries = 25; + + // how many bytes do we attempt to read from the serial line in one go? + static const int rawReadBytes = 20; + + /** Member vars */ +protected: + /* pthread for serial reading */ + pthread_t readerThread; + + bool readerThreadRunning; + + /* pthread for serial writing */ + pthread_t writerThread; + + bool writerThreadRunning; + + // thread safe ack + typedef struct + { + // mutex lock for any of this vars + pthread_mutex_t lock; + // notempty cond + pthread_cond_t received; + } ackCondition_t; + + ackCondition_t ack; + + /* raw read buffer */ + struct rawFifo_t { + char queue[maxMTU]; + int head; + int tail; + }; + + rawFifo_t rawFifo; + + /* reference to read packet buffer */ + PacketBuffer &readBuffer; + + /* reference to write packet buffer */ + PacketBuffer &writeBuffer; + + /* number of dropped (read) packets */ + int droppedReadPacketCount; + + /* number of dropped (write) packets */ + int droppedWritePacketCount; + + /* number of read packets */ + int readPacketCount; + + /* number of written packets */ + int writtenPacketCount; + + /* number of bad packets read from serial line, counts resynchronizations! */ + int badPacketCount; + + /* sum retry attempts for all packets */ + int sumRetries; + + /* device port of this sf */ + std::string device; + + /* baudrate of connected device */ + int baudrate; + + /* read fd set */ + fd_set rfds; + + /* write fd set */ + fd_set wfds; + + /* fd for reading from serial device */ + int serialReadFD; + + /* fd for writing to serial device */ + int serialWriteFD; + + /* seqno for serial data packets */ + int seqno; + + /* indicates that an error occured */ + bool errorReported; + + /* error message of reportError call */ + std::ostringstream errorMsg; + + /* for noticing the parent thread of cancelation */ + sharedControlInfo_t &control; + +/** Member functions */ + + /* needed to start pthreads */ + friend void* readSerialThread(void* ob); + friend void* writeSerialThread(void* ob); + +private: + /* do not allow standard constructor */ + SerialComm(); + +protected: + char nextRaw(); + + /* claculates crc byte-wise */ + inline static uint16_t byteCRC(uint8_t byte, uint16_t crc) { + crc = (uint8_t)(crc >> 8) | (crc << 8); + crc ^= byte; + crc ^= (uint8_t)(crc & 0xff) >> 4; + crc ^= crc << 12; + crc ^= (crc & 0xff) << 5; + return crc; + } + + /* HDLC encode (byte stuff) count bytes from buffer from into buffer to. + * to must be at least count * 2 bytes large. Returns the number of bytes + * written into to. + */ + int hdlcEncode(int count, const char* from, char *to); + + /** + * try to read at least count bytes in one go, but may read up to maxCount bytes. + */ + virtual int readFD(int fd, char *buffer, int count, int maxCount); + + /* enables byte escaping. overwrites method from base class.*/ + virtual int writeFD(int fd, const char *buffer, int count); + + /* reads a packet (blocking) */ + bool readPacket(SFPacket &pPacket); + + /* writes a packet to serial source */ + bool writePacket(SFPacket &pPacket); + + /* returns tcflag of requested baudrate */ + static tcflag_t parseBaudrate(int requested); + + int reportError(const char *msg, int result); + + /* checks for messages from node - producer thread */ + void readSerial(); + + /* write messages to serial / node - consumer thread */ + void writeSerial(); + +public: + SerialComm(const char* pDevice, int pBaudrate, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl); + + ~SerialComm(); + + /* cancels all running threads */ + void cancel(); + + std::string getDevice() const; + + int getBaudRate() const; + + void reportStatus(std::ostream& os); + + /* returns if error occurred */ + bool isErrorReported() { return errorReported; } +}; + +#endif diff --git a/support/sdk/cpp/sf/serialprotocol.h b/support/sdk/cpp/sf/serialprotocol.h new file mode 100644 index 00000000..46244574 --- /dev/null +++ b/support/sdk/cpp/sf/serialprotocol.h @@ -0,0 +1,18 @@ +/** + * This file is automatically generated by ncg. DO NOT EDIT THIS FILE. + * It includes values of some nesC constants from + * /home/phihup/cvs/tinyos-2.x/tos/lib/serial/Serial.h. + */ + +enum { + SERIAL_HDLC_CTLESC_BYTE = 125, + SERIAL_TOS_SERIAL_802_15_4_ID = 2, + SERIAL_SERIAL_PROTO_ACK = 67, + SERIAL_TOS_SERIAL_CC1000_ID = 1, + SERIAL_SERIAL_PROTO_PACKET_NOACK = 69, + SERIAL_SERIAL_PROTO_PACKET_UNKNOWN = 255, + SERIAL_HDLC_FLAG_BYTE = 126, + SERIAL_TOS_SERIAL_ACTIVE_MESSAGE_ID = 0, + SERIAL_TOS_SERIAL_UNKNOWN_ID = 255, + SERIAL_SERIAL_PROTO_PACKET_ACK = 68 +}; diff --git a/support/sdk/cpp/sf/sf.cpp b/support/sdk/cpp/sf/sf.cpp new file mode 100644 index 00000000..fdf5ced4 --- /dev/null +++ b/support/sdk/cpp/sf/sf.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + + + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include + +#include "sfcontrol.h" +#include "tcpcomm.h" +#include "serialcomm.h" +#include "packetbuffer.h" + + +using namespace std; + + + +int main(int argc, char *argv[]) +{ + + SFControl control; + control.parseArgs(argc, argv); + control.waitOnInput(); + + return EXIT_SUCCESS; +} diff --git a/support/sdk/cpp/sf/sfcontrol.cpp b/support/sdk/cpp/sf/sfcontrol.cpp new file mode 100644 index 00000000..1920949d --- /dev/null +++ b/support/sdk/cpp/sf/sfcontrol.cpp @@ -0,0 +1,671 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#include "sfcontrol.h" +#include "sharedinfo.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +using namespace std; + +/* forward declarations of pthrad helper functions*/ +void* checkCancelThread(void*); + +SFControl::SFControl() +{ + servers.clear(); + pthread_mutex_init(&sfControlInfo.lock, NULL); + pthread_cond_init(&sfControlInfo.cancel, NULL); + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + uniqueId = 0; + servers.clear(); + serverFD = -1; + clientFD = -1; + controlPort = -1; + controlServerStarted = false; + daemon = false; + reportError("SFControl::SFControl : pthread_create( &cancelThread, NULL, checkCancelThread, this)", pthread_create( &cancelThread, NULL, checkCancelThread, this)); +} + + +SFControl::~SFControl() +{ + close(serverFD); + pthread_mutex_destroy(&sfControlInfo.lock); + pthread_cond_destroy(&sfControlInfo.cancel); +} + +string SFControl::getHelpMessage(string msg) +{ + stringstream helpMessage; + if (msg == "help arguments") + { + // genral help message for command line arguments + helpMessage << "sf - Controls (starting/stopping) several SFs on one machine" << endl << endl + << "Usage : sf" << endl + << "or : sf control-port PORT_NUMBER daemon" << endl << endl + << "Arguments:" << endl + << " control-port PORT_NUMBER : TCP port on which commands are accepted" << endl + << " daemon : this switch (if present) makes sf aware that it may be running as a daemon " << endl << endl + << "Info:" << endl + << " If sf is started without arguments it listen on " << endl + << " standard input for commands (for a list type \"help\" when sf is running)." << endl + << " If it is started with a given control-port (e.g.: sf control-port 9009)" << endl + << " sf listen on the given TCP control port _and_ the standard" << endl + << " input." << endl; + + } + else if (msg == "start") + { + helpMessage << ">> start PORT DEVICE_NAME BAUDRATE:" << endl + << ">> Starts a sf-server on a given TCP port connecting to a given device with the given baudrate." << endl + << ">> The TCP port device name must be specified and must not" << endl + << ">> overlap with any other TCP port or device name pair of an already running sf-server." << endl + << ">> (e.g: \"start 9002 /dev/ttyUSB2 115200\" starts server on port 9002 and device /dev/ttyUSB2 with baudrate 115200)" << endl; + } + else if (msg == "stop") + { + helpMessage << ">> stop ID | PORT | DEVICE_NAME:" << endl + << ">> Stops the specified sf-server." << endl + << ">> The unique id or the device or the TCP port of the" << endl + << ">> sf-server must be specified." << endl + << ">> (e.g: \"stop 1\" stops server with id 1 " << endl + << ">> \"stop /dev/ttyUSB0\" stops server connected to /dev/ttyUSB0" << endl + << ">> \"stop 9002\" prints stops server listening on TCPport 90002)" << endl; + } + else if (msg == "info") + { + helpMessage << ">> info ID | PORT | DEVICE_NAME:" << endl + << ">> Prints some information about a given sf-server." << endl + << ">> The unique id or the device or the TCP port of the" << endl + << ">> sf-server must be specified." << endl + << ">> (e.g: \"info 1\" prints out information about server with id 1 " << endl + << ">> \"info /dev/ttyUSB0\" prints out information about server connected to /dev/ttyUSB0" << endl + << ">> \"info 9002\" prints out information about server listening on TCPport 90002)" << endl; + } + else if (msg == "list") + { + helpMessage << ">> list:" << endl + << ">> Displays a list of currently running sf-servers." << endl + << ">> A List Entry contains the unique id, the TCP port and the device" << endl + << ">> of a sf-server." << endl; + } + else if (msg == "close") + { + helpMessage << ">> close:" << endl + << ">> Closes the TCP connection to the control client." << endl + << ">> It can be issued only if the control-server is started."<< endl; + } + else if (msg == "exit") + { + helpMessage << ">> exit:" << endl + << ">> Immediatly exits and kills all running sf-servers." << endl + << ">> This ends everything gracefully..." << endl; + } + else + { + // genral help message for interactive commands + helpMessage << ">> Supported commands are:" << endl + << ">> " << endl + << ">> start - starts a sf-server on a given port and device" << endl + << ">> stop - stops a running sf-server" << endl + << ">> list - lists all running sf-servers" << endl + << ">> info - prints out some information about a given sf-server" << endl; + if (controlServerStarted) { + helpMessage << ">> close - closes the TCP connection to the control-client" << endl; + } + helpMessage << ">> exit - immediatly exits and kills all running sf-servers" << endl + << ">>" << endl + << ">> By typing \"help\" followd by a command (e.g.: \"help start\")" << endl + << ">> detailed information about that command is printed." << endl; + + + } + return helpMessage.str(); +} + + +void SFControl::parseArgs(int argc, char *argv[]) +{ + if (argc == 1) + { + os << ">> Starting sf-control." << endl; + os << ">> Accepting commands on standard input..." << endl; + deliverOutput(); + // test standard port before + } + else if (argc >= 3) + { + int port = -1; + string argPort(argv[2]); + stringstream helpStream(argPort); + helpStream >> port; + if ((strncmp(argv[1], "control-port", 13) >= 0) && (port > 0)) + { + controlPort = port; + startControlServer(); + os << ">> Accepting commands on TCP port " << controlPort ; + if(argc == 3) { + os << " and on standard input..." << endl; + daemon = false; + } + else { + os << " but not on standard input..." << endl; + daemon = true; + } + deliverOutput(); + } + else + { + os << getHelpMessage("help arguments"); + deliverOutput(); + exit(1); + } + } + else + { + os << getHelpMessage("help arguments"); + deliverOutput(); + exit(1); + } +} + +/* starts a sf-server */ +void SFControl::startServer(int port, string device, int baudrate) +{ + pthread_testcancel(); + pthread_mutex_lock(&sfControlInfo.lock); + sfServer_t newSFServer; + newSFServer.serial2tcp = new PacketBuffer(); + newSFServer.tcp2serial = new PacketBuffer(); + newSFServer.TcpServer = new TCPComm(port, *(newSFServer.tcp2serial), *(newSFServer.serial2tcp), sfControlInfo); + newSFServer.SerialDevice = new SerialComm(device.c_str(), baudrate, *(newSFServer.serial2tcp), *(newSFServer.tcp2serial), sfControlInfo); + newSFServer.id = ++uniqueId; + servers.push_back(newSFServer); + pthread_mutex_unlock(&sfControlInfo.lock); +} + +/* stops a given sf-server. returns false if specified server not running */ +bool SFControl::stopServer(int& id, int& port, string& device) +{ + pthread_testcancel(); + pthread_mutex_lock(&sfControlInfo.lock); + bool found = false; + list::iterator it = servers.begin(); + list::iterator next = it; + while( (it != servers.end()) && (!found)) + { + ++next; + if (((*it).SerialDevice->getDevice() == device) || ((*it).TcpServer->getPort() == port) || ((*it).id == id) ) + { + // cancel + (*it).TcpServer->cancel(); + (*it).SerialDevice->cancel(); + // set id, port and device accordingly + id = (*it).id; + port = (*it).TcpServer->getPort(); + device = (*it).SerialDevice->getDevice(); + // clean up + delete (*it).TcpServer; + delete (*it).SerialDevice; + delete (*it).tcp2serial; + delete (*it).serial2tcp; + servers.erase(it); + found = true; + } + it = next; + } + pthread_mutex_unlock(&sfControlInfo.lock); + return found; +} + +/* prints out server info for specified server */ +bool SFControl::showServerInfo(ostream& pOs, int id, int port, string device) +{ + pthread_testcancel(); + pthread_mutex_lock(&sfControlInfo.lock); + bool found = false; + list::iterator it = servers.begin(); + list::iterator next = it; + while( it != servers.end() && (!found)) + { + ++next; + if (((*it).SerialDevice->getDevice() == device) || ((*it).TcpServer->getPort() == port) || ((*it).id == id) ) + { + pOs << ">> info for sf-server with id = " << (*it).id + << " ( port = " << (*it).TcpServer->getPort() + << " , device = " << (*it).SerialDevice->getDevice() + << " , baudrate = " << (*it).SerialDevice->getBaudRate() + << " )" << endl; + pOs << ">> "; + (*it).TcpServer->reportStatus(os); + pOs << ">> "; + (*it).SerialDevice->reportStatus(os); + found = true; + } + it = next; + } + pthread_mutex_unlock(&sfControlInfo.lock); + return found; +} + +/* lists all running servers */ +void SFControl::listServers(ostream& pOs) +{ + pthread_testcancel(); + pthread_mutex_lock(&sfControlInfo.lock); + list::iterator it = servers.begin(); + for ( it = servers.begin(); it != servers.end(); it++ ) + { + pOs << ">> sf-server id = " << (*it).id + << " , port = " << (*it).TcpServer->getPort() + << " , device = " << (*it).SerialDevice->getDevice() + << " , baudrate = " << (*it).SerialDevice->getBaudRate() << endl; + } + if (servers.size() == 0) + { + pOs << ">> none" << endl; + } + pthread_mutex_unlock(&sfControlInfo.lock); +} + +void SFControl::parseInput(std::string arg) +{ + /* silly, but works ... */ + string strBuf; + stringstream parseStream(arg); + vector tokens; + while (parseStream >> strBuf) + tokens.push_back(strBuf); + + if (tokens[0] == "start") + { + if (tokens.size() == 4) + { + if (servers.size() < maxSFServers) + { + os << ">> Trying to start sf-server with id = " << (uniqueId+1) + << " ( port = " << tokens[1] + << " , device = " << tokens[2] + << " , baudrate = " << tokens[3] + << " )" << endl; + deliverOutput(); + stringstream helpInt; + int baudrate = 0; + int port = 0; + helpInt << tokens[3] << " " << tokens[1]; + helpInt >> baudrate >> port; + startServer(port, tokens[2], baudrate); + } + else + { + os << ">> FAIL: Too many running servers (currently " << servers.size() << " servers running)" << endl; + deliverOutput(); + } + } + else + { + os << getHelpMessage("start"); + deliverOutput(); + } + } + else if (tokens[0] == "stop") + { + if (tokens.size() == 2) + { + stringstream helpInt; + int port = 0; + int id = -1; + helpInt << tokens[1] << " " << tokens[1]; + helpInt >> id >> port; + if (!stopServer(id, port, tokens[1])) + { + os << ">> no sf-server with id / device / baudrate = " << tokens[1] << " found!" << endl; + deliverOutput(); + } + else + { + os << ">> stopped sf-server with id = " << id + << " ( port = " << port + << " , device = " << tokens[1] + << " )" << endl; + deliverOutput(); + } + } + else + { + os << getHelpMessage("stop"); + deliverOutput(); + } + } + else if (tokens[0] == "info") + { + if (tokens.size() == 2) + { + + stringstream helpInt; + int port = 0; + int id = -1; + helpInt << tokens[1] << " " << tokens[1]; + helpInt >> id >> port; + if (!showServerInfo(os, id, port, tokens[1])) + { + os << ">> no sf-server with id / device / baudrate = " << tokens[1] << " found!" << endl; + deliverOutput(); + } else { + deliverOutput(); + } + } + else + { + os << getHelpMessage("info"); + deliverOutput(); + } + } + else if ((tokens[0] == "close") && (controlServerStarted)) + { + if (clientFD > 0) { + os << ">> closing connection to control-client " << endl; + deliverOutput(); + close(clientFD); + clientFD = -1; + } + } + else if (tokens[0] == "list") + { + os << ">> currently running sf-servers:" << endl; + listServers(os); + deliverOutput(); + } + else if (tokens[0] == "exit") + { + os << ">> exiting..." << endl; + deliverOutput(); + exit(0); + } + else + { + if ((tokens[0] == "help") && (tokens.size() == 2)) + { + os << getHelpMessage(tokens[1]); + deliverOutput(); + } + else + { + os << getHelpMessage(tokens[0]); + deliverOutput(); + } + + } +} + +/* send string to connected client.. */ +bool SFControl::sendToClient(string message) +{ + if (clientFD < 0) + return false; + int length = message.size(); + const char* buffer = message.c_str(); + while (length > 0) + { + int n = send(clientFD, buffer, length, MSG_NOSIGNAL); + if (!(n > 0)) + { + return false; + } + length -= n; + buffer += n; + } + return true; +} + +/* receive string from connected client... */ +bool SFControl::readFromClient(string& message) +{ + if (clientFD < 0) + return false; + int length = 0; + char buffer[256]; + char* bufPtr = buffer; + *bufPtr = '\0'; + do + { + int n = read(clientFD, (void *) bufPtr, 1); + if (!(n > 0)) + { + return false; + } + } + while ((*bufPtr++ != '\n') && (length++ < 255)); + buffer[length] = '\0'; + message = (length == 1) ? "" : buffer; + return true; +} + +void SFControl::waitOnInput() +{ + bool clientConnected = false; + + struct sockaddr_in client; + unsigned int clientAddrLen = sizeof(client); + FD_ZERO(&rfds); + + while (true) + { + int maxfd = 0; + if(daemon) { + FD_CLR(0, &rfds); + } + else { + FD_SET(0, &rfds); + } + if (controlServerStarted) + { + FD_SET(serverFD, &rfds); + maxfd = (serverFD > maxfd) ? serverFD : maxfd; + } + if (clientConnected) + { + FD_SET(clientFD, &rfds); + maxfd = (clientFD > maxfd) ? clientFD : maxfd; + } + + reportError("SFControl::waitOnInput : select(maxfd+1, &rfds, NULL, NULL, NULL)", select(maxfd+1, &rfds, NULL, NULL, NULL)); + + if (FD_ISSET(0, &rfds)) + { + /* parse standard input */ + FD_CLR(0, &rfds); + string input = ""; + getline (cin, input); + if (input != "") + { + os << "standard input : " << input << endl; + if (!(clientFD < 0)) + sendToClient(os.str()); + os.str(""); + os.clear(); + parseInput(input); + } + } + if (clientFD == -1) clientConnected = false; + if (controlServerStarted) + { + if (FD_ISSET(serverFD, &rfds)) + { + /* we got a new connection request */ + FD_CLR(serverFD, &rfds); + int newClientFD = reportError("SFControl::waitOnInput : accept(serverFD, (struct sockaddr*) &client, &clientAddrLen)", accept(serverFD, (struct sockaddr*) &client, &clientAddrLen)); + if ((newClientFD >= 0) && (!clientConnected)) + { + clientFD = newClientFD; + clientConnected = true; + os << ">> accepted connection from control-client " << inet_ntoa(client.sin_addr) << endl; + deliverOutput(); + } + else + { + close(newClientFD); + } + } + } + if (clientConnected) + { + if (FD_ISSET(clientFD, &rfds)) + { + /* we got data from the connected control client */ + FD_CLR(clientFD, &rfds); + string input = ""; + if (readFromClient(input)) + { + if (input != "") + { + os << "control-client : " << input << endl; + cout << os.str(); + os.str(""); + os.clear(); + parseInput(input); + } + } + else + { + os << ">> closing connection to control-client " << inet_ntoa(client.sin_addr) << endl; + deliverOutput(); + close(clientFD); + clientFD = -1; + } + } + } + if (clientFD == -1) clientConnected = false; + } +} + +void* checkCancelThread(void* ob) +{ + static_cast(ob)->checkThreadCancel(); + return NULL; +} + +/* keeps track of self-canceled sf-servers */ +void SFControl::checkThreadCancel() +{ + while(true) + { + pthread_testcancel(); + pthread_mutex_lock(&sfControlInfo.lock); + pthread_cond_wait(&sfControlInfo.cancel, &sfControlInfo.lock); + list::iterator it = servers.begin(); + list::iterator next = it; + + while( it != servers.end() ) + { + ++next; + if ((*it).TcpServer->isErrorReported() || (*it).SerialDevice->isErrorReported()) + { + // cancel + (*it).TcpServer->cancel(); + (*it).SerialDevice->cancel(); + // inform user + os << ">> FAIL: sf-server with id = " << (*it).id + << " ( port = " << (*it).TcpServer->getPort() + << " , device = " << (*it).SerialDevice->getDevice() + << " ) canceled" << endl; + deliverOutput(); + // clean up + delete (*it).TcpServer; + delete (*it).SerialDevice; + delete (*it).tcp2serial; + delete (*it).serial2tcp; + servers.erase(it); + } + it = next; + } + pthread_mutex_unlock(&sfControlInfo.lock); + } +} + + +void SFControl::startControlServer() +{ + struct sockaddr_in me; + int opt = 1; + + serverFD = reportError("SFControl::startControlServer : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0)); + reportError("SFControl::startControlServer : fcntl(serverFD, F_SETFL, O_NONBLOCK)", fcntl(serverFD, F_SETFL, O_NONBLOCK)); + + memset(&me, 0, sizeof me); + me.sin_family = AF_INET; + me.sin_port = htons(controlPort); + + reportError("SFControl::startControlServer : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); + reportError("SFControl::startControlServer : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me)); + reportError("SFControl::startControlServer : listen(serverFD, 1)", listen(serverFD, 1)); + controlServerStarted = true; +} + +void SFControl::deliverOutput() +{ + if (!(clientFD < 0)) + sendToClient(os.str()); + cout << os.str(); + os.str(""); + os.clear(); +} + +/* reports error */ +int SFControl::reportError(const char *msg, int result) +{ + if (result < 0) + { + cerr << "FATAL : SF-Control-Server : " + << msg << " ( result = " << result << " )" << endl + << "error-description : " << strerror(errno) << endl; + exit(1); + } + return result; +} + diff --git a/support/sdk/cpp/sf/sfcontrol.h b/support/sdk/cpp/sf/sfcontrol.h new file mode 100644 index 00000000..84a9626e --- /dev/null +++ b/support/sdk/cpp/sf/sfcontrol.h @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef SFCONTROL_H +#define SFCONTROL_H + +#include "stdio.h" +#include "packetbuffer.h" +#include "tcpcomm.h" +#include "serialcomm.h" +#include "pthread.h" +#include +#include + +class SFControl +{ +protected: + + typedef struct + { + PacketBuffer* serial2tcp; + PacketBuffer* tcp2serial; + TCPComm* TcpServer; + SerialComm* SerialDevice; + int id; + } + sfServer_t; + + /* needed to get informed about canceled threads */ + sharedControlInfo_t sfControlInfo; + + /* list of running / started sf-servers */ + std::list servers; + + /* max. allowed sf-servers */ + static const unsigned int maxSFServers = 4; + + /* pthread for thread cancel notification */ + pthread_t cancelThread; + + /* read fd set */ + fd_set rfds; + + /* write fd set */ + fd_set wfds; + + /* indicated that the control server is started */ + bool controlServerStarted; + + /* in daemon mode: do not read from stdin */ + bool daemon; + + /* tcp port the control server listens on */ + int controlPort; + + /* control server FD */ + int serverFD; + + /* control-client fd */ + int clientFD; + + /* string stream for multiplexing output (cout and control-client) */ + std::ostringstream os; + + friend void* checkCancelThread(void* ob); + + /* needed for id generation */ + int uniqueId; + +public: + + SFControl(); + + ~SFControl(); + + /* gets corresponding help message to command */ + std::string getHelpMessage(std::string msg = ""); + + /* parses command line arguments */ + void parseArgs(int argc, char *argv[]); + + /* parses input */ + void parseInput(std::string arg); + + /* main loop, waits for input */ + void waitOnInput(); + +protected: + /* checks if child threads canceled themselves */ + void checkThreadCancel(); + + /* starts the controling server */ + void startControlServer(); + + /* send string to connected client.. */ + bool sendToClient(std::string message); + + /* receive string from connected client... */ + bool readFromClient(std::string& message); + + /* starts a sf-server */ + void startServer(int port, std::string device, int baudrate); + + /* stops a given sf-server. returns false if specified server not running */ + bool stopServer(int& id, int& port, std::string& device); + + /* prints out server info for specified server */ + bool showServerInfo(std::ostream& pOs, int id, int port, std::string device); + + /* lists all running servers */ + void listServers(std::ostream& pOs); + + /* send output to console and/or to connected control client */ + void deliverOutput(); + + /* reports error to stderr */ + int reportError(const char *msg, int result); +}; + + + +#endif diff --git a/support/sdk/cpp/sf/sfpacket.cpp b/support/sdk/cpp/sf/sfpacket.cpp new file mode 100644 index 00000000..8fc555fa --- /dev/null +++ b/support/sdk/cpp/sf/sfpacket.cpp @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#include "sfpacket.h" + +SFPacket::SFPacket(int pType, int pSeqno) +{ + length = 0; + seqno = pSeqno; + type = pType; +} + +// copy constructor +SFPacket::SFPacket(const SFPacket &pPacket) +{ + length = pPacket.getLength(); + type = pPacket.getType(); + setPayload(pPacket.getPayload(), length); +} + +SFPacket::~SFPacket() +{ + // if (buffer) delete[] buffer; +} + +const char* SFPacket::getPayload() const +{ + if ( ((type == SF_PACKET_ACK) || (type == SF_PACKET_NO_ACK))) + { + return payloadBuffer; + } + else + { + return NULL; + } +} + +int SFPacket::getLength() const +{ + return length; +} + +int SFPacket::getType() const +{ + return type; +} + +int SFPacket::getSeqno() const +{ + return seqno; +} + +bool SFPacket::setPayload(const char* pBuffer, uint8_t pLength) +{ + if ((pLength > 0) && (pLength < cMaxPacketLength) && ((type == SF_PACKET_ACK) || (type == SF_PACKET_NO_ACK))) + { + length = pLength; + for (int i=0; i < pLength; i++) + { + payloadBuffer[i] = *(pBuffer+i); + } + return true; + } + DEBUG("SFPACKET::setPayload : wrong packet length = " << static_cast(pLength) << " or type = " << type) + return false; +} + +void SFPacket::setSeqno(int pSeqno) +{ + seqno = pSeqno; +} + +void SFPacket::setType(int pType) +{ + type = pType; +} + +int const SFPacket::getMaxPayloadLength() +{ + return cMaxPacketLength; +} + +/* == operator */ +bool SFPacket::operator==(SFPacket const& pPacket) +{ + bool retval=false; + if (!((pPacket.getType() != type) || (pPacket.getLength() != length) || pPacket.getSeqno() != seqno)) + { + if ((type == SF_PACKET_ACK) || (type == SF_PACKET_NO_ACK)) + { + const char* cmpBuffer = pPacket.getPayload(); + if (cmpBuffer) { + retval = true; + // compare buffers + for (int i=0; i < length; i++) + { + if (payloadBuffer[i] != cmpBuffer[i]) + { + i = length; + retval = false; + } + } + } + } + else + { + retval = true; + } + } + return retval; +} + diff --git a/support/sdk/cpp/sf/sfpacket.h b/support/sdk/cpp/sf/sfpacket.h new file mode 100644 index 00000000..d3f5fdef --- /dev/null +++ b/support/sdk/cpp/sf/sfpacket.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef SFPACKET_H +#define SFPACKET_H + +// #define DEBUG_SFPACKET + +#undef DEBUG +#ifdef DEBUG_SFPACKET +#include +#define DEBUG(message) std::cout << message << std::endl; +#else +#define DEBUG(message) +#endif + +#include +#include + +#include "serialprotocol.h" +enum { + SYNC_BYTE = SERIAL_HDLC_FLAG_BYTE, + ESCAPE_BYTE = SERIAL_HDLC_CTLESC_BYTE, + + SF_ACK = SERIAL_SERIAL_PROTO_ACK, + SF_PACKET_ACK = SERIAL_SERIAL_PROTO_PACKET_ACK, + SF_PACKET_NO_ACK = SERIAL_SERIAL_PROTO_PACKET_NOACK, + SF_UNKNOWN = SERIAL_SERIAL_PROTO_PACKET_UNKNOWN +}; + +class SFPacket{ + + +public: + /* max packet length in bytes */ + static const int cMaxPacketLength = 256; + +/** member vars **/ +protected: + /* payload buffer */ + char payloadBuffer[cMaxPacketLength]; + /* length of byte buffer */ + int length; + /* type */ + int type; + /* sequence number */ + int seqno; + + +/** member functions **/ +protected: + +public: + SFPacket(int type = SF_PACKET_ACK, int pSeqno = 0); + + ~SFPacket(); + + SFPacket(const SFPacket &pPacket); + + /* returns buffer */ + const char* getPayload() const; + + /* returns length of buffer */ + int getLength() const; + + /* returns the seqno of this packet */ + int getSeqno() const; + + /* returns type of packet */ + int getType() const; + + /* sets buffer and length and constructs frame (incl crc) */ + bool setPayload(const char* pBuffer, uint8_t pLength); + + /* sets the seqno */ + void setSeqno(int pSeqno); + + /* sets the type */ + void setType(int pType); + + /* returns max payload length */ + static const int getMaxPayloadLength(); + + /* == operator */ + bool operator==(SFPacket const& pPacket); +}; + +#endif diff --git a/support/sdk/cpp/sf/sharedinfo.h b/support/sdk/cpp/sf/sharedinfo.h new file mode 100644 index 00000000..bafa0227 --- /dev/null +++ b/support/sdk/cpp/sf/sharedinfo.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#ifndef SHAREDINFO_H +#define SHAREDINFO_H + +#include "pthread.h" + + typedef struct + { + /* mutex to protect *Comm objects */ + pthread_mutex_t lock; + /* condition that object is canceled*/ + pthread_cond_t cancel; + } sharedControlInfo_t; + +#endif diff --git a/support/sdk/cpp/sf/tcpcomm.cpp b/support/sdk/cpp/sf/tcpcomm.cpp new file mode 100644 index 00000000..40baa35f --- /dev/null +++ b/support/sdk/cpp/sf/tcpcomm.cpp @@ -0,0 +1,546 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + +#include "sharedinfo.h" +#include "tcpcomm.h" +#include "sfpacket.h" +#include "stdio.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + + + +using namespace std; + +/* forward declarations of pthrad helper functions*/ +void* checkClientsThread(void*); +void* readClientsThread(void*); +void* writeClientsThread(void*); + +/* opens tcp server port for listening and start threads*/ +TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl) +{ + // init values + writerThreadRunning = false; + readerThreadRunning = false; + serverThreadRunning = false; + clientInfo.count = 0; + clientInfo.FDs.clear(); + readPacketCount = 0; + writtenPacketCount = 0; + port = pPort; + pthread_mutex_init(&clientInfo.sleeplock, NULL); + pthread_mutex_init(&clientInfo.countlock, NULL); + pthread_cond_init(&clientInfo.wakeup, NULL); + + struct sockaddr_in me; + int opt; + int rxBuf = 1024; + + serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0)); + memset(&me, 0, sizeof me); + me.sin_family = AF_INET; + me.sin_port = htons(port); + + opt = 1; + if (!errorReported) + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); + if (!errorReported) + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); + if (!errorReported) + reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me)); + if (!errorReported) + reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5)); + + // start thread for server socket (adding and removing clients) + if (!errorReported) + { + if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) + serverThreadRunning = true; + // start thread for reading from client connections + if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) + readerThreadRunning = true; + // start thread for writing to client connections + if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) + writerThreadRunning = true; + } +} + + +TCPComm::~TCPComm() +{ + cancel(); + + close(serverFD); + set::iterator it; + for( it = clientInfo.FDs.begin(); it != clientInfo.FDs.end(); it++ ) + { + close(*it); + } + pthread_mutex_destroy(&clientInfo.sleeplock); + pthread_mutex_destroy(&clientInfo.countlock); + pthread_cond_destroy(&clientInfo.wakeup); +} + +int TCPComm::getPort() +{ + return port; +} + +/* reads packet */ +bool TCPComm::readPacket(int pFD, SFPacket &pPacket) +{ + char l; + char* buffer[SFPacket::getMaxPayloadLength()]; + + if (readFD(pFD, &l, 1) != 1) + { + return false; + } + if (l > SFPacket::getMaxPayloadLength()) + { + return false; + } + if (readFD(pFD, (char*) buffer, static_cast(l)) != l) + { + return false; + } + if (pPacket.setPayload((char*)buffer ,l)) + { + return true; + } + else + { + return false; + } +} + +int TCPComm::writeFD(int fd, const char *buffer, int count) +{ + int actual = 0; + while (count > 0) + { + int n = send(fd, buffer, count, MSG_NOSIGNAL); + if (n == -1) + { + return -1; + } + count -= n; + actual += n; + buffer += n; + } + return actual; +} + +/* writes packet */ +bool TCPComm::writePacket(int pFD, SFPacket &pPacket) +{ + char len = pPacket.getLength(); + if (writeFD(pFD, &len, 1) != 1) + { + return false; + } + if (writeFD(pFD, pPacket.getPayload(), len) != len) + { + return false; + } + return true; +} + +/* checks for correct version of SF protocol */ +bool TCPComm::versionCheck(int clientFD) +{ + char check[2], us[2]; + int version; + + /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */ + us[0] = 'U'; + us[1] = ' '; + if (writeFD(clientFD, us, 2) != 2) + { + return false; + } + if (readFD(clientFD, check, 2) != 2) + { + return false; + } + if (check[0] != 'U') + { + return false; + } + + version = check[1]; + if (us[1] < version) + { + version = us[1]; + } + /* Add other cases here for later protocol versions */ + switch (version) + { + case ' ': + break; + default: + return false; + } + + return true; +} + +/* adds a client to the client list and wakes up all threads */ +void TCPComm::addClient(int clientFD) +{ + DEBUG("TCPComm::addClient : lock") + pthread_testcancel(); + pthread_mutex_lock( &clientInfo.countlock ); + bool wakeupClientThreads = false; + if (clientInfo.count == 0) + { + wakeupClientThreads = true; + } + ++clientInfo.count; + clientInfo.FDs.insert(clientFD); + if (wakeupClientThreads) + { + pthread_cond_broadcast( &clientInfo.wakeup ); + } + pthread_mutex_unlock( &clientInfo.countlock ); + DEBUG("TCPComm::addClient : unlock") +} + +void TCPComm::removeClient(int clientFD) +{ + DEBUG("TCPComm::removeClient : lock") + pthread_testcancel(); + pthread_mutex_lock( &clientInfo.countlock ); + if (clientInfo.count > 0) + { + clientInfo.FDs.erase(clientFD); + if (close(clientFD) != 0) + { + DEBUG("TCPComm::removeClient : error closing fd " << clientFD) + } + else + { + --clientInfo.count; + } + } + if (clientInfo.count == 0) + { + // clear write buffer + writeBuffer.clear(); + } + pthread_mutex_unlock( &clientInfo.countlock ); + DEBUG("TCPComm::removeClient : unlock") +} + +/* helper function to start server pthread */ +void* checkClientsThread(void* ob) +{ + static_cast(ob)->connectClients(); + return NULL; +} + +/* checks for new connected clients */ +void TCPComm::connectClients() +{ + while (true) + { + int clientFD = accept(serverFD, NULL, NULL); + pthread_testcancel(); + if (clientFD >= 0) + { + if (versionCheck(clientFD)) + { + addClient(clientFD); + } + else + { + close(clientFD); + } + } + else + { + pthread_testcancel(); + cancel(); + } + } +} + +/* helper function to start client reader pthread */ +void* readClientsThread(void* ob) +{ + static_cast(ob)->readClients(); + return NULL; +} + +/* reads from connected clients */ +void TCPComm::readClients() +{ + FD_t clientFDs; + while (true) + { + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock); + pthread_mutex_lock( &clientInfo.countlock ); + while( clientInfo.count == 0 ) + { + // do nothing when no client is connected... + DEBUG("TCPComm::readClients : sleeping reader thread") + pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock ); + } + // copy set in to temp set + clientFDs = clientInfo.FDs; + // removes the cleanup handler and executes it (unlock mutex) + pthread_cleanup_pop(1); + + // check all fds (work with temp set)... + fd_set rfds; + FD_ZERO(&rfds); + int maxFD = -1; + set::iterator it; + for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) + { + if (*it > maxFD) + { + maxFD = *it; + } + FD_SET(*it, &rfds); + } + if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 ) + { + // run = false; + reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1); + } + else + { + for ( it = clientFDs.begin(); it != clientFDs.end(); it++) + { + if (FD_ISSET(*it, &rfds)) + { + SFPacket packet; + if (readPacket(*it, packet)) + { + // this call blocks until buffer is not full + readBuffer.enqueueBack(packet); + ++readPacketCount; + } + else + { + DEBUG("TCPComm::readClients : removeClient") + removeClient(*it); + } + } + } + } + } +} + +/* helper function to start client writer pthread */ +void* writeClientsThread(void* ob) +{ + static_cast(ob)->writeClients(); + return NULL; +} + +/* writes to connected clients */ +void TCPComm::writeClients() +{ + FD_t clientFDs; + while (true) + { + pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock); + pthread_mutex_lock( &clientInfo.countlock ); + while( clientInfo.count == 0 ) + { + // do nothing when no client is connected... + DEBUG("TCPComm::writeClients : sleeping writer thread") + pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock ); + } + // removes the cleanup handler and executes it (unlock mutex) + pthread_cleanup_pop(1); + + // blocks until buffer is not empty + SFPacket packet = writeBuffer.dequeue(); + pthread_testcancel(); + pthread_mutex_lock( &clientInfo.countlock ); + // copy client fd set into temp set + clientFDs = clientInfo.FDs; + pthread_mutex_unlock( &clientInfo.countlock ); + + // check all fds (work with temp set)... + set::iterator it; + // duplicate and send out packet to all connected clients + for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) + { + if (writePacket(*it, packet)) + { + ++writtenPacketCount; + } + else + { + DEBUG("TCPComm::writeClients : removeClient") + removeClient(*it); + } + } + } +} + +/* cancels all running threads */ +void TCPComm::cancel() +{ + pthread_t callingThread = pthread_self(); + if (pthread_equal(callingThread, readerThread)) + { + DEBUG("TCPComm::cancel : by readerThread") + pthread_detach(readerThread); + if (writerThreadRunning) + { + pthread_cancel(writerThread); + DEBUG("TCPComm::cancel : writerThread canceled, joining") + pthread_join(writerThread, NULL); + writerThreadRunning = false; + } + if (serverThreadRunning) + { + pthread_cancel(serverThread); + DEBUG("TCPComm::cancel : serverThread canceled, joining") + pthread_join(serverThread, NULL); + serverThreadRunning = false; + } + readerThreadRunning = false; + pthread_cond_signal(&control.cancel); + pthread_exit(NULL); + } + else if (pthread_equal(callingThread, writerThread)) + { + DEBUG("TCPComm::cancel : by writerThread") + pthread_detach(writerThread); + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("TCPComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } + if (serverThreadRunning) + { + pthread_cancel(serverThread); + DEBUG("TCPComm::cancel : serverThread canceled, joining") + pthread_join(serverThread, NULL); + serverThreadRunning = false; + } + writerThreadRunning = false; + pthread_cond_signal(&control.cancel); + pthread_exit(NULL); + } + else if (pthread_equal(callingThread, serverThread)) + { + DEBUG("TCPComm::cancel : by serverThread") + pthread_detach(serverThread); + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("TCPComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } + if (writerThreadRunning) + { + pthread_cancel(writerThread); + DEBUG("TCPComm::cancel : writerThread canceled, joining") + pthread_join(writerThread, NULL); + writerThreadRunning = false; + } + serverThreadRunning = false; + pthread_cond_signal(&control.cancel); + pthread_exit(NULL); + } + else + { + DEBUG("TCPComm::cancel : by other thread") + if (serverThreadRunning) + { + pthread_cancel(serverThread); + DEBUG("TCPComm::cancel : serverThread canceled, joining") + pthread_join(serverThread, NULL); + serverThreadRunning = false; + } + if (writerThreadRunning) + { + pthread_cancel(writerThread); + DEBUG("TCPComm::cancel : writerThread canceled, joining") + pthread_join(writerThread, NULL); + writerThreadRunning = false; + } + if (readerThreadRunning) + { + pthread_cancel(readerThread); + DEBUG("TCPComm::cancel : readerThread canceled, joining") + pthread_join(readerThread, NULL); + readerThreadRunning = false; + } + pthread_cond_signal(&control.cancel); + } +} + +/* reports error */ +int TCPComm::reportError(const char *msg, int result) +{ + if ((result < 0) && (!errorReported)) + { + errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : " + << msg << " ( result = " << result << " )" << endl + << "error-description : " << strerror(errno) << endl; + + cerr << errorMsg.str(); + errorReported = true; + cancel(); + } + return result; +} + +/* prints out status */ +void TCPComm::reportStatus(ostream& os) +{ + os << "SF-Server ( TCPComm on port " << port << " )" + << " : clients = " << clientInfo.count + << " , packets read = " << readPacketCount + << " , packets written = " << writtenPacketCount << endl; +} diff --git a/support/sdk/cpp/sf/tcpcomm.h b/support/sdk/cpp/sf/tcpcomm.h new file mode 100644 index 00000000..624047b9 --- /dev/null +++ b/support/sdk/cpp/sf/tcpcomm.h @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2007, Technische Universitaet Berlin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * - Neither the name of the Technische Universitaet Berlin nor the names + * of its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/** + * @author Philipp Huppertz + */ + + +#ifndef TCPCOMM_H +#define TCPCOMM_H + +#include "sfpacket.h" +#include "packetbuffer.h" +#include "basecomm.h" +#include "sharedinfo.h" + +#include +#include +#include +#include + +// #define DEBUG_TCPCOMM + +#undef DEBUG +#ifdef DEBUG_TCPCOMM +#include +#define DEBUG(message) std::cout << message << std::endl; +#else +#define DEBUG(message) +#endif + +class TCPComm : public BaseComm +{ + + /** Member vars */ +protected: + /* pthread for tcp client connection handling */ + pthread_t serverThread; + + bool serverThreadRunning; + + /* pthread for tcp client reading */ + pthread_t readerThread; + + bool readerThreadRunning; + + /* pthread for tcp client writing */ + pthread_t writerThread; + + bool writerThreadRunning; + + typedef std::set FD_t; + + // thread safe shared info about connected clients + typedef struct + { + /* mutex to protect clientCount and clientFDs */ + pthread_mutex_t countlock; + /* mutex to protect wakeup condiation */ + pthread_mutex_t sleeplock; + /* wakeup condition which is siganled if clients are connected */ + pthread_cond_t wakeup; + /* number of connected clients */ + int count; + /* container for client stuff */ + FD_t FDs; + } sharedClientInfo_t; + + /* information about clients */ + sharedClientInfo_t clientInfo; + + /* number of read packets */ + int readPacketCount; + + /* number of written packets */ + int writtenPacketCount; + + /* port of this sf */ + int port; + + /* file descriptor for server port on local machine */ + int serverFD; + + /* reference to read packet buffer */ + PacketBuffer &readBuffer; + + /* reference to write packet buffer */ + PacketBuffer &writeBuffer; + + /* indicates that an error occured */ + bool errorReported; + + /* error message of reportError call */ + std::ostringstream errorMsg; + + /* for noticing the parent thread of cancelation */ + sharedControlInfo_t &control; + + /** Member functions */ + + /* needed to start pthreads */ + friend void* checkClientsThread(void* ob); + friend void* readClientsThread(void* ob); + friend void* writeClientsThread(void* ob); + +private: + /* disable standard constructor */ + TCPComm(); + +protected: + /* performs blocking write on fd */ + virtual int writeFD(int fd, const char *buffer, int count); + + /* checks SF client protocol version */ + bool versionCheck(int clientFD); + + /* reads packet */ + bool readPacket(int pFD, SFPacket &pPacket); + + /* writes packet */ + bool writePacket(int pFD, SFPacket &pPacket); + + /* adds client to the list */ + void addClient(int clientFD); + + /* removes client from the list */ + void removeClient(int clientFD); + + /* checks for connecting clients - main thread for connection handling */ + void connectClients(); + + /* checks for messages from the clients - producer thread */ + void readClients(); + + /* write messages to clients (duplicate) - consumer thread */ + void writeClients(); + + /* reports error to stderr */ + int reportError(const char *msg, int result); + + +public: + /* create SF TCP server - init and start threads */ + TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl); + + /* wait for threads, close fds and cleanup */ + ~TCPComm(); + + /* cancels all running threads */ + void cancel(); + + /* returns the TCP/IP port of this sf server */ + int getPort(); + + /* reports status info to stdout */ + void reportStatus(std::ostream& os); + + /* returns if error occurred */ + bool isErrorReported() { return errorReported; } +}; + +#endif -- 2.39.2