From dd6ea733b095870a1d797dc1f6465a34c8227633 Mon Sep 17 00:00:00 2001 From: andreaskoepke Date: Fri, 21 Sep 2007 14:01:13 +0000 Subject: [PATCH] return from select when a client is added or removed --- support/sdk/cpp/sf/tcpcomm.cpp | 85 ++++++++++++++++++++++++++-------- support/sdk/cpp/sf/tcpcomm.h | 9 ++++ 2 files changed, 75 insertions(+), 19 deletions(-) diff --git a/support/sdk/cpp/sf/tcpcomm.cpp b/support/sdk/cpp/sf/tcpcomm.cpp index 7e064993..762487f6 100644 --- a/support/sdk/cpp/sf/tcpcomm.cpp +++ b/support/sdk/cpp/sf/tcpcomm.cpp @@ -74,33 +74,62 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe struct sockaddr_in me; int opt; int rxBuf = 1024; - - serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0)); + + /* create pipe to inform client reader of new clients */ + if (!errorReported) { + int pipeFDPair[2]; + reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair)); + pipeWriteFD = pipeFDPair[1]; + pipeReadFD = pipeFDPair[0]; + } + if (!errorReported) { + reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);", + fcntl(pipeReadFD, F_SETFL, O_NONBLOCK)); + } + /* create server socket where clients connect */ + if (!errorReported) { + serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", + socket(AF_INET, SOCK_STREAM, 0)); + } memset(&me, 0, sizeof me); me.sin_family = AF_INET; me.sin_port = htons(port); - + opt = 1; - if (!errorReported) - reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); - if (!errorReported) - reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); - if (!errorReported) - reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me)); - if (!errorReported) - reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5)); + if (!errorReported) { + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", + setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", + setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", + bind(serverFD, (struct sockaddr *)&me, sizeof me)); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : listen(serverFD, 5)", + listen(serverFD, 5)); + } // start thread for server socket (adding and removing clients) if (!errorReported) { - if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", + pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) { serverThreadRunning = true; + } // start thread for reading from client connections - if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", + pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) { readerThreadRunning = true; + } // start thread for writing to client connections - if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", + pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) { writerThreadRunning = true; + } } } @@ -246,6 +275,7 @@ void TCPComm::addClient(int clientFD) pthread_cond_broadcast( &clientInfo.wakeup ); } pthread_mutex_unlock( &clientInfo.countlock ); + stuffPipe(); DEBUG("TCPComm::addClient : unlock") } @@ -272,6 +302,7 @@ void TCPComm::removeClient(int clientFD) writeBuffer.clear(); } pthread_mutex_unlock( &clientInfo.countlock ); + stuffPipe(); DEBUG("TCPComm::removeClient : unlock") } @@ -332,12 +363,12 @@ void TCPComm::readClients() // copy set in to temp set clientFDs = clientInfo.FDs; // removes the cleanup handler and executes it (unlock mutex) - pthread_cleanup_pop(1); - + pthread_cleanup_pop(1); // check all fds (work with temp set)... fd_set rfds; FD_ZERO(&rfds); - int maxFD = -1; + int maxFD = pipeReadFD; + FD_SET(pipeReadFD, &rfds); set::iterator it; for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) { @@ -350,11 +381,14 @@ void TCPComm::readClients() if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 ) { // run = false; - reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1); + reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1); } else { - for ( it = clientFDs.begin(); it != clientFDs.end(); it++) + if(FD_ISSET(pipeReadFD, &rfds)) { + clearPipe(); + } + for (it = clientFDs.begin(); it != clientFDs.end(); it++) { if (FD_ISSET(*it, &rfds)) { @@ -548,3 +582,16 @@ void TCPComm::reportStatus(ostream& os) << " , packets read = " << readPacketCount << " , packets written = " << writtenPacketCount << endl; } + +void TCPComm::stuffPipe() +{ + char info = 'n'; + write(pipeWriteFD, &info, 1); +} + +void TCPComm::clearPipe() { + char buf; + while(read(pipeReadFD, &buf, 1) > 0) { + ; + } +} diff --git a/support/sdk/cpp/sf/tcpcomm.h b/support/sdk/cpp/sf/tcpcomm.h index 624047b9..35458587 100644 --- a/support/sdk/cpp/sf/tcpcomm.h +++ b/support/sdk/cpp/sf/tcpcomm.h @@ -106,6 +106,10 @@ protected: /* file descriptor for server port on local machine */ int serverFD; + /* pipe fd pair to inform client reader thread of new clients */ + int pipeWriteFD; + int pipeReadFD; + /* reference to read packet buffer */ PacketBuffer &readBuffer; @@ -163,6 +167,11 @@ protected: /* reports error to stderr */ int reportError(const char *msg, int result); + /* write something into pipe to wake up client readerThread */ + void stuffPipe(); + + /* remove data written into pipe */ + void clearPipe(); public: /* create SF TCP server - init and start threads */ -- 2.39.2