Skip to content

Commit

Permalink
Make all sockets non-blocking and implement our own connection timeout (
Browse files Browse the repository at this point in the history
#163)

* Switch to all non-blocking sockets

* Add heartbeat to the core library

* Implement pseudo-blocking read/write with 5 second timeout
  • Loading branch information
MisterTea authored Jan 3, 2019
1 parent 201378b commit e370bd3
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/base/BackedWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ BackedWriterWriteState BackedWriter::write(const string& buf) {
if (bytesWritten == count) {
return BackedWriterWriteState::SUCCESS;
}
usleep(1000);
} else {
// Error, we don't know how many bytes were written but it
// doesn't matter because the reader is going to have to
Expand Down
18 changes: 0 additions & 18 deletions src/base/PipeSocketHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ int PipeSocketHandler::connect(const SocketEndpoint& endpoint) {
remote.sun_family = AF_UNIX;
strcpy(remote.sun_path, pipePath.c_str());

// Set nonblocking just for the connect phase
{
int opts;
opts = fcntl(sockFd, F_GETFL);
FATAL_FAIL(opts);
opts |= O_NONBLOCK;
FATAL_FAIL(fcntl(sockFd, F_SETFL, opts));
}

VLOG(3) << "Connecting to " << endpoint << " with fd " << sockFd;
int result =
::connect(sockFd, (struct sockaddr*)&remote, sizeof(sockaddr_un));
Expand Down Expand Up @@ -62,15 +53,6 @@ int PipeSocketHandler::connect(const SocketEndpoint& endpoint) {

if (so_error == 0) {
LOG(INFO) << "Connected to endpoint " << endpoint;
// Make sure that socket becomes blocking once it's attached to a
// server.
{
int opts;
opts = fcntl(sockFd, F_GETFL);
FATAL_FAIL(opts);
opts &= (~O_NONBLOCK);
FATAL_FAIL(fcntl(sockFd, F_SETFL, opts));
}
// Initialize the socket again once it's blocking to make sure timeouts
// are set
initSocket(sockFd);
Expand Down
17 changes: 5 additions & 12 deletions src/base/TcpSocketHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,6 @@ int TcpSocketHandler::connect(const SocketEndpoint &endpoint) {
continue;
}

// Set nonblocking just for the connect phase
{
int opts;
opts = fcntl(sockFd, F_GETFL);
FATAL_FAIL(opts);
opts |= O_NONBLOCK;
FATAL_FAIL(fcntl(sockFd, F_SETFL, opts));
}
VLOG(4) << "Set nonblocking";
if (::connect(sockFd, p->ai_addr, p->ai_addrlen) == -1 &&
errno != EINPROGRESS) {
if (p->ai_canonname) {
Expand Down Expand Up @@ -253,8 +244,10 @@ void TcpSocketHandler::stopListening(const SocketEndpoint &endpoint) {

void TcpSocketHandler::initSocket(int fd) {
UnixSocketHandler::initSocket(fd);
int flag = 1;
FATAL_FAIL_UNLESS_EINVAL(
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int)));
{
int flag = 1;
FATAL_FAIL_UNLESS_EINVAL(
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int)));
}
}
} // namespace et
86 changes: 49 additions & 37 deletions src/base/UnixSocketHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
namespace et {
UnixSocketHandler::UnixSocketHandler() {}

bool UnixSocketHandler::hasData(int fd) {
bool UnixSocketHandler::waitForData(int fd, int64_t sec, int64_t usec) {
fd_set input;
FD_ZERO(&input);
FD_SET(fd, &input);
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
timeout.tv_sec = sec;
timeout.tv_usec = usec;
int n = select(fd + 1, &input, NULL, NULL, &timeout);
if (n == -1) {
// Select timed out or failed.
Expand All @@ -32,6 +32,8 @@ bool UnixSocketHandler::hasData(int fd) {
return true;
}

bool UnixSocketHandler::hasData(int fd) { return waitForData(fd, 0, 0); }

ssize_t UnixSocketHandler::read(int fd, void *buf, size_t count) {
if (fd <= 0) {
LOG(FATAL) << "Tried to read from an invalid socket: " << fd;
Expand All @@ -46,6 +48,7 @@ ssize_t UnixSocketHandler::read(int fd, void *buf, size_t count) {
return -1;
}
}
waitForData(fd, 5, 0);
lock_guard<recursive_mutex> guard(*(it->second));
VLOG(4) << "Unixsocket handler read from fd: " << fd;
ssize_t readBytes = ::read(fd, buf, count);
Expand All @@ -70,12 +73,33 @@ ssize_t UnixSocketHandler::write(int fd, const void *buf, size_t count) {
return -1;
}
}
lock_guard<recursive_mutex> guard(*(it->second));
// Try to write for around 5 seconds before giving up
time_t startTime = time(NULL);
int bytesWritten = 0;
while (bytesWritten < count) {
lock_guard<recursive_mutex> guard(*(it->second));
int w;
#ifdef MSG_NOSIGNAL
return ::send(fd, buf, count, MSG_NOSIGNAL);
w = ::send(fd, ((const char *)buf) + bytesWritten, count - bytesWritten,
MSG_NOSIGNAL);
#else
return ::write(fd, buf, count);
w = ::write(fd, ((const char *)buf) + bytesWritten, count - bytesWritten);
#endif
if (w < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(1000);
if (time(NULL) > startTime + 5) {
// Give up
return -1;
}
} else {
return -1;
}
} else {
bytesWritten += w;
}
}
return count;
}

void UnixSocketHandler::addToActiveSockets(int fd) {
Expand Down Expand Up @@ -106,14 +130,6 @@ int UnixSocketHandler::accept(int sockFd) {
VLOG(3) << "Socket " << sockFd
<< " accepted, returned client_sock: " << client_sock;
if (client_sock >= 0) {
// Make sure that socket becomes blocking once it's attached to a client.
{
int opts;
opts = fcntl(client_sock, F_GETFL, 0);
FATAL_FAIL(opts);
opts &= (~O_NONBLOCK);
FATAL_FAIL(fcntl(client_sock, F_SETFL, opts));
}
initSocket(client_sock);
addToActiveSockets(client_sock);
VLOG(3) << "Client_socket inserted to activeSockets";
Expand Down Expand Up @@ -152,32 +168,24 @@ vector<int> UnixSocketHandler::getActiveSockets() {
}

void UnixSocketHandler::initSocket(int fd) {
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
FATAL_FAIL_UNLESS_EINVAL(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,
sizeof(struct timeval)));
FATAL_FAIL_UNLESS_EINVAL(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
sizeof(struct timeval)));
// Set linger
struct linger so_linger;
so_linger.l_onoff = 1;
so_linger.l_linger = 5;
int z =
setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger);
if (z) {
LOG(FATAL) << "set socket linger failed";
{
// Set linger
struct linger so_linger;
so_linger.l_onoff = 1;
so_linger.l_linger = 5;
int z = setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger);
if (z) {
LOG(FATAL) << "set socket linger failed";
}
}
#ifndef MSG_NOSIGNAL
// If we don't have MSG_NOSIGNAL, use SO_NOSIGPIPE
int val = 1;
FATAL_FAIL_UNLESS_EINVAL(
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)));
{
// If we don't have MSG_NOSIGNAL, use SO_NOSIGPIPE
int val = 1;
FATAL_FAIL_UNLESS_EINVAL(
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)));
}
#endif
}

void UnixSocketHandler::initServerSocket(int fd) {
initSocket(fd);
// Also set the accept socket as non-blocking
{
int opts;
Expand All @@ -186,6 +194,10 @@ void UnixSocketHandler::initServerSocket(int fd) {
opts |= O_NONBLOCK;
FATAL_FAIL_UNLESS_EINVAL(fcntl(fd, F_SETFL, opts));
}
}

void UnixSocketHandler::initServerSocket(int fd) {
initSocket(fd);
// Also set the accept socket as reusable
{
int flag = 1;
Expand Down
1 change: 1 addition & 0 deletions src/base/UnixSocketHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class UnixSocketHandler : public SocketHandler {
UnixSocketHandler();
virtual ~UnixSocketHandler() {}

virtual bool waitForData(int fd, int64_t sec, int64_t usec);
virtual bool hasData(int fd);
virtual ssize_t read(int fd, void* buf, size_t count);
virtual ssize_t write(int fd, const void* buf, size_t count);
Expand Down
2 changes: 1 addition & 1 deletion test/BackedTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ TEST_F(ReliableBackedTest, ReadWrite) {
BackedWriterWriteState r =
serverCollector->write(string((&s[0] + a * 1024), 1024));
if (r != BackedWriterWriteState::SUCCESS) {
throw runtime_error("Oops");
LOG(FATAL) << "Invalid write state: " << int(r);
}
}

Expand Down

0 comments on commit e370bd3

Please sign in to comment.