]> oss.titaniummirror.com Git - tinyos-2.x.git/commitdiff
return from select when a client is added or removed
authorandreaskoepke <andreaskoepke>
Fri, 21 Sep 2007 14:01:13 +0000 (14:01 +0000)
committerandreaskoepke <andreaskoepke>
Fri, 21 Sep 2007 14:01:13 +0000 (14:01 +0000)
support/sdk/cpp/sf/tcpcomm.cpp
support/sdk/cpp/sf/tcpcomm.h

index 7e064993a4947eb9968747aa80981bc0dd241a5b..762487f6a25b2c9ea916c547af9ef90b79d0267a 100644 (file)
@@ -74,33 +74,62 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe
     struct sockaddr_in me;
     int opt;
     int rxBuf = 1024;
-    
-    serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0));
+
+    /* create pipe to inform client reader of new clients */
+    if (!errorReported) {
+        int pipeFDPair[2];
+        reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
+        pipeWriteFD = pipeFDPair[1];
+        pipeReadFD = pipeFDPair[0];
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
+                    fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
+    }
+    /* create server socket where clients connect */
+    if (!errorReported) {
+        serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
+                               socket(AF_INET, SOCK_STREAM, 0));
+    }
     memset(&me, 0, sizeof me);
     me.sin_family = AF_INET;
     me.sin_port = htons(port);
-
+    
     opt = 1;
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me));
-    if (!errorReported)
-        reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5));
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
+                    setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
+                    bind(serverFD, (struct sockaddr *)&me, sizeof me));
+    }
+    if (!errorReported) {
+        reportError("TCPComm::TCPComm : listen(serverFD, 5)",
+                    listen(serverFD, 5));
+    }
 
     // start thread for server socket (adding and removing clients)
     if (!errorReported)
     {
-        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) 
+        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
+                        pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
             serverThreadRunning = true;
+        }
         // start thread for reading from client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
+                        pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
             readerThreadRunning = true;
+        }
         // start thread for writing to client connections
-        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0)
+        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
+                        pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
             writerThreadRunning = true;
+        }
     }
 }
 
@@ -246,6 +275,7 @@ void TCPComm::addClient(int clientFD)
         pthread_cond_broadcast( &clientInfo.wakeup );
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::addClient : unlock")
 }
 
@@ -272,6 +302,7 @@ void TCPComm::removeClient(int clientFD)
         writeBuffer.clear();
     }
     pthread_mutex_unlock( &clientInfo.countlock );
+    stuffPipe();
     DEBUG("TCPComm::removeClient : unlock")
 }
 
@@ -332,12 +363,12 @@ void TCPComm::readClients()
         // copy set in to temp set
         clientFDs = clientInfo.FDs;
         // removes the cleanup handler and executes it (unlock mutex)
-        pthread_cleanup_pop(1); 
-
+        pthread_cleanup_pop(1);
         // check all fds (work with temp set)...
         fd_set rfds;
         FD_ZERO(&rfds);
-        int maxFD = -1;
+        int maxFD = pipeReadFD;
+        FD_SET(pipeReadFD, &rfds);
         set<int>::iterator it;
         for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
         {
@@ -350,11 +381,14 @@ void TCPComm::readClients()
         if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
         {
             //             run = false;
-            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1);
+            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
         }
         else
         {
-            for ( it = clientFDs.begin(); it != clientFDs.end(); it++)
+            if(FD_ISSET(pipeReadFD, &rfds)) {
+                clearPipe();
+            }
+            for (it = clientFDs.begin(); it != clientFDs.end(); it++)
             {
                 if (FD_ISSET(*it, &rfds))
                 {
@@ -548,3 +582,16 @@ void TCPComm::reportStatus(ostream& os)
     << " , packets read = " << readPacketCount
     << " , packets written = " << writtenPacketCount << endl;
 }
+
+void TCPComm::stuffPipe() 
+{
+    char info = 'n';
+    write(pipeWriteFD, &info, 1);
+}
+
+void TCPComm::clearPipe() {
+    char buf;
+    while(read(pipeReadFD, &buf, 1) > 0) {
+        ;
+    }
+}
index 624047b95e59a8951264e26e2c6eaaa2e834c8c3..35458587068dc25774975ac92c6de99890edeb08 100644 (file)
@@ -106,6 +106,10 @@ protected:
     /* file descriptor for server port on local machine */
     int serverFD;
 
+    /* pipe fd pair to inform client reader thread of new clients */
+    int pipeWriteFD;
+    int pipeReadFD;
+    
     /* reference to read packet buffer */
     PacketBuffer &readBuffer;    
 
@@ -163,6 +167,11 @@ protected:
     /* reports error to stderr */
     int reportError(const char *msg, int result);
 
+    /* write something into pipe to wake up client readerThread */
+    void stuffPipe();
+    
+    /* remove data written into pipe */
+    void clearPipe();
 
 public:
     /* create SF TCP server - init and start threads */