Skip to content

Commit

Permalink
Added retry logic if client request returns an error code that is con…
Browse files Browse the repository at this point in the history
…figured for retrying

Closes #4408

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Apr 22, 2024
1 parent 0d601cf commit 2a30ce3
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Elasticsearch index configuration
| index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more than 50000 indexes per index store. | Boolean | true | MASKABLE |
| index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE |
| index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE |
| index.[X].elasticsearch.retry-error-codes | Elasticsearch REST client ResponseException error codes To Retry. | String[] | | LOCAL |
| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL |
| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL |
| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL |
| index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE |
| index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE |
| index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider {
"Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE,
Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS);

/**
 * Upper bound on how many times a REST request is retried after it failed with one of the
 * configured retryable error codes. The first attempt is not counted, so the default of
 * {@code 0} means a failing request is never retried.
 */
public static final ConfigOption<Integer> RETRY_LIMIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit",
"Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL,
Integer.class, 0);

/**
 * Wait time in milliseconds before the first retry. Each further retry multiplies the wait
 * by ten until the value configured via {@code retry-max-wait} is reached.
 */
public static final ConfigOption<Long> RETRY_INITIAL_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait",
"Sets the initial retry wait time (in milliseconds) before exponential backoff.",
ConfigOption.Type.LOCAL, Long.class, 1L);

/**
 * Cap in milliseconds for the wait time between two retry attempts; the growing backoff
 * delay never exceeds this value.
 */
public static final ConfigOption<Long> RETRY_MAX_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait",
"Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL,
Long.class, 1000L);

/**
 * Status codes of a REST client {@code ResponseException} for which the request is retried.
 * Each entry is parsed as an integer when the client is set up, so every value must be a
 * plain decimal number such as {@code 408} or {@code 429}. Empty by default, i.e. nothing
 * is retried.
 */
public static final ConfigOption<String[]> RETRY_ERROR_CODES =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes",
"Elasticsearch REST client ResponseException error codes To Retry.", ConfigOption.Type.LOCAL,
String[].class, new String[0]);

public static final int HOST_PORT_DEFAULT = 9200;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT;
Expand Down Expand Up @@ -81,6 +84,12 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
Integer retryOnConflict = config.has(ElasticSearchIndex.RETRY_ON_CONFLICT) ? config.get(ElasticSearchIndex.RETRY_ON_CONFLICT) : null;
client.setRetryOnConflict(retryOnConflict);

client.setRetryAttemptLimit(config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT));
client.setRetryInitialWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT));
client.setRetryMaxWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT));
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES)).mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
client.setRetryOnErrorCodes(errorCodesToRetry);

return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,7 +112,15 @@ public class RestElasticSearchClient implements ElasticSearchClient {
private Integer retryOnConflict;

private final String retryOnConflictKey;


// Maximum number of retries performed after the initial attempt of a request.
private int retryAttemptLimit;

// Response status codes for which a failed request is retried.
private final Set<Integer> retryOnErrorCodes = new HashSet<>();

// Wait time in milliseconds before the first retry; later retries wait ten times longer each.
private long retryInitialWaitMs;

// Upper bound in milliseconds for the wait time between two retries.
private long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {
this.delegate = delegate;
majorVersion = getMajorVersion();
Expand Down Expand Up @@ -419,6 +429,22 @@ public void setRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
}

/**
 * Sets how many times a request is retried after it failed with a retryable status code.
 * The initial attempt is not counted, so {@code 0} disables retrying.
 *
 * @param retryAttemptLimit maximum number of retries per request
 */
public void setRetryAttemptLimit(int retryAttemptLimit) {
this.retryAttemptLimit = retryAttemptLimit;
}

/**
 * Sets the wait time applied before the first retry of a failed request.
 *
 * @param retryInitialWaitMs initial wait time in milliseconds
 */
public void setRetryInitialWaitMs(long retryInitialWaitMs) {
this.retryInitialWaitMs = retryInitialWaitMs;
}

/**
 * Sets the upper bound for the wait time between two retries of a failed request.
 *
 * @param retryMaxWaitMs maximum wait time in milliseconds
 */
public void setRetryMaxWaitMs(long retryMaxWaitMs) {
this.retryMaxWaitMs = retryMaxWaitMs;
}

/**
 * Replaces the response status codes for which a failed request is retried.
 * <p>
 * The given codes are copied into the client's own set, so later changes to the passed
 * collection have no effect. Codes from an earlier call are discarded, which makes this a
 * real setter rather than an accumulating "add".
 *
 * @param retryOnErrorCodes status codes that trigger a retry; must not be {@code null}
 * @throws NullPointerException if {@code retryOnErrorCodes} is {@code null}
 */
public void setRetryOnErrorCodes(Set<Integer> retryOnErrorCodes) {
    // Validate before clearing so a null argument leaves the current configuration intact.
    Objects.requireNonNull(retryOnErrorCodes, "retryOnErrorCodes");
    this.retryOnErrorCodes.clear();
    this.retryOnErrorCodes.addAll(retryOnErrorCodes);
}

@Override
public long countTotal(String indexName, Map<String, Object> requestData) throws IOException {

Expand Down Expand Up @@ -546,13 +572,42 @@ private Response performRequest(String method, String path, byte[] requestData)
return performRequest(new Request(method, path), requestData);
}

private Response performRequestWithRetry(Request request) throws IOException {
int retryCount = 0;
ResponseException lastException;
do {
if (retryCount > 0) {
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount - 1)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);

Check warning on line 582 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L581-L582

Added lines #L581 - L582 were not covered by tests
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), e);

Check warning on line 586 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L586

Avoid throwing raw exception types.
}

Check warning on line 587 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L584-L587

Added lines #L584 - L587 were not covered by tests
}
try {
return delegate.performRequest(request);
} catch (ResponseException e) {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode())) {
throw e;
}
lastException = e;

Check warning on line 595 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L595

Added line #L595 was not covered by tests
}
retryCount++;

Check warning on line 597 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L597

Added line #L597 was not covered by tests
} while (retryCount <= retryAttemptLimit);

//If we've exceeded our retry limit then throw the last exception that was captured from retrying
throw lastException;

Check warning on line 601 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L601

Added line #L601 was not covered by tests
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

request.setEntity(entity);

final Response response = delegate.performRequest(request);
final Response response = performRequestWithRetry(request);

if (response.getStatusLine().getStatusCode() >= 400) {
throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -92,6 +94,12 @@ public class RestClientSetupTest {

private static final Integer RETRY_ON_CONFLICT = ElasticSearchIndex.RETRY_ON_CONFLICT.getDefaultValue();

private static final Integer RETRY_LIMIT = ElasticSearchIndex.RETRY_LIMIT.getDefaultValue();

private static final Long RETRY_INITIAL_WAIT = ElasticSearchIndex.RETRY_INITIAL_WAIT.getDefaultValue();

private static final Long RETRY_MAX_WAIT = ElasticSearchIndex.RETRY_MAX_WAIT.getDefaultValue();

private static final AtomicInteger instanceCount = new AtomicInteger();

@Captor
Expand Down Expand Up @@ -207,6 +215,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
put("index." + INDEX_NAME + ".elasticsearch.scroll-keep-alive", String.valueOf(ES_SCROLL_KA)).
put("index." + INDEX_NAME + ".elasticsearch.bulk-refresh", ES_BULK_REFRESH).
put("index." + INDEX_NAME + ".elasticsearch.retry_on_conflict", String.valueOf(RETRY_ON_CONFLICT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-limit", String.valueOf(RETRY_LIMIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429").
build());

assertNotNull(hostsConfigured);
Expand All @@ -223,6 +235,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {

verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH));
verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT));
verify(restElasticSearchClientMock).setRetryAttemptLimit(eq(RETRY_LIMIT));
verify(restElasticSearchClientMock).setRetryInitialWaitMs(eq(RETRY_INITIAL_WAIT));
verify(restElasticSearchClientMock).setRetryMaxWaitMs(eq(RETRY_MAX_WAIT));
verify(restElasticSearchClientMock).setRetryOnErrorCodes(eq(Stream.of(408, 429).collect(Collectors.toSet())));

}

Expand Down

0 comments on commit 2a30ce3

Please sign in to comment.