/* opens tcp server port for listening and start threads*/
TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
-{
+{
// init values
writerThreadRunning = false;
readerThreadRunning = false;
readPacketCount = 0;
writtenPacketCount = 0;
port = pPort;
+
pthread_mutex_init(&clientInfo.sleeplock, NULL);
pthread_mutex_init(&clientInfo.countlock, NULL);
pthread_cond_init(&clientInfo.wakeup, NULL);
struct sockaddr_in me;
int opt;
int rxBuf = 1024;
-
- serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0));
+
+ /* create pipe to inform client reader of new clients */
+ if (!errorReported) {
+ int pipeFDPair[2];
+ reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
+ pipeWriteFD = pipeFDPair[1];
+ pipeReadFD = pipeFDPair[0];
+ }
+ if (!errorReported) {
+ reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
+ fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
+ }
+ /* create server socket where clients connect */
+ if (!errorReported) {
+ serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
+ socket(AF_INET, SOCK_STREAM, 0));
+ }
memset(&me, 0, sizeof me);
me.sin_family = AF_INET;
me.sin_port = htons(port);
-
+
opt = 1;
- if (!errorReported)
- reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
- if (!errorReported)
- reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
- if (!errorReported)
- reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me));
- if (!errorReported)
- reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5));
+ if (!errorReported) {
+ reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
+ setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
+ }
+ if (!errorReported) {
+ reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
+ setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
+ }
+ if (!errorReported) {
+ reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
+ bind(serverFD, (struct sockaddr *)&me, sizeof me));
+ }
+ if (!errorReported) {
+ reportError("TCPComm::TCPComm : listen(serverFD, 5)",
+ listen(serverFD, 5));
+ }
// start thread for server socket (adding and removing clients)
if (!errorReported)
{
- if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0)
+ if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
+ pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
serverThreadRunning = true;
+ }
// start thread for reading from client connections
- if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0)
+ if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
+ pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
readerThreadRunning = true;
+ }
// start thread for writing to client connections
- if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0)
+ if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
+ pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
writerThreadRunning = true;
+ }
}
}
{
close(*it);
}
+ close(pipeWriteFD);
+ close(pipeReadFD);
pthread_mutex_destroy(&clientInfo.sleeplock);
pthread_mutex_destroy(&clientInfo.countlock);
pthread_cond_destroy(&clientInfo.wakeup);
{
char l;
char* buffer[SFPacket::getMaxPayloadLength()];
-
- if (readFD(pFD, &l, 1) != 1)
+ int err;
+
+ if (readFD(pFD, &l, 1, &err) != 1)
{
return false;
}
{
return false;
}
- if (readFD(pFD, (char*) buffer, static_cast<int>(l)) != l)
+ if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l)
{
return false;
}
}
}
-int TCPComm::writeFD(int fd, const char *buffer, int count)
+int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
{
int actual = 0;
while (count > 0)
{
+#ifdef __APPLE__
+ int n = send(fd, buffer, count, 0);
+#else
int n = send(fd, buffer, count, MSG_NOSIGNAL);
- if (n == -1)
- {
+#endif
+ if (n == -1) {
+ *err = errno;
return -1;
}
count -= n;
bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
{
char len = pPacket.getLength();
- if (writeFD(pFD, &len, 1) != 1)
+ int err;
+ if (writeFD(pFD, &len, 1, &err) != 1)
{
return false;
}
- if (writeFD(pFD, pPacket.getPayload(), len) != len)
+ if (writeFD(pFD, pPacket.getPayload(), len, &err) != len)
{
return false;
}
{
char check[2], us[2];
int version;
-
+ int err = 0;
/* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
us[0] = 'U';
us[1] = ' ';
- if (writeFD(clientFD, us, 2) != 2)
+
+ if (writeFD(clientFD, us, 2, &err) != 2)
{
return false;
}
- if (readFD(clientFD, check, 2) != 2)
+ if (readFD(clientFD, check, 2, &err) != 2)
{
return false;
}
pthread_cond_broadcast( &clientInfo.wakeup );
}
pthread_mutex_unlock( &clientInfo.countlock );
+ stuffPipe();
DEBUG("TCPComm::addClient : unlock")
}
writeBuffer.clear();
}
pthread_mutex_unlock( &clientInfo.countlock );
+ stuffPipe();
DEBUG("TCPComm::removeClient : unlock")
}
// copy set in to temp set
clientFDs = clientInfo.FDs;
// removes the cleanup handler and executes it (unlock mutex)
- pthread_cleanup_pop(1);
-
+ pthread_cleanup_pop(1);
// check all fds (work with temp set)...
fd_set rfds;
FD_ZERO(&rfds);
- int maxFD = -1;
+ int maxFD = pipeReadFD;
+ FD_SET(pipeReadFD, &rfds);
set<int>::iterator it;
for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
{
if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
{
// run = false;
- reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL NULL)", -1);
+ reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
}
else
{
- for ( it = clientFDs.begin(); it != clientFDs.end(); it++)
+ if(FD_ISSET(pipeReadFD, &rfds)) {
+ clearPipe();
+ }
+ for (it = clientFDs.begin(); it != clientFDs.end(); it++)
{
if (FD_ISSET(*it, &rfds))
{
SFPacket packet;
- if (readPacket(*it, packet))
- {
+ if(readPacket(*it, packet)) {
// this call blocks until buffer is not full
readBuffer.enqueueBack(packet);
++readPacketCount;
}
- else
- {
+ else {
DEBUG("TCPComm::readClients : removeClient")
removeClient(*it);
}
<< " , packets read = " << readPacketCount
<< " , packets written = " << writtenPacketCount << endl;
}
+
+void TCPComm::stuffPipe()
+{
+ char info = 'n';
+ write(pipeWriteFD, &info, 1);
+}
+
+void TCPComm::clearPipe() {
+ char buf;
+ while(read(pipeReadFD, &buf, 1) > 0) {
+ ;
+ }
+}