]> oss.titaniummirror.com Git - tinyos-2.x.git/blobdiff - support/sdk/cpp/sf/tcpcomm.cpp
Merge TinyOS 2.1.1 into master.
[tinyos-2.x.git] / support / sdk / cpp / sf / tcpcomm.cpp
index 7e064993a4947eb9968747aa80981bc0dd241a5b..2e77a22cfca9efcf0b636fbe1f6856dcc5f7be9b 100644 (file)
@@ -38,6 +38,7 @@
 #include <iostream>
 #include <set>
 
+#include <cstring>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -57,7 +58,7 @@ void* writeClientsThread(void*);
 
 /* opens tcp server port for listening and start threads*/
 TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
-{
+{   
     // init values
     writerThreadRunning = false;
     readerThreadRunning = false;
@@ -67,6 +68,7 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe
     readPacketCount = 0;
     writtenPacketCount = 0;
     port = pPort;
+    
     pthread_mutex_init(&clientInfo.sleeplock, NULL);
     pthread_mutex_init(&clientInfo.countlock, NULL);
     pthread_cond_init(&clientInfo.wakeup, NULL);
@@ -74,33 +76,62 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe
     struct sockaddr_in me;
     int opt;
     int rxBuf = 1024;
-    
-    serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0));
+
+    /* create pipe to inform client reader of new clients */
+    if (!errorReported) {
+        int pipeFDPair[2];
+        reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
+        pipeWriteFD = pipeFDPair[1];
+        pipeReadFD = pipeFDPair[0];
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
+                    fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
+    }
+    /* create server socket where clients connect */
+    if (!errorReported) {
+        serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
+                               socket(AF_INET, SOCK_STREAM, 0));
+    }
     memset(&me, 0, sizeof me);
     me.sin_family = AF_INET;
     me.sin_port = htons(port);
-
+    
     opt = 1;
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5));
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
+                    bind(serverFD, (struct sockaddr *)&me, sizeof me));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : listen(serverFD, 5)",
+                    listen(serverFD, 5));
+    }
 
     // start thread for server socket (adding and removing clients)
     if (!errorReported)
     {
-        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) 
+        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
+                        pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
             serverThreadRunning = true;
+        }
         // start thread for reading from client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
+                        pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
             readerThreadRunning = true;
+        }
         // start thread for writing to client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
+                        pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
             writerThreadRunning = true;
+        }
     }
 }
 
@@ -115,6 +146,8 @@ TCPComm::~TCPComm()
     {
         close(*it);
     }
+    close(pipeWriteFD);
+    close(pipeReadFD);
     pthread_mutex_destroy(&clientInfo.sleeplock);
     pthread_mutex_destroy(&clientInfo.countlock);
     pthread_cond_destroy(&clientInfo.wakeup);
@@ -130,8 +163,9 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
 {
     char l;
     char* buffer[SFPacket::getMaxPayloadLength()];
-
-    if (readFD(pFD, &l, 1) != 1)
+    int err;
+    
+    if (readFD(pFD, &l, 1, &err) != 1)
     {
         return false;
     }
@@ -139,7 +173,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
     {
         return false;
     }
-    if (readFD(pFD, (char*) buffer, static_cast<int>(l)) != l)
+    if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l)
     {
         return false;
     }
@@ -153,7 +187,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
     }
 }
 
-int TCPComm::writeFD(int fd, const char *buffer, int count)
+int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
 {
     int actual = 0;
     while (count > 0)
@@ -163,8 +197,8 @@ int TCPComm::writeFD(int fd, const char *buffer, int count)
 #else
         int n = send(fd, buffer, count, MSG_NOSIGNAL);
 #endif
-        if (n == -1)
-        {
+        if (n == -1) {
+            *err = errno;
             return -1;
         }
         count -= n;
@@ -177,16 +211,9 @@ int TCPComm::writeFD(int fd, const char *buffer, int count)
 /* writes packet */
 bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
 {
-    char len = pPacket.getLength();
-    if (writeFD(pFD, &len, 1) != 1)
-    {
-        return false;
-    }
-    if (writeFD(pFD, pPacket.getPayload(), len) != len)
-    {
-        return false;
-    }
-    return true;
+    int len = pPacket.getTcpLength();
+    int err;
+    return (writeFD(pFD, pPacket.getTcpPayload(), len, &err) == len);
 }
 
 /* checks for correct version of SF protocol */
@@ -194,15 +221,16 @@ bool TCPComm::versionCheck(int clientFD)
 {
     char check[2], us[2];
     int version;
-
+    int err = 0;
     /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
     us[0] = 'U';
     us[1] = ' ';
-    if (writeFD(clientFD, us, 2) != 2)
+    
+    if (writeFD(clientFD, us, 2, &err) != 2)
     {
         return false;
     }
-    if (readFD(clientFD, check, 2) != 2)
+    if (readFD(clientFD, check, 2, &err) != 2)
     {
         return false;
     }
@@ -246,6 +274,7 @@ void TCPComm::addClient(int clientFD)
         pthread_cond_broadcast( &clientInfo.wakeup );
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::addClient : unlock")
 }
 
@@ -272,6 +301,7 @@ void TCPComm::removeClient(int clientFD)
         writeBuffer.clear();
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::removeClient : unlock")
 }
 
@@ -332,12 +362,12 @@ void TCPComm::readClients()
         // copy set in to temp set
         clientFDs = clientInfo.FDs;
         // removes the cleanup handler and executes it (unlock mutex)
-        pthread_cleanup_pop(1); 
-
+        pthread_cleanup_pop(1);
         // check all fds (work with temp set)...
         fd_set rfds;
         FD_ZERO(&rfds);
-        int maxFD = -1;
+        int maxFD = pipeReadFD;
+        FD_SET(pipeReadFD, &rfds);
         set<int>::iterator it;
         for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
         {
@@ -350,23 +380,24 @@ void TCPComm::readClients()
         if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
         {
             //             run = false;
-            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1);
+            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
         }
         else
         {
-            for ( it = clientFDs.begin(); it != clientFDs.end(); it++)
+            if(FD_ISSET(pipeReadFD, &rfds)) {
+                clearPipe();
+            }
+            for (it = clientFDs.begin(); it != clientFDs.end(); it++)
             {
                 if (FD_ISSET(*it, &rfds))
                 {
                     SFPacket packet;
-                    if (readPacket(*it, packet))
-                    {
+                    if(readPacket(*it, packet)) {
                         // this call blocks until buffer is not full
                         readBuffer.enqueueBack(packet);
                         ++readPacketCount;
                     }
-                    else
-                    {
+                    else {
                         DEBUG("TCPComm::readClients : removeClient")
                         removeClient(*it);
                     }
@@ -548,3 +579,16 @@ void TCPComm::reportStatus(ostream& os)
     << " , packets read = " << readPacketCount
     << " , packets written = " << writtenPacketCount << endl;
 }
+
+void TCPComm::stuffPipe() 
+{
+    char info = 'n';
+    if(write(pipeWriteFD, &info, 1) != 1) DEBUG("TCPComm::stuffPipe : lokal pipe is broken");
+}
+
+void TCPComm::clearPipe() {
+    char buf;
+    while(read(pipeReadFD, &buf, 1) > 0) {
+        ;
+    }
+}