X-Git-Url: https://oss.titaniummirror.com/gitweb/?a=blobdiff_plain;f=support%2Fsdk%2Fcpp%2Fsf%2Ftcpcomm.cpp;h=8e5efee52a1f6c2c08b60806ca3633bb33d56746;hb=f2ec0361a8796350bde71e0611e16be06ba22be0;hp=7e064993a4947eb9968747aa80981bc0dd241a5b;hpb=f2aa9fb41a3e147a3715c028eb091806083de5ac;p=tinyos-2.x.git diff --git a/support/sdk/cpp/sf/tcpcomm.cpp b/support/sdk/cpp/sf/tcpcomm.cpp index 7e064993..8e5efee5 100644 --- a/support/sdk/cpp/sf/tcpcomm.cpp +++ b/support/sdk/cpp/sf/tcpcomm.cpp @@ -57,7 +57,7 @@ void* writeClientsThread(void*); /* opens tcp server port for listening and start threads*/ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl) -{ +{ // init values writerThreadRunning = false; readerThreadRunning = false; @@ -67,6 +67,7 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe readPacketCount = 0; writtenPacketCount = 0; port = pPort; + pthread_mutex_init(&clientInfo.sleeplock, NULL); pthread_mutex_init(&clientInfo.countlock, NULL); pthread_cond_init(&clientInfo.wakeup, NULL); @@ -74,33 +75,62 @@ TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffe struct sockaddr_in me; int opt; int rxBuf = 1024; - - serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0)); + + /* create pipe to inform client reader of new clients */ + if (!errorReported) { + int pipeFDPair[2]; + reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair)); + pipeWriteFD = pipeFDPair[1]; + pipeReadFD = pipeFDPair[0]; + } + if (!errorReported) { + reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);", + fcntl(pipeReadFD, F_SETFL, O_NONBLOCK)); + } + /* create server socket where clients connect */ + if (!errorReported) { + serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", + socket(AF_INET, SOCK_STREAM, 0)); + } memset(&me, 0, sizeof me); me.sin_family = AF_INET; me.sin_port = htons(port); - + opt = 1; - if (!errorReported) - reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); - if (!errorReported) - reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); - if (!errorReported) - reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me)); - if (!errorReported) - reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5)); + if (!errorReported) { + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", + setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", + setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", + bind(serverFD, (struct sockaddr *)&me, sizeof me)); + } + if (!errorReported) { + reportError("TCPComm::TCPComm : listen(serverFD, 5)", + listen(serverFD, 5)); + } // start thread for server socket (adding and removing clients) if (!errorReported) { - if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", + pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) { serverThreadRunning = true; + } // start thread for reading from client connections - if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", + pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) { readerThreadRunning = true; + } // start thread for writing to client connections - if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) + if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", + pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) { writerThreadRunning = true; + } } } @@ -115,6 +145,8 @@ TCPComm::~TCPComm() { close(*it); } + close(pipeWriteFD); + close(pipeReadFD); pthread_mutex_destroy(&clientInfo.sleeplock); pthread_mutex_destroy(&clientInfo.countlock); pthread_cond_destroy(&clientInfo.wakeup); @@ -130,8 +162,9 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) { char l; char* buffer[SFPacket::getMaxPayloadLength()]; - - if (readFD(pFD, &l, 1) != 1) + int err; + + if (readFD(pFD, &l, 1, &err) != 1) { return false; } @@ -139,7 +172,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) { return false; } - if (readFD(pFD, (char*) buffer, static_cast(l)) != l) + if (readFD(pFD, (char*) buffer, static_cast(l), &err) != l) { return false; } @@ -153,7 +186,7 @@ bool TCPComm::readPacket(int pFD, SFPacket &pPacket) } } -int TCPComm::writeFD(int fd, const char *buffer, int count) +int TCPComm::writeFD(int fd, const char *buffer, int count, int *err) { int actual = 0; while (count > 0) @@ -163,8 +196,8 @@ int TCPComm::writeFD(int fd, const char *buffer, int count) #else int n = send(fd, buffer, count, MSG_NOSIGNAL); #endif - if (n == -1) - { + if (n == -1) { + *err = errno; return -1; } count -= n; @@ -178,11 +211,12 @@ int TCPComm::writeFD(int fd, const char *buffer, int count) bool TCPComm::writePacket(int pFD, SFPacket &pPacket) { char len = pPacket.getLength(); - if (writeFD(pFD, &len, 1) != 1) + int err; + if (writeFD(pFD, &len, 1, &err) != 1) { return false; } - if (writeFD(pFD, pPacket.getPayload(), len) != len) + if (writeFD(pFD, pPacket.getPayload(), len, &err) != len) { return false; } @@ -194,15 +228,16 @@ bool TCPComm::versionCheck(int clientFD) { char check[2], us[2]; int version; - + int err = 0; /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */ us[0] = 'U'; us[1] = ' '; - if (writeFD(clientFD, us, 2) != 2) + + if (writeFD(clientFD, us, 2, &err) != 2) { return false; } - if (readFD(clientFD, check, 2) != 2) + if (readFD(clientFD, check, 2, &err) != 2) { return false; } @@ -246,6 +281,7 @@ void TCPComm::addClient(int clientFD) pthread_cond_broadcast( &clientInfo.wakeup ); } pthread_mutex_unlock( &clientInfo.countlock ); + stuffPipe(); DEBUG("TCPComm::addClient : unlock") } @@ -272,6 +308,7 @@ void TCPComm::removeClient(int clientFD) writeBuffer.clear(); } pthread_mutex_unlock( &clientInfo.countlock ); + stuffPipe(); DEBUG("TCPComm::removeClient : unlock") } @@ -332,12 +369,12 @@ void TCPComm::readClients() // copy set in to temp set clientFDs = clientInfo.FDs; // removes the cleanup handler and executes it (unlock mutex) - pthread_cleanup_pop(1); - + pthread_cleanup_pop(1); // check all fds (work with temp set)... fd_set rfds; FD_ZERO(&rfds); - int maxFD = -1; + int maxFD = pipeReadFD; + FD_SET(pipeReadFD, &rfds); set::iterator it; for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) { @@ -350,23 +387,24 @@ void TCPComm::readClients() if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 ) { // run = false; - reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1); + reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1); } else { - for ( it = clientFDs.begin(); it != clientFDs.end(); it++) + if(FD_ISSET(pipeReadFD, &rfds)) { + clearPipe(); + } + for (it = clientFDs.begin(); it != clientFDs.end(); it++) { if (FD_ISSET(*it, &rfds)) { SFPacket packet; - if (readPacket(*it, packet)) - { + if(readPacket(*it, packet)) { // this call blocks until buffer is not full readBuffer.enqueueBack(packet); ++readPacketCount; } - else - { + else { DEBUG("TCPComm::readClients : removeClient") removeClient(*it); } @@ -548,3 +586,16 @@ void TCPComm::reportStatus(ostream& os) << " , packets read = " << readPacketCount << " , packets written = " << writtenPacketCount << endl; } + +void TCPComm::stuffPipe() +{ + char info = 'n'; + write(pipeWriteFD, &info, 1); +} + +void TCPComm::clearPipe() { + char buf; + while(read(pipeReadFD, &buf, 1) > 0) { + ; + } +}