]> oss.titaniummirror.com Git - tinyos-2.x.git/blobdiff - support/sdk/cpp/sf/tcpcomm.cpp
re-implementation of C serial forwarder in C++
[tinyos-2.x.git] / support / sdk / cpp / sf / tcpcomm.cpp
diff --git a/support/sdk/cpp/sf/tcpcomm.cpp b/support/sdk/cpp/sf/tcpcomm.cpp
new file mode 100644 (file)
index 0000000..40baa35
--- /dev/null
@@ -0,0 +1,546 @@
+/*
+ * Copyright (c) 2007, Technische Universitaet Berlin
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without 
+ * modification, are permitted provided that the following conditions 
+ * are met:
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright 
+ *   notice, this list of conditions and the following disclaimer in the 
+ *   documentation and/or other materials provided with the distribution.
+ * - Neither the name of the Technische Universitaet Berlin nor the names 
+ *   of its contributors may be used to endorse or promote products derived
+ *   from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 
+ * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/**
+ * @author Philipp Huppertz <huppertz@tkn.tu-berlin.de>
+ */
+
+#include "sharedinfo.h"
+#include "tcpcomm.h"
+#include "sfpacket.h"
+#include "stdio.h"
+
+#include <iostream>
+#include <set>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+
+
+
+using namespace std;
+
+/* forward declarations of pthrad helper functions*/
+void* checkClientsThread(void*);
+void* readClientsThread(void*);
+void* writeClientsThread(void*);
+
+/* opens tcp server port for listening and start threads*/
+TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
+{
+    // init values
+    writerThreadRunning = false;
+    readerThreadRunning = false;
+    serverThreadRunning = false;
+    clientInfo.count = 0;
+    clientInfo.FDs.clear();
+    readPacketCount = 0;
+    writtenPacketCount = 0;
+    port = pPort;
+    pthread_mutex_init(&clientInfo.sleeplock, NULL);
+    pthread_mutex_init(&clientInfo.countlock, NULL);
+    pthread_cond_init(&clientInfo.wakeup, NULL);
+
+    struct sockaddr_in me;
+    int opt;
+    int rxBuf = 1024;
+    
+    serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0));
+    memset(&me, 0, sizeof me);
+    me.sin_family = AF_INET;
+    me.sin_port = htons(port);
+
+    opt = 1;
+    if (!errorReported)
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
+    if (!errorReported)
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
+    if (!errorReported)
+        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me));
+    if (!errorReported)
+        reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5));
+
+    // start thread for server socket (adding and removing clients)
+    if (!errorReported)
+    {
+        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) 
+            serverThreadRunning = true;
+        // start thread for reading from client connections
+        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0)
+            readerThreadRunning = true;
+        // start thread for writing to client connections
+        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0)
+            writerThreadRunning = true;
+    }
+}
+
+
+TCPComm::~TCPComm()
+{
+    cancel();
+
+    close(serverFD);
+    set<int>::iterator it;
+    for( it = clientInfo.FDs.begin(); it != clientInfo.FDs.end(); it++ )
+    {
+        close(*it);
+    }
+    pthread_mutex_destroy(&clientInfo.sleeplock);
+    pthread_mutex_destroy(&clientInfo.countlock);
+    pthread_cond_destroy(&clientInfo.wakeup);
+}
+
+int TCPComm::getPort()
+{
+    return port;
+}
+
+/* reads packet */
+bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
+{
+    char l;
+    char* buffer[SFPacket::getMaxPayloadLength()];
+
+    if (readFD(pFD, &l, 1) != 1)
+    {
+        return false;
+    }
+    if (l > SFPacket::getMaxPayloadLength())
+    {
+        return false;
+    }
+    if (readFD(pFD, (char*) buffer, static_cast<int>(l)) != l)
+    {
+        return false;
+    }
+    if (pPacket.setPayload((char*)buffer ,l))
+    {
+        return true;
+    }
+    else
+    {
+        return false;
+    }
+}
+
+int TCPComm::writeFD(int fd, const char *buffer, int count)
+{
+    int actual = 0;
+    while (count > 0)
+    {
+        int n = send(fd, buffer, count, MSG_NOSIGNAL);
+        if (n == -1)
+        {
+            return -1;
+        }
+        count -= n;
+        actual += n;
+        buffer += n;
+    }
+    return actual;
+}
+
+/* writes packet */
+bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
+{
+    char len = pPacket.getLength();
+    if (writeFD(pFD, &len, 1) != 1)
+    {
+        return false;
+    }
+    if (writeFD(pFD, pPacket.getPayload(), len) != len)
+    {
+        return false;
+    }
+    return true;
+}
+
+/* checks for correct version of SF protocol */
+bool TCPComm::versionCheck(int clientFD)
+{
+    char check[2], us[2];
+    int version;
+
+    /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
+    us[0] = 'U';
+    us[1] = ' ';
+    if (writeFD(clientFD, us, 2) != 2)
+    {
+        return false;
+    }
+    if (readFD(clientFD, check, 2) != 2)
+    {
+        return false;
+    }
+    if (check[0] != 'U')
+    {
+        return false;
+    }
+
+    version = check[1];
+    if (us[1] < version)
+    {
+        version = us[1];
+    }
+    /* Add other cases here for later protocol versions */
+    switch (version)
+    {
+    case ' ':
+        break;
+    default:
+        return false;
+    }
+
+    return true;
+}
+
+/* adds a client to the client list and wakes up all threads */
+void TCPComm::addClient(int clientFD)
+{
+    DEBUG("TCPComm::addClient : lock")
+    pthread_testcancel();
+    pthread_mutex_lock( &clientInfo.countlock );
+    bool wakeupClientThreads = false;
+    if (clientInfo.count == 0)
+    {
+        wakeupClientThreads = true;
+    }
+    ++clientInfo.count;
+    clientInfo.FDs.insert(clientFD);
+    if (wakeupClientThreads)
+    {
+        pthread_cond_broadcast( &clientInfo.wakeup );
+    }
+    pthread_mutex_unlock( &clientInfo.countlock );
+    DEBUG("TCPComm::addClient : unlock")
+}
+
+void TCPComm::removeClient(int clientFD)
+{
+    DEBUG("TCPComm::removeClient : lock")
+    pthread_testcancel();
+    pthread_mutex_lock( &clientInfo.countlock );
+    if (clientInfo.count > 0)
+    {
+        clientInfo.FDs.erase(clientFD);
+        if (close(clientFD) != 0)
+        {
+            DEBUG("TCPComm::removeClient : error closing fd " << clientFD)
+        }
+        else
+        {
+            --clientInfo.count;
+        }
+    }
+    if (clientInfo.count == 0)
+    {
+        // clear write buffer
+        writeBuffer.clear();
+    }
+    pthread_mutex_unlock( &clientInfo.countlock );
+    DEBUG("TCPComm::removeClient : unlock")
+}
+
+/* helper function to start server pthread */
+void* checkClientsThread(void* ob)
+{
+    static_cast<TCPComm*>(ob)->connectClients();
+    return NULL;
+}
+
+/* checks for new connected clients */
+void TCPComm::connectClients()
+{
+    while (true)
+    {
+        int clientFD = accept(serverFD, NULL, NULL);
+       pthread_testcancel();
+        if (clientFD >= 0)
+        {
+            if (versionCheck(clientFD))
+            {
+                addClient(clientFD);
+            }
+            else
+            {
+                close(clientFD);
+            }
+        }
+        else
+        {
+            pthread_testcancel();
+            cancel();
+        }
+    }
+}
+
+/* helper function to start client reader pthread */
+void* readClientsThread(void* ob)
+{
+    static_cast<TCPComm*>(ob)->readClients();
+    return NULL;
+}
+
+/* reads from connected clients */
+void TCPComm::readClients()
+{
+    FD_t clientFDs;
+    while (true)
+    {
+        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
+        pthread_mutex_lock( &clientInfo.countlock );
+        while( clientInfo.count == 0 )
+        {
+            // do nothing when no client is connected...
+            DEBUG("TCPComm::readClients : sleeping reader thread")
+            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
+        }
+        // copy set in to temp set
+        clientFDs = clientInfo.FDs;
+        // removes the cleanup handler and executes it (unlock mutex)
+        pthread_cleanup_pop(1); 
+
+        // check all fds (work with temp set)...
+        fd_set rfds;
+        FD_ZERO(&rfds);
+        int maxFD = -1;
+        set<int>::iterator it;
+        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
+        {
+            if (*it > maxFD)
+            {
+                maxFD = *it;
+            }
+            FD_SET(*it, &rfds);
+        }
+        if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
+        {
+            //             run = false;
+            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1);
+        }
+        else
+        {
+            for ( it = clientFDs.begin(); it != clientFDs.end(); it++)
+            {
+                if (FD_ISSET(*it, &rfds))
+                {
+                    SFPacket packet;
+                    if (readPacket(*it, packet))
+                    {
+                        // this call blocks until buffer is not full
+                        readBuffer.enqueueBack(packet);
+                        ++readPacketCount;
+                    }
+                    else
+                    {
+                        DEBUG("TCPComm::readClients : removeClient")
+                        removeClient(*it);
+                    }
+                }
+            }
+        }
+    }
+}
+
+/* helper function to start client writer pthread */
+void* writeClientsThread(void* ob)
+{
+    static_cast<TCPComm*>(ob)->writeClients();
+    return NULL;
+}
+
+/* writes to connected clients */
+void TCPComm::writeClients()
+{
+    FD_t clientFDs;
+    while (true)
+    {
+        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
+        pthread_mutex_lock( &clientInfo.countlock );
+        while( clientInfo.count == 0 )
+        {
+            // do nothing when no client is connected...
+            DEBUG("TCPComm::writeClients : sleeping writer thread")
+            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
+        }
+        // removes the cleanup handler and executes it (unlock mutex)
+        pthread_cleanup_pop(1); 
+
+        // blocks until buffer is not empty
+        SFPacket packet = writeBuffer.dequeue();
+        pthread_testcancel();
+        pthread_mutex_lock( &clientInfo.countlock );
+        // copy client fd set into temp set
+        clientFDs = clientInfo.FDs;
+        pthread_mutex_unlock( &clientInfo.countlock );
+
+        // check all fds (work with temp set)...
+        set<int>::iterator it;
+        // duplicate and send out packet to all connected clients
+        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
+        {
+            if (writePacket(*it, packet))
+            {
+                ++writtenPacketCount;
+            }
+            else
+            {
+                DEBUG("TCPComm::writeClients : removeClient")
+                removeClient(*it);
+            }
+        }
+    }
+}
+
+/* cancels all running threads */
+void TCPComm::cancel()
+{
+    pthread_t callingThread = pthread_self();
+    if (pthread_equal(callingThread, readerThread))
+    {
+        DEBUG("TCPComm::cancel : by readerThread")
+        pthread_detach(readerThread);
+        if (writerThreadRunning)
+        {
+            pthread_cancel(writerThread);
+            DEBUG("TCPComm::cancel : writerThread canceled, joining")
+            pthread_join(writerThread, NULL);
+            writerThreadRunning = false;
+        }
+        if (serverThreadRunning)
+        {
+            pthread_cancel(serverThread);
+            DEBUG("TCPComm::cancel : serverThread canceled, joining")
+            pthread_join(serverThread, NULL);
+            serverThreadRunning = false;
+        }
+        readerThreadRunning = false;
+       pthread_cond_signal(&control.cancel);
+        pthread_exit(NULL);
+    }
+    else if (pthread_equal(callingThread, writerThread))
+    {
+        DEBUG("TCPComm::cancel : by writerThread")
+        pthread_detach(writerThread);
+        if (readerThreadRunning)
+        {
+            pthread_cancel(readerThread);
+            DEBUG("TCPComm::cancel : readerThread canceled, joining")
+            pthread_join(readerThread, NULL);
+            readerThreadRunning = false;
+        }
+        if (serverThreadRunning)
+        {
+            pthread_cancel(serverThread);
+            DEBUG("TCPComm::cancel : serverThread canceled, joining")
+            pthread_join(serverThread, NULL);
+            serverThreadRunning = false;
+        }
+        writerThreadRunning = false;
+       pthread_cond_signal(&control.cancel);
+        pthread_exit(NULL);
+    }
+    else if (pthread_equal(callingThread, serverThread))
+    {
+        DEBUG("TCPComm::cancel : by serverThread")
+        pthread_detach(serverThread);
+        if (readerThreadRunning)
+        {
+            pthread_cancel(readerThread);
+            DEBUG("TCPComm::cancel : readerThread canceled, joining")
+           pthread_join(readerThread, NULL);
+            readerThreadRunning = false;
+        }
+        if (writerThreadRunning)
+        {
+            pthread_cancel(writerThread);
+            DEBUG("TCPComm::cancel : writerThread canceled, joining")
+            pthread_join(writerThread, NULL);
+            writerThreadRunning = false;
+        }
+        serverThreadRunning = false;
+       pthread_cond_signal(&control.cancel);
+        pthread_exit(NULL);
+    }
+    else
+    {
+        DEBUG("TCPComm::cancel : by other thread")
+       if (serverThreadRunning)
+        {
+            pthread_cancel(serverThread);
+            DEBUG("TCPComm::cancel : serverThread canceled, joining")
+            pthread_join(serverThread, NULL);
+            serverThreadRunning = false;
+        }
+       if (writerThreadRunning)
+        {
+            pthread_cancel(writerThread);
+            DEBUG("TCPComm::cancel : writerThread canceled, joining")
+            pthread_join(writerThread, NULL);
+            writerThreadRunning = false;
+        }
+        if (readerThreadRunning)
+        {
+            pthread_cancel(readerThread);
+            DEBUG("TCPComm::cancel : readerThread canceled, joining")
+            pthread_join(readerThread, NULL);
+            readerThreadRunning = false;
+        }
+       pthread_cond_signal(&control.cancel);
+    }
+}
+
+/* reports error */
+int TCPComm::reportError(const char *msg, int result)
+{
+    if ((result < 0) && (!errorReported))
+    {
+        errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : "
+        << msg << " ( result = " << result << " )" << endl
+        << "error-description : " << strerror(errno) << endl;
+
+        cerr << errorMsg.str();
+        errorReported = true;
+        cancel();
+    }
+    return result;
+}
+
+/* prints out status */
+void TCPComm::reportStatus(ostream& os)
+{
+    os << "SF-Server ( TCPComm on port " << port << " )"
+    << " : clients = " << clientInfo.count
+    << " , packets read = " << readPacketCount
+    << " , packets written = " << writtenPacketCount << endl;
+}