Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Add constant write backoff in GelfTCPSender #248
Browse files Browse the repository at this point in the history
for cases when a sender cannot write any bytes to the socket channel several times. Limit the time to stay in such mode.
stop attempts to send a message in case of InterruptedException.

Original pull request: #249
  • Loading branch information
netudima authored and mp911de committed Jul 28, 2020
1 parent 9f4968a commit 9976077
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package biz.paluch.logging.gelf.intern.sender;

public interface BackOff {
BackOffExecution start();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package biz.paluch.logging.gelf.intern.sender;

public interface BackOffExecution {
/**
* @return time in ms to wait before a next attempt
*/
int nextBackOff();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package biz.paluch.logging.gelf.intern.sender;

public class ConstantBackOff implements BackOff {
private final int backoffTimeMs;

public ConstantBackOff(int backoffTimeMs) {
this.backoffTimeMs = backoffTimeMs;
}

@Override
public BackOffExecution start() {
return new BackOffExecution() {
@Override
public int nextBackOff() {
return backoffTimeMs;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ public GelfSender create(GelfSenderConfiguration configuration, String host, int
int deliveryAttempts = QueryStringParser.getInt(params, GelfTCPSender.RETRIES, 1);
boolean keepAlive = QueryStringParser.getString(params, GelfTCPSender.KEEPALIVE, false);

int writeBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.WRITE_BACKOFF_TIME, 50);
int writeBackoffThreshold = QueryStringParser.getInt(params, GelfTCPSender.WRITE_BACKOFF_THRESHOLD, 10);
int maxWriteBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.MAX_WRITE_BACKOFF_TIME, connectionTimeMs);

String tcpGraylogHost = QueryStringParser.getHost(uri);

return new GelfTCPSender(tcpGraylogHost, port, connectionTimeMs, readTimeMs, deliveryAttempts, keepAlive,
new ConstantBackOff(writeBackoffTimeMs), writeBackoffThreshold, maxWriteBackoffTimeMs,
configuration.getErrorReporter());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ public class GelfTCPSender extends AbstractNioSender<SocketChannel> implements G
public static final String READ_TIMEOUT = "readTimeout";
public static final String RETRIES = "deliveryAttempts";
public static final String KEEPALIVE = "keepAlive";
public static final String WRITE_BACKOFF_TIME = "writeBackoffTime";
public static final String WRITE_BACKOFF_THRESHOLD = "writeBackoffThreshold";
public static final String MAX_WRITE_BACKOFF_TIME = "maxWriteBackoffTime";

private final int readTimeoutMs;
private final int connectTimeoutMs;
private final boolean keepAlive;
private final int deliveryAttempts;

private final int writeBackoffThreshold;
private final int maxWriteBackoffTimeMs;
private final BackOff backoff;

private final Object ioLock = new Object();

private final ThreadLocal<ByteBuffer> writeBuffers = new ThreadLocal<ByteBuffer>() {
Expand Down Expand Up @@ -64,13 +71,39 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou
public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts,
boolean keepAlive, ErrorReporter errorReporter) throws IOException {

this(host, port, connectTimeoutMs, readTimeoutMs, deliveryAttempts, keepAlive,
new ConstantBackOff(50), 10, connectTimeoutMs, errorReporter);
}

/**
* @param host the host, must not be {@literal null}.
* @param port the port.
* @param connectTimeoutMs connection timeout, in {@link TimeUnit#MILLISECONDS}.
* @param readTimeoutMs read timeout, in {@link TimeUnit#MILLISECONDS}.
* @param deliveryAttempts number of delivery attempts.
* @param keepAlive {@literal true} to enable TCP keep-alive.
* @param backoff Backoff strategy to activate if a socket sender buffer is full and several attempts to write to the socket are unsuccessful due to it.
* @param writeBackoffThreshold attempts to write to a socket before a backoff will be activated.
* @param maxWriteBackoffTimeMs Maximum time spent for awaiting during a backoff for a single message send operation.
* @param errorReporter the error reporter, must not be {@literal null}.
* @throws IOException in case of I/O errors
*/
public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts,
boolean keepAlive,
BackOff backoff, int writeBackoffThreshold, int maxWriteBackoffTimeMs,
ErrorReporter errorReporter) throws IOException {

super(errorReporter, host, port);

this.connectTimeoutMs = connectTimeoutMs;
this.readTimeoutMs = readTimeoutMs;
this.keepAlive = keepAlive;
this.deliveryAttempts = deliveryAttempts < 1 ? Integer.MAX_VALUE : deliveryAttempts;

this.backoff = backoff;
this.writeBackoffThreshold = writeBackoffThreshold;
this.maxWriteBackoffTimeMs = maxWriteBackoffTimeMs;

this.setChannel(createSocketChannel(readTimeoutMs, keepAlive));
}

Expand Down Expand Up @@ -110,8 +143,14 @@ public boolean sendMessage(GelfMessage message) {
buffer = GelfBuffers.toTCPBuffer(message, writeBuffers);
}

synchronized (ioLock) {
write(buffer);
try {
synchronized (ioLock) {
write(buffer);
}
} catch (InterruptedException e) {
reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e));
Thread.currentThread().interrupt();
return false;
}

return true;
Expand All @@ -129,7 +168,10 @@ public boolean sendMessage(GelfMessage message) {
return false;
}

protected void write(ByteBuffer buffer) throws IOException {
protected void write(ByteBuffer buffer) throws IOException, InterruptedException {
int nothingWrittenTimesInRow = 0;
int totalSleepTimeMs = 0;
BackOffExecution backoffExecution = null;

while (buffer.hasRemaining()) {
int written = channel().write(buffer);
Expand All @@ -139,7 +181,32 @@ protected void write(ByteBuffer buffer) throws IOException {
Closer.close(channel());
throw new SocketException("Cannot write buffer to channel");
}
if (written == 0) {
if (backoffExecution == null) {
backoffExecution = backoff.start();
}
nothingWrittenTimesInRow++;
if (nothingWrittenTimesInRow > writeBackoffThreshold) {
if (totalSleepTimeMs > maxWriteBackoffTimeMs) {
Closer.close(channel());
throw new SocketException("Cannot write buffer to channel, no progress in writing");
}
totalSleepTimeMs += sleep(backoffExecution.nextBackOff());
}
} else { // written > 0
nothingWrittenTimesInRow = 0;
}
}
}

private static long sleep(long millis) throws InterruptedException {
long startTime = System.nanoTime();
Thread.sleep(millis);
long sleepTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
if (sleepTimeMs < 0) {
sleepTimeMs = 0;
}
return sleepTimeMs;
}

protected boolean connect() throws IOException {
Expand Down
5 changes: 4 additions & 1 deletion src/site/markdown/tcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ The TCP transport for logstash-gelf allows to configure TCP-specific options. Op
* `readTimeout` Socket Read-Timeout (SO_TIMEOUT). The unit can be specified as suffix (see below). A timeout of zero is interpreted as an infinite timeout. Defaults to `2s`.
* `connectionTimeout` Socket Connection-Timeout (SO_TIMEOUT). The unit can be specified as suffix (see below). A timeout of zero is interpreted as an infinite timeout. Defaults to `2s`.
* `deliveryAttempts` Number of Delivery-Attempts. Will retry to deliver the message and reconnect if necessary. A number of zero is interpreted as an infinite attempts. Defaults to `1`.
* `keepAlive` Enable TCP keepAlive. Defaults to `false`.
* `keepAlive` Enable TCP keepAlive. Defaults to `false`.
* `writeBackoffTime` Delay between subsequent attempts to write to a socket when a backoff is activated. Backoff is activated if a socket sender buffer is full and several attempts to write to the socket are unsuccessful due to it. The unit can be specified as suffix (see below). Defaults to `50ms`.
* `writeBackoffThreshold` Attempts to write to a socket before a backoff will be activated. Defaults to `10`.
* `maxWriteBackoffTime` Maximum time spent for awaiting during a backoff for a single message send operation. The unit can be specified as suffix (see below). Default equals to `connectionTimeout`.

## SSL

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -34,6 +36,7 @@ class GelfTCPSenderIntegrationTests {
private final Queue<Socket> sockets = new LinkedBlockingQueue<>();
private volatile ServerSocket serverSocket;
private volatile boolean loopActive = true;
private volatile boolean readFromServerSocket = true;

private Thread thread;

Expand Down Expand Up @@ -64,7 +67,9 @@ public void run() {
InputStream inputStream = socket.getInputStream();

while (!socket.isClosed()) {
IOUtils.copy(inputStream, out);
if (readFromServerSocket) {
IOUtils.copy(inputStream, out);
}
Thread.sleep(1);

if (latch.getCount() == 0) {
Expand Down Expand Up @@ -168,6 +173,31 @@ public void reportError(String message, Exception e) {
sender.close();
}

@Test
void sendToNonConsumingPort() throws Exception {

serverSocket.setReceiveBufferSize(100);
readFromServerSocket = false; // emulate read delays on a server side
thread.start();
final List<String> errors = new ArrayList<>();

SmallBufferTCPSender sender = new SmallBufferTCPSender("localhost", PORT, 1000, 1000, new ErrorReporter() {
@Override
public void reportError(String message, Exception e) {
errors.add(message);
}
});

GelfMessage gelfMessage = new GelfMessage("hello", StringUtils.repeat("hello", 100000), PORT, "7");

sender.sendMessage(gelfMessage);

assertThat(errors).hasSize(1);
assertThat(errors).containsOnly("Cannot write buffer to channel, no progress in writing");

sender.close();
}

static class SmallBufferTCPSender extends GelfTCPSender {

SmallBufferTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, ErrorReporter errorReporter)
Expand Down

0 comments on commit 9976077

Please sign in to comment.