Skip to content

Commit

Permalink
Added retry logic if client request returns an error code that is con…
Browse files Browse the repository at this point in the history
…figured for retrying

Closes #4408

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Apr 24, 2024
1 parent 0d601cf commit 1bccf28
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Elasticsearch index configuration
| index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE |
| index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE |
| index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE |
| index.[X].elasticsearch.retry-error-codes | Elasticsearch REST client ResponseException error codes To Retry. | String[] | | LOCAL |
| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL |
| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL |
| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL |
| index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE |
| index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE |
| index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider {
"Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE,
Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS);

public static final ConfigOption<Integer> RETRY_LIMIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit",
"Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL,
Integer.class, 0);

public static final ConfigOption<Long> RETRY_INITIAL_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait",
"Sets the initial retry wait time (in milliseconds) before exponential backoff.",
ConfigOption.Type.LOCAL, Long.class, 1L);

public static final ConfigOption<Long> RETRY_MAX_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait",
"Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL,
Long.class, 1000L);

public static final ConfigOption<String[]> RETRY_ERROR_CODES =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes",
"Elasticsearch REST client ResponseException error codes To Retry.", ConfigOption.Type.LOCAL,
String[].class, new String[0]);

public static final int HOST_PORT_DEFAULT = 9200;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT;
Expand Down Expand Up @@ -81,6 +84,12 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
Integer retryOnConflict = config.has(ElasticSearchIndex.RETRY_ON_CONFLICT) ? config.get(ElasticSearchIndex.RETRY_ON_CONFLICT) : null;
client.setRetryOnConflict(retryOnConflict);

client.setRetryAttemptLimit(config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT));
client.setRetryInitialWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT));
client.setRetryMaxWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT));
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES)).mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
client.setRetryOnErrorCodes(errorCodesToRetry);

return client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,7 +112,15 @@ public class RestElasticSearchClient implements ElasticSearchClient {
private Integer retryOnConflict;

private final String retryOnConflictKey;


private int retryAttemptLimit;

private final Set<Integer> retryOnErrorCodes = new HashSet<>();

private long retryInitialWaitMs;

private long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {
this.delegate = delegate;
majorVersion = getMajorVersion();
Expand Down Expand Up @@ -419,6 +429,22 @@ public void setRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
}

public void setRetryAttemptLimit(int retryAttemptLimit) {
this.retryAttemptLimit = retryAttemptLimit;
}

public void setRetryInitialWaitMs(long retryInitialWaitMs) {
this.retryInitialWaitMs = retryInitialWaitMs;
}

public void setRetryMaxWaitMs(long retryMaxWaitMs) {
this.retryMaxWaitMs = retryMaxWaitMs;
}

public void setRetryOnErrorCodes(Set<Integer> retryOnErrorCodes) {
this.retryOnErrorCodes.addAll(retryOnErrorCodes);
}

@Override
public long countTotal(String indexName, Map<String, Object> requestData) throws IOException {

Expand Down Expand Up @@ -546,13 +572,42 @@ private Response performRequest(String method, String path, byte[] requestData)
return performRequest(new Request(method, path), requestData);
}

private Response performRequestWithRetry(Request request) throws IOException {
int retryCount = 0;
ResponseException lastException;
do {
if (retryCount > 0) {
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount - 1)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), e);

Check warning on line 586 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L586

Avoid throwing raw exception types.

Check warning on line 586 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L585-L586

Added lines #L585 - L586 were not covered by tests
}
}
try {
return delegate.performRequest(request);
} catch (ResponseException e) {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode())) {
throw e;
}
lastException = e;
}
retryCount++;
} while (retryCount <= retryAttemptLimit);

//If we've exceeded our retry limit then throw the last exception that was captured from retrying
throw lastException;

Check warning on line 601 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L601

Added line #L601 was not covered by tests
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

request.setEntity(entity);

final Response response = delegate.performRequest(request);
final Response response = performRequestWithRetry(request);

if (response.getStatusLine().getStatusCode() >= 400) {
throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.es.rest;

import com.google.common.collect.Sets;
import org.apache.http.StatusLine;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class RestClientRetryTest {
@Mock
private RestClient restClientMock;

@Mock
private ResponseException responseException;

@Mock
private Response response;

@Mock
private StatusLine statusLine;

@Captor
private ArgumentCaptor<Request> requestCaptor;

@Test
public void testRetryOnExpectedErrorStatus() throws IOException {
//Just throw an exception when there's an attempt to look up the ES version during instantiation
when(restClientMock.performRequest(any())).thenThrow(new IOException());
RestElasticSearchClient restClient = new RestElasticSearchClient(restClientMock, 0, false);

//There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
Mockito.reset(restClientMock);

Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
//Just throw an expected exception the second time to confirm the retry occurred
//rather than mock out a parsable response
IOException expectedFinalException = new IOException("Expected");
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);


restClient.setRetryAttemptLimit(expectedNumberOfRequestAttempts - 1);
restClient.setRetryOnErrorCodes(Sets.newHashSet(retryCode));
restClient.setRetryMaxWaitMs(0); //don't actually slow the test down
try {
restClient.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(e, expectedFinalException);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -92,6 +94,12 @@ public class RestClientSetupTest {

private static final Integer RETRY_ON_CONFLICT = ElasticSearchIndex.RETRY_ON_CONFLICT.getDefaultValue();

private static final Integer RETRY_LIMIT = ElasticSearchIndex.RETRY_LIMIT.getDefaultValue();

private static final Long RETRY_INITIAL_WAIT = ElasticSearchIndex.RETRY_INITIAL_WAIT.getDefaultValue();

private static final Long RETRY_MAX_WAIT = ElasticSearchIndex.RETRY_MAX_WAIT.getDefaultValue();

private static final AtomicInteger instanceCount = new AtomicInteger();

@Captor
Expand Down Expand Up @@ -207,6 +215,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
put("index." + INDEX_NAME + ".elasticsearch.scroll-keep-alive", String.valueOf(ES_SCROLL_KA)).
put("index." + INDEX_NAME + ".elasticsearch.bulk-refresh", ES_BULK_REFRESH).
put("index." + INDEX_NAME + ".elasticsearch.retry_on_conflict", String.valueOf(RETRY_ON_CONFLICT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-limit", String.valueOf(RETRY_LIMIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)).
put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429").
build());

assertNotNull(hostsConfigured);
Expand All @@ -223,6 +235,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {

verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH));
verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT));
verify(restElasticSearchClientMock).setRetryAttemptLimit(eq(RETRY_LIMIT));
verify(restElasticSearchClientMock).setRetryInitialWaitMs(eq(RETRY_INITIAL_WAIT));
verify(restElasticSearchClientMock).setRetryMaxWaitMs(eq(RETRY_MAX_WAIT));
verify(restElasticSearchClientMock).setRetryOnErrorCodes(eq(Stream.of(408, 429).collect(Collectors.toSet())));

}

Expand Down

0 comments on commit 1bccf28

Please sign in to comment.