Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Polishing #248
Browse files Browse the repository at this point in the history
Extract BackOff capping into BoundedBackOff. Add BackOff to TCP SSL sender. Add unit tests for BackOff, reformat code, Javadoc.

Original pull request: #249
  • Loading branch information
mp911de committed Jul 28, 2020
1 parent 9976077 commit d1b810a
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 36 deletions.
34 changes: 33 additions & 1 deletion src/main/java/biz/paluch/logging/gelf/intern/sender/BackOff.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
package biz.paluch.logging.gelf.intern.sender;

public interface BackOff {
/**
* Provide a {@link BackOffExecution} that indicates the rate at which an operation should be retried.
*
* <p>
* Users of this interface are expected to use it like this:
*
* <pre class="code">
* BackOffExecution exec = backOff.start();
*
* // In the operation recovery/retry loop:
* long waitInterval = exec.nextBackOff();
* if (waitInterval == BackOffExecution.STOP) {
* // do not retry operation
* }
* else {
* // sleep, e.g. Thread.sleep(waitInterval)
* // retry operation
* }
* }
* </pre>
*
* Once the underlying operation has completed successfully, the execution instance can be simply discarded.
*
* @author netudima
* @see BackOffExecution
*/
interface BackOff {

/**
* Start a new back off execution.
*
* @return a fresh {@link BackOffExecution} ready to be used
*/
BackOffExecution start();
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
package biz.paluch.logging.gelf.intern.sender;

public interface BackOffExecution {
/**
* Represent a particular back-off execution.
*
* <p>
* Implementations do not need to be thread safe.
*
* @author netudima
* @see BackOff
*/
interface BackOffExecution {

/**
* Return value of {@link #nextBackOff()} that indicates that the operation should not be retried.
*/
long STOP = -1;

/**
* @return time in ms to wait before a next attempt
*/
int nextBackOff();
long nextBackOff();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package biz.paluch.logging.gelf.intern.sender;

import java.util.concurrent.TimeUnit;

/**
* Bounded {@link BackOff} implementation that return a {@link BackOffExecution#STOP} signal if the total awaited time is
* greater than the specified limit. The max time can exceeded in favor to backoff created by the delegate. Any subsequent
* backoff requests return {@link BackOffExecution#STOP}.
*
* @author Mark Paluch
*/
class BoundedBackOff implements BackOff {

private final BackOff delegate;

private final long capMs;

public BoundedBackOff(BackOff delegate, long timeout, TimeUnit timeUnit) {
this.delegate = delegate;
this.capMs = timeUnit.toMillis(timeout);
}

@Override
public BackOffExecution start() {
return new BoundedBackOffExecution(delegate.start(), capMs);
}

static class BoundedBackOffExecution implements BackOffExecution {

private final BackOffExecution delegate;

private final long capMs;

private long pastBackOff;

public BoundedBackOffExecution(BackOffExecution delegate, long capMs) {
this.delegate = delegate;
this.capMs = capMs;
}

@Override
public long nextBackOff() {

if (pastBackOff >= capMs) {
return STOP;
}

long backOff = delegate.nextBackOff();

pastBackOff += backOff;

return backOff;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package biz.paluch.logging.gelf.intern.sender;

public class ConstantBackOff implements BackOff {
private final int backoffTimeMs;
import java.util.concurrent.TimeUnit;

public ConstantBackOff(int backoffTimeMs) {
this.backoffTimeMs = backoffTimeMs;
/**
* Constant {@link BackOff} implementation.
*
* @author Mark Paluch
*/
class ConstantBackOff implements BackOff, BackOffExecution {

private final long backoffTimeMs;

public ConstantBackOff(long backoffTime, TimeUnit timeUnit) {
this.backoffTimeMs = timeUnit.toMillis(backoffTime);
}

@Override
public long nextBackOff() {
return backoffTimeMs;
}

@Override
public BackOffExecution start() {
return new BackOffExecution() {
@Override
public int nextBackOff() {
return backoffTimeMs;
}
};
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ public GelfSender create(GelfSenderConfiguration configuration, String host, int
int deliveryAttempts = QueryStringParser.getInt(params, GelfTCPSender.RETRIES, 1);
boolean keepAlive = QueryStringParser.getString(params, GelfTCPSender.KEEPALIVE, false);

int writeBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.WRITE_BACKOFF_TIME, 50);
int writeBackoffThreshold = QueryStringParser.getInt(params, GelfTCPSender.WRITE_BACKOFF_THRESHOLD, 10);
int maxWriteBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.MAX_WRITE_BACKOFF_TIME, connectionTimeMs);

int writeBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.WRITE_BACKOFF_TIME, 50);
int writeBackoffThreshold = QueryStringParser.getInt(params, GelfTCPSender.WRITE_BACKOFF_THRESHOLD, 10);
int maxWriteBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.MAX_WRITE_BACKOFF_TIME, connectionTimeMs);

String tcpGraylogHost = QueryStringParser.getHost(uri);

return new GelfTCPSender(tcpGraylogHost, port, connectionTimeMs, readTimeMs, deliveryAttempts, keepAlive,
new ConstantBackOff(writeBackoffTimeMs), writeBackoffThreshold, maxWriteBackoffTimeMs,
new BoundedBackOff(new ConstantBackOff(writeBackoffTimeMs, TimeUnit.MILLISECONDS), maxWriteBackoffTimeMs,
TimeUnit.MILLISECONDS),
writeBackoffThreshold,
configuration.getErrorReporter());
}
};
Expand All @@ -73,10 +75,18 @@ public GelfSender create(GelfSenderConfiguration configuration, String host, int
int deliveryAttempts = QueryStringParser.getInt(params, GelfTCPSender.RETRIES, 1);
boolean keepAlive = QueryStringParser.getString(params, GelfTCPSender.KEEPALIVE, false);

int writeBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.WRITE_BACKOFF_TIME, 50);
int writeBackoffThreshold = QueryStringParser.getInt(params, GelfTCPSender.WRITE_BACKOFF_THRESHOLD, 10);
int maxWriteBackoffTimeMs = (int) QueryStringParser.getTimeAsMs(params, GelfTCPSender.MAX_WRITE_BACKOFF_TIME,
connectionTimeMs);

String tcpGraylogHost = QueryStringParser.getHost(uri);

try {
return new GelfTCPSSLSender(tcpGraylogHost, port, connectionTimeMs, readTimeMs, deliveryAttempts, keepAlive,
new BoundedBackOff(new ConstantBackOff(writeBackoffTimeMs, TimeUnit.MILLISECONDS),
maxWriteBackoffTimeMs, TimeUnit.MILLISECONDS),
writeBackoffThreshold,
configuration.getErrorReporter(), SSLContext.getDefault());
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ public GelfTCPSSLSender(String host, int port, int connectTimeoutMs, int readTim
this.sslContext = sslContext;
}

/**
* @param host the host, must not be {@literal null}.
* @param port the port.
* @param connectTimeoutMs connection timeout, in {@link TimeUnit#MILLISECONDS}.
* @param readTimeoutMs read timeout, in {@link TimeUnit#MILLISECONDS}.
* @param deliveryAttempts number of delivery attempts.
* @param keepAlive {@literal true} to enable TCP keep-alive.
* @param backoff Backoff strategy to activate if a socket sender buffer is full and several attempts to write to the socket
* are unsuccessful due to it.
* @param writeBackoffThreshold attempts to write to a socket before a backoff will be activated.
* @param errorReporter the error reporter, must not be {@literal null}.
* @param sslContext the SSL context, must not be {@literal null}.
* @throws IOException in case of I/O errors
*/
public GelfTCPSSLSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts,
boolean keepAlive, BackOff backoff, int writeBackoffThreshold, ErrorReporter errorReporter, SSLContext sslContext)
throws IOException {

super(host, port, connectTimeoutMs, readTimeoutMs, deliveryAttempts, keepAlive, backoff, writeBackoffThreshold,
errorReporter);

this.connectTimeoutMs = connectTimeoutMs;
this.sslContext = sslContext;
}

@Override
protected boolean connect() throws IOException {
if (super.connect()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class GelfTCPSender extends AbstractNioSender<SocketChannel> implements G
private final int deliveryAttempts;

private final int writeBackoffThreshold;
private final int maxWriteBackoffTimeMs;
private final BackOff backoff;

private final Object ioLock = new Object();
Expand Down Expand Up @@ -72,7 +71,8 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou
boolean keepAlive, ErrorReporter errorReporter) throws IOException {

this(host, port, connectTimeoutMs, readTimeoutMs, deliveryAttempts, keepAlive,
new ConstantBackOff(50), 10, connectTimeoutMs, errorReporter);
new BoundedBackOff(new ConstantBackOff(50, TimeUnit.MILLISECONDS), connectTimeoutMs, TimeUnit.MILLISECONDS), 10,
errorReporter);
}

/**
Expand All @@ -84,13 +84,12 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou
* @param keepAlive {@literal true} to enable TCP keep-alive.
* @param backoff Backoff strategy to activate if a socket sender buffer is full and several attempts to write to the socket are unsuccessful due to it.
* @param writeBackoffThreshold attempts to write to a socket before a backoff will be activated.
* @param maxWriteBackoffTimeMs Maximum time spent for awaiting during a backoff for a single message send operation.
* @param errorReporter the error reporter, must not be {@literal null}.
* @throws IOException in case of I/O errors
*/
public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts,
boolean keepAlive,
BackOff backoff, int writeBackoffThreshold, int maxWriteBackoffTimeMs,
BackOff backoff, int writeBackoffThreshold,
ErrorReporter errorReporter) throws IOException {

super(errorReporter, host, port);
Expand All @@ -102,7 +101,6 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou

this.backoff = backoff;
this.writeBackoffThreshold = writeBackoffThreshold;
this.maxWriteBackoffTimeMs = maxWriteBackoffTimeMs;

this.setChannel(createSocketChannel(readTimeoutMs, keepAlive));
}
Expand Down Expand Up @@ -169,8 +167,8 @@ public boolean sendMessage(GelfMessage message) {
}

protected void write(ByteBuffer buffer) throws IOException, InterruptedException {

int nothingWrittenTimesInRow = 0;
int totalSleepTimeMs = 0;
BackOffExecution backoffExecution = null;

while (buffer.hasRemaining()) {
Expand All @@ -187,28 +185,19 @@ protected void write(ByteBuffer buffer) throws IOException, InterruptedException
}
nothingWrittenTimesInRow++;
if (nothingWrittenTimesInRow > writeBackoffThreshold) {
if (totalSleepTimeMs > maxWriteBackoffTimeMs) {
long toSleep = backoffExecution.nextBackOff();
if (toSleep == BackOffExecution.STOP) {
Closer.close(channel());
throw new SocketException("Cannot write buffer to channel, no progress in writing");
}
totalSleepTimeMs += sleep(backoffExecution.nextBackOff());
Thread.sleep(toSleep);
}
} else { // written > 0
nothingWrittenTimesInRow = 0;
}
}
}

private static long sleep(long millis) throws InterruptedException {
long startTime = System.nanoTime();
Thread.sleep(millis);
long sleepTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
if (sleepTimeMs < 0) {
sleepTimeMs = 0;
}
return sleepTimeMs;
}

protected boolean connect() throws IOException {

if (isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package biz.paluch.logging.gelf.intern.sender;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

/**
* Unit tests for {@link BoundedBackOff}.
*
* @author Mark Paluch
*/
class BoundedBackOffUnitTests {

BoundedBackOff backOff = new BoundedBackOff(new ConstantBackOff(10, TimeUnit.SECONDS), 15, TimeUnit.SECONDS);

@Test
void shouldPassThruBackoff() {
assertThat(backOff.start().nextBackOff()).isEqualTo(TimeUnit.SECONDS.toMillis(10));
}

@Test
void shouldCapBackoff() {

BackOffExecution backOffExecution = backOff.start();

assertThat(backOffExecution.nextBackOff()).isEqualTo(TimeUnit.SECONDS.toMillis(10));
assertThat(backOffExecution.nextBackOff()).isEqualTo(TimeUnit.SECONDS.toMillis(10));
assertThat(backOffExecution.nextBackOff()).isEqualTo(BackOffExecution.STOP);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package biz.paluch.logging.gelf.intern.sender;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

/**
* Unit tests for {@link ConstantBackOff}.
*
* @author Mark Paluch
*/
class ConstantBackOffUnitTests {

@Test
void shouldReturnConstantBackoff() {

ConstantBackOff backOff = new ConstantBackOff(10, TimeUnit.SECONDS);

assertThat(backOff.start().nextBackOff()).isEqualTo(TimeUnit.SECONDS.toMillis(10));
}

}

0 comments on commit d1b810a

Please sign in to comment.