Skip to content
This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

Commit

Permalink
Fix blocking query read timeout issue (#289)
Browse files Browse the repository at this point in the history
* Decrease OkHTTP Read Timeout to 2 second

Currently, the read timeout is the default value of okhttp i.e. 10 seconds.
That is a very long duration when we consider testing.
Decreasing it to 2 second should have limited to no impact.

* Make ConsulCache AutoCloseable

This is useful for test purposes.

* Add tests for Key/Value cache

The first test checks that the cache is notified at startup.
The second test checks cache notifications for blocking queries of 1 and 10 seconds.
Note that it is successful for 1 sec (<readTime) but failing for 10 sec.

* Add timeout interceptor

This "interceptor" is required to extract the wait query parameter from the url.
Then, it adjusts the OkHttp read timeout accordingly.
This feature requires OkHttp 3.9.0 which has been already added to retrofit2, but not yet release.

* Activate read timeout auto adjustment feature

* [pom] Set the version of OkHttp in properties
  • Loading branch information
yfouquet authored and rickfast committed Dec 27, 2017
1 parent 6cb1471 commit 2e2b7c6
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 9 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<junitparams.version>1.0.5</junitparams.version>
<mockito.version>1.10.19</mockito.version>
<retrofit.version>2.3.0</retrofit.version>
<okhttp.version>3.9.0</okhttp.version>
<slf4j.version>1.7.21</slf4j.version>
<typesafe.version>1.3.2</typesafe.version>
</properties>
Expand Down Expand Up @@ -87,6 +88,11 @@
<artifactId>converter-jackson</artifactId>
<version>${retrofit.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/com/orbitz/consul/Consul.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
import com.google.common.io.BaseEncoding;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.util.Jackson;
import com.orbitz.consul.cache.TimeoutInterceptor;
import com.orbitz.consul.util.bookend.ConsulBookend;
import com.orbitz.consul.util.bookend.ConsulBookendInterceptor;
import okhttp3.Dispatcher;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.*;
import okhttp3.internal.Util;
import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;
Expand Down Expand Up @@ -592,6 +589,8 @@ private Retrofit createRetrofit(String url, SSLContext sslContext, HostnameVerif
builder.writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS);
}

builder.addInterceptor(new TimeoutInterceptor());

Dispatcher dispatcher = new Dispatcher(executorService);
dispatcher.setMaxRequests(Integer.MAX_VALUE);
dispatcher.setMaxRequestsPerHost(Integer.MAX_VALUE);
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/orbitz/consul/cache/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class CacheConfig {
static String CONFIG_CACHE_PATH = "com.orbitz.consul.cache";
@VisibleForTesting
static String BACKOFF_DELAY = "backOffDelay";
private static String TIMEOUT_AUTO_ENABLED = "timeout.autoAdjustment.enable";
private static String TIMEOUT_AUTO_MARGIN = "timeout.autoAdjustment.margin";

private static final Supplier<CacheConfig> INSTANCE = Suppliers.memoize(CacheConfig::new);

Expand Down Expand Up @@ -47,4 +49,29 @@ Duration getBackOffDelay() {
throw new RuntimeException(String.format("Error extracting config variable %s", BACKOFF_DELAY), ex);
}
}

/**
* Is the automatic adjustment of read timeout enabled?
* @throws RuntimeException if an error occurs while retrieving the configuration property.
*/
boolean isTimeoutAutoAdjustmentEnabled() {
try {
return config.getBoolean(TIMEOUT_AUTO_ENABLED);
} catch (Exception ex) {
throw new RuntimeException(String.format("Error extracting config variable %s", TIMEOUT_AUTO_ENABLED), ex);
}
}

/**
* Gets the margin of the read timeout for caches.
* The margin represents the additional amount of time given to the read timeout, in addition to the wait duration.
* @throws RuntimeException if an error occurs while retrieving the configuration property.
*/
Duration getTimeoutAutoAdjustmentMargin() {
try {
return config.getDuration(TIMEOUT_AUTO_MARGIN);
} catch (Exception ex) {
throw new RuntimeException(String.format("Error extracting config variable %s", TIMEOUT_AUTO_ENABLED), ex);
}
}
}
8 changes: 6 additions & 2 deletions src/main/java/com/orbitz/consul/cache/ConsulCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
*
* @param <V>
*/
public class ConsulCache<K, V> {

public class ConsulCache<K, V> implements AutoCloseable {
enum State {latent, starting, started, stopped }

private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
Expand Down Expand Up @@ -139,6 +138,11 @@ public void stop() {
}
}

@Override
public void close() throws Exception {
stop();
}

private void runCallback() {
if (isRunning()) {
callBackConsumer.consume(latestIndex.get(), responseCallback);
Expand Down
74 changes: 74 additions & 0 deletions src/main/java/com/orbitz/consul/cache/TimeoutInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.orbitz.consul.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class TimeoutInterceptor implements Interceptor {

private final static Logger LOGGER = LoggerFactory.getLogger(TimeoutInterceptor.class);

private CacheConfig config;

public TimeoutInterceptor() {
this(CacheConfig.get());
}

@VisibleForTesting
TimeoutInterceptor(CacheConfig config) {
this.config = config;
}

@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
int readTimeout = chain.readTimeoutMillis();

if (config.isTimeoutAutoAdjustmentEnabled()) {
String waitQuery = request.url().queryParameter("wait");
Duration waitDuration = parseWaitQuery(waitQuery);
if (waitDuration != null) {
int waitDurationMs = (int) waitDuration.toMillis();
int readTimeoutConfigMargin = (int) config.getTimeoutAutoAdjustmentMargin().toMillis();

// According to https://www.consul.io/api/index.html#blocking-queries
// A small random amount of additional wait time is added to the supplied maximum wait time by consul
// agent to spread out the wake up time of any concurrent requests.
// This adds up to (wait / 16) additional time to the maximum duration.
int readTimeoutRequiredMargin = (int) Math.ceil((double)(waitDurationMs) / 16);

readTimeout = waitDurationMs + readTimeoutRequiredMargin + readTimeoutConfigMargin;
}
}

return chain
.withReadTimeout(readTimeout, TimeUnit.MILLISECONDS)
.proceed(request);
}

private Duration parseWaitQuery(String query) {
if (Strings.isNullOrEmpty(query)) {
return null;
}

Duration wait = null;
try {
if (query.contains("m")) {
wait = Duration.ofMinutes(Integer.valueOf(query.replace("m","")));
} else if (query.contains("s")) {
wait = Duration.ofSeconds(Integer.valueOf(query.replace("s","")));
}
} catch (Exception e) {
LOGGER.warn(String.format("Error while extracting wait duration from query parameters: %s", query));
}
return wait;
}
}
4 changes: 4 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
com.orbitz.consul {
cache.backOffDelay: 10 seconds
cache.timeout.autoAdjustment {
enable: true
margin: 2 seconds
}
}
7 changes: 6 additions & 1 deletion src/test/java/com/orbitz/consul/BaseIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import com.google.common.net.HostAndPort;
import org.junit.BeforeClass;

import java.time.Duration;

public abstract class BaseIntegrationTest {

protected static Consul client;

@BeforeClass
public static void beforeClass() {
client = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", 8500)).build();
client = Consul.builder()
.withHostAndPort(HostAndPort.fromParts("localhost", 8500))
.withReadTimeoutMillis(Duration.ofSeconds(2).toMillis())
.build();
}
}
93 changes: 92 additions & 1 deletion src/test/java/com/orbitz/consul/cache/KVCacheTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.orbitz.consul.cache;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.orbitz.consul.BaseIntegrationTest;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.ImmutableValue;
import com.orbitz.consul.model.kv.Value;
import junitparams.JUnitParamsRunner;
Expand All @@ -9,11 +12,99 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(JUnitParamsRunner.class)
public class KVCacheTest {
public class KVCacheTest extends BaseIntegrationTest {

@Test
public void ensureCacheInitialization() throws InterruptedException {
KeyValueClient keyValueClient = client.keyValueClient();
String key = UUID.randomUUID().toString();
String value = UUID.randomUUID().toString();
keyValueClient.putValue(key, value);

final CountDownLatch completed = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean(false);

try (KVCache cache = KVCache.newCache(keyValueClient, key, (int)Duration.ofSeconds(1).getSeconds())) {
cache.addListener(values -> {
success.set(isValueEqualsTo(values, value));
completed.countDown();
});

cache.start();
completed.await(2, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e.getMessage());
} finally {
keyValueClient.deleteKey(key);
}

assertTrue(success.get());
}

@Test
@Parameters(method = "getBlockingQueriesDuration")
@TestCaseName("queries of {0} seconds")
public void checkUpdateNotifications(int queryDurationSec) throws InterruptedException {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("kvcache-itest-%d").build()
);

KeyValueClient keyValueClient = client.keyValueClient();
String key = UUID.randomUUID().toString();
String value = UUID.randomUUID().toString();
String newValue = UUID.randomUUID().toString();
keyValueClient.putValue(key, value);

final CountDownLatch completed = new CountDownLatch(2);
final AtomicBoolean success = new AtomicBoolean(false);

try (KVCache cache = KVCache.newCache(keyValueClient, key, queryDurationSec)) {
cache.addListener(values -> {
success.set(isValueEqualsTo(values, newValue));
completed.countDown();
});

cache.start();
executor.schedule(() -> keyValueClient.putValue(key, newValue), 3, TimeUnit.SECONDS);
completed.await(4, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e.getMessage());
} finally {
keyValueClient.deleteKey(key);
executor.shutdownNow();
}

assertTrue(success.get());
}

public Object getBlockingQueriesDuration() {
return new Object[]{
new Object[]{1},
new Object[]{10}

};
}

private boolean isValueEqualsTo(Map<String, Value> values, String expectedValue) {
Value value = values.get("");
if (value == null) {
return false;
}
Optional<String> valueAsString = value.getValueAsString();
return valueAsString.isPresent() && expectedValue.equals(valueAsString.get());
}

@Test
@Parameters(method = "getKeyValueTestValues")
Expand Down
74 changes: 74 additions & 0 deletions src/test/java/com/orbitz/consul/cache/TimeoutInterceptorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.orbitz.consul.cache;

import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import okhttp3.Interceptor;
import okhttp3.Request;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;

@RunWith(JUnitParamsRunner.class)
public class TimeoutInterceptorTest {

@Test
@Parameters(method = "getInterceptParameters")
@TestCaseName("expected timeout of {4} ms for url {0} with timeout of {1} ms and margin of {3} ms (enabled: {2})")
public void checkIntercept(String url, int defaultTimeout, boolean enabled, int margin, int expectedTimeoutMs)
throws IOException {
CacheConfig config = createConfigMock(enabled, margin);
Interceptor.Chain chain = createChainMock(defaultTimeout, url);

TimeoutInterceptor interceptor = new TimeoutInterceptor(config);
interceptor.intercept(chain);
verify(chain).withReadTimeout(eq(expectedTimeoutMs), eq(TimeUnit.MILLISECONDS));
}

public Object getInterceptParameters() {
return new Object[]{
// Auto Adjustment disabled
new Object[]{"http://my_call", 1, false, 0, 1},
// Auto Adjustment disabled and valid "wait" query parameter
new Object[]{"http://my_call?wait=1s", 1, false, 0, 1},
// Auto Adjustment enabled but not "wait" query parameter
new Object[]{"http://my_call", 1, true, 0, 1},
new Object[]{"http://my_call", 1, true, 2, 1},
// Auto Adjustment enabled but invalid "wait" query parameter
new Object[]{"http://my_call?wait=1", 1, true, 0, 1},
new Object[]{"http://my_call?wait=3h", 1, true, 2, 1},
// Auto Adjustment enabled and valid "wait" query parameter
// Note: ceil(1/16*1000) = 63 and ceil(1/16*60000)=3750
new Object[]{"http://my_call?wait=1s", 1, true, 0, 1063},
new Object[]{"http://my_call?wait=1s", 0, true, 2, 1065},
new Object[]{"http://my_call?wait=1s", 1, true, 2, 1065},
new Object[]{"http://my_call?wait=1m", 1, true, 2, 63752},
};
}

private CacheConfig createConfigMock(boolean autoAdjustEnabled, int autoAdjustMargin) {
CacheConfig config = mock(CacheConfig.class);
when(config.isTimeoutAutoAdjustmentEnabled()).thenReturn(autoAdjustEnabled);
when(config.getTimeoutAutoAdjustmentMargin()).thenReturn(Duration.ofMillis(autoAdjustMargin));
return config;
}

private Interceptor.Chain createChainMock(int defaultTimeout, String url) throws IOException {
Request request = new Request.Builder().url(url).build();

Interceptor.Chain chain = mock(Interceptor.Chain.class);
when(chain.request()).thenReturn(request);
when(chain.readTimeoutMillis()).thenReturn(defaultTimeout);
doReturn(chain).when(chain).withReadTimeout(anyInt(), any(TimeUnit.class));
doReturn(null).when(chain).proceed(any(Request.class));

return chain;
}
}

0 comments on commit 2e2b7c6

Please sign in to comment.