]> oss.titaniummirror.com Git - tinyos-2.x.git/blobdiff - support/sdk/cpp/sf/tcpcomm.cpp
return from select when a client is added or removed
[tinyos-2.x.git] / support / sdk / cpp / sf / tcpcomm.cpp
index 7e064993a4947eb9968747aa80981bc0dd241a5b..762487f6a25b2c9ea916c547af9ef90b79d0267a 100644 (file)
@@ -74,33 +74,62 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe
     struct sockaddr_in me;
     int opt;
     int rxBuf = 1024;
-    
-    serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0));
+
+    /* create pipe to inform client reader of new clients */
+    if (!errorReported) {
+        int pipeFDPair[2];
+        reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
+        pipeWriteFD = pipeFDPair[1];
+        pipeReadFD = pipeFDPair[0];
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
+                    fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
+    }
+    /* create server socket where clients connect */
+    if (!errorReported) {
+        serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
+                               socket(AF_INET, SOCK_STREAM, 0));
+    }
     memset(&me, 0, sizeof me);
     me.sin_family = AF_INET;
     me.sin_port = htons(port);
-
+    
     opt = 1;
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5));
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
+                    bind(serverFD, (struct sockaddr *)&me, sizeof me));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : listen(serverFD, 5)",
+                    listen(serverFD, 5));
+    }
 
     // start thread for server socket (adding and removing clients)
     if (!errorReported)
     {
-        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) 
+        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
+                        pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
             serverThreadRunning = true;
+        }
         // start thread for reading from client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
+                        pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
             readerThreadRunning = true;
+        }
         // start thread for writing to client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
+                        pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
             writerThreadRunning = true;
+        }
     }
 }
 
@@ -246,6 +275,7 @@ void TCPComm::addClient(int clientFD)
         pthread_cond_broadcast( &clientInfo.wakeup );
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::addClient : unlock")
 }
 
@@ -272,6 +302,7 @@ void TCPComm::removeClient(int clientFD)
         writeBuffer.clear();
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::removeClient : unlock")
 }
 
@@ -332,12 +363,12 @@ void TCPComm::readClients()
         // copy set in to temp set
         clientFDs = clientInfo.FDs;
         // removes the cleanup handler and executes it (unlock mutex)
-        pthread_cleanup_pop(1); 
-
+        pthread_cleanup_pop(1);
         // check all fds (work with temp set)...
         fd_set rfds;
         FD_ZERO(&rfds);
-        int maxFD = -1;
+        int maxFD = pipeReadFD;
+        FD_SET(pipeReadFD, &rfds);
         set<int>::iterator it;
         for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
         {
@@ -350,11 +381,14 @@ void TCPComm::readClients()
         if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
         {
             //             run = false;
-            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1);
+            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
         }
         else
         {
-            for ( it = clientFDs.begin(); it != clientFDs.end(); it++)
+            if(FD_ISSET(pipeReadFD, &rfds)) {
+                clearPipe();
+            }
+            for (it = clientFDs.begin(); it != clientFDs.end(); it++)
             {
                 if (FD_ISSET(*it, &rfds))
                 {
@@ -548,3 +582,16 @@ void TCPComm::reportStatus(ostream& os)
     << " , packets read = " << readPacketCount
     << " , packets written = " << writtenPacketCount << endl;
 }
+
+void TCPComm::stuffPipe() 
+{
+    char info = 'n';
+    write(pipeWriteFD, &info, 1);
+}
+
+void TCPComm::clearPipe() {
+    char buf;
+    while(read(pipeReadFD, &buf, 1) > 0) {
+        ;
+    }
+}