diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index dd506a9582e..352dbfc265b 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -156,6 +156,10 @@ Elasticsearch index configuration | index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE | | index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE | | index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE | +| index.[X].elasticsearch.retry-error-codes | Elasticsearch REST client ResponseException error codes To Retry. | String[] | | LOCAL | +| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL | +| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL | +| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL | | index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE | | index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. 
| Integer | 60 | GLOBAL_OFFLINE | | index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE | diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java index 549865af082..19f7bedfa42 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider { "Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE, Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS); + public static final ConfigOption RETRY_LIMIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit", + "Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL, + Integer.class, 0); + + public static final ConfigOption RETRY_INITIAL_WAIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait", + "Sets the initial retry wait time (in milliseconds) before exponential backoff.", + ConfigOption.Type.LOCAL, Long.class, 1L); + + public static final ConfigOption RETRY_MAX_WAIT = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait", + "Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL, + Long.class, 1000L); + + public static final ConfigOption RETRY_ERROR_CODES = + new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes", + "Elasticsearch REST client ResponseException error codes To Retry.", ConfigOption.Type.LOCAL, + String[].class, new String[0]); + public static final int HOST_PORT_DEFAULT = 9200; /** diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java 
b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java index 457756546b4..dd1d2321029 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestClientSetup.java @@ -39,8 +39,11 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT; @@ -81,6 +84,12 @@ public ElasticSearchClient connect(Configuration config) throws IOException { Integer retryOnConflict = config.has(ElasticSearchIndex.RETRY_ON_CONFLICT) ? config.get(ElasticSearchIndex.RETRY_ON_CONFLICT) : null; client.setRetryOnConflict(retryOnConflict); + client.setRetryAttemptLimit(config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT)); + client.setRetryInitialWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT)); + client.setRetryMaxWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT)); + Set errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES)).mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet()); + client.setRetryOnErrorCodes(errorCodesToRetry); + return client; } diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java index 1101e1185ce..e0ed8d6981f 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java @@ -48,9 +48,11 @@ import java.io.IOException; import 
java.io.InputStream;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -110,7 +112,19 @@ public class RestElasticSearchClient implements ElasticSearchClient {
     private Integer retryOnConflict;
 
     private final String retryOnConflictKey;
-    
+
+    /** Maximum number of retries after the initial attempt; 0 disables retrying. */
+    private int retryAttemptLimit;
+
+    /** HTTP status codes of a ResponseException that make a request eligible for retry. */
+    private final Set<Integer> retryOnErrorCodes = new HashSet<>();
+
+    /** Wait before the first retry, in milliseconds; grows tenfold with each further retry. */
+    private long retryInitialWaitMs;
+
+    /** Upper bound for the wait between two attempts, in milliseconds. */
+    private long retryMaxWaitMs;
+
     public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {
         this.delegate = delegate;
         majorVersion = getMajorVersion();
@@ -419,6 +433,26 @@ public void setRetryOnConflict(Integer retryOnConflict) {
         this.retryOnConflict = retryOnConflict;
     }
 
+    /** Sets how many times a request is retried after its first attempt fails with a retryable code. */
+    public void setRetryAttemptLimit(int retryAttemptLimit) {
+        this.retryAttemptLimit = retryAttemptLimit;
+    }
+
+    /** Sets the wait before the first retry, in milliseconds. */
+    public void setRetryInitialWaitMs(long retryInitialWaitMs) {
+        this.retryInitialWaitMs = retryInitialWaitMs;
+    }
+
+    /** Sets the maximum wait between two attempts, in milliseconds. */
+    public void setRetryMaxWaitMs(long retryMaxWaitMs) {
+        this.retryMaxWaitMs = retryMaxWaitMs;
+    }
+
+    /** Adds the given HTTP status codes to the set of codes that trigger a retry. */
+    public void setRetryOnErrorCodes(Set<Integer> retryOnErrorCodes) {
+        this.retryOnErrorCodes.addAll(retryOnErrorCodes);
+    }
+
     @Override
     public long countTotal(String indexName, Map requestData) throws IOException {
 
@@ -546,13 +580,60 @@ private Response performRequest(String method, String path, byte[] requestData)
         return performRequest(new Request(method, path), requestData);
     }
 
+    /**
+     * Executes the request, retrying with exponential backoff when Elasticsearch answers with
+     * one of the configured retryable status codes.
+     * <p>
+     * The first attempt is made immediately. Before retry {@code k} the calling thread sleeps
+     * {@code min(retryInitialWaitMs * 10^(k-1), retryMaxWaitMs)} milliseconds. At most
+     * {@code retryAttemptLimit} retries are made.
+     *
+     * @param request the request to execute; the same instance is re-sent on every attempt
+     * @return the response of the first attempt that does not raise a {@link ResponseException}
+     * @throws ResponseException if the status code is not configured for retry, or the last one
+     *         seen once the retry limit is exhausted
+     * @throws IOException if the delegate client fails for any other reason
+     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag
+     *         is restored before throwing
+     */
+    private Response performRequestWithRetry(Request request) throws IOException {
+        int retryCount = 0;
+        ResponseException lastException;
+        do {
+            if (retryCount > 0) {
+                //Wait before trying again
+                long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount - 1)), retryMaxWaitMs);
+                log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
+                try {
+                    Thread.sleep(waitDurationMs);
+                } catch (InterruptedException e) {
+                    //Restore the interrupt flag so callers further up the stack can still observe the interruption
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), e);
+                }
+            }
+            try {
+                return delegate.performRequest(request);
+            } catch (ResponseException e) {
+                if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode())) {
+                    throw e;
+                }
+                lastException = e;
+            }
+            retryCount++;
+        } while (retryCount <= retryAttemptLimit);
+
+        //If we've exceeded our retry limit then throw the last exception that was captured from retrying
+        throw lastException;
+    }
+
     private Response performRequest(Request request, byte[] requestData) throws IOException {
 
         final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;
 
         request.setEntity(entity);
 
-        final Response response = delegate.performRequest(request);
+        final Response response = performRequestWithRetry(request);
 
         if (response.getStatusLine().getStatusCode() >= 400) {
             throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
index 7b2c8755d89..1f0f04ce49a 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientSetupTest.java
@@ -54,6 +54,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.net.ssl.SSLContext;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -92,6 +94,12 @@ public class RestClientSetupTest {
 
     private static final Integer
RETRY_ON_CONFLICT = ElasticSearchIndex.RETRY_ON_CONFLICT.getDefaultValue(); + private static final Integer RETRY_LIMIT = ElasticSearchIndex.RETRY_LIMIT.getDefaultValue(); + + private static final Long RETRY_INITIAL_WAIT = ElasticSearchIndex.RETRY_INITIAL_WAIT.getDefaultValue(); + + private static final Long RETRY_MAX_WAIT = ElasticSearchIndex.RETRY_MAX_WAIT.getDefaultValue(); + private static final AtomicInteger instanceCount = new AtomicInteger(); @Captor @@ -207,6 +215,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { put("index." + INDEX_NAME + ".elasticsearch.scroll-keep-alive", String.valueOf(ES_SCROLL_KA)). put("index." + INDEX_NAME + ".elasticsearch.bulk-refresh", ES_BULK_REFRESH). put("index." + INDEX_NAME + ".elasticsearch.retry_on_conflict", String.valueOf(RETRY_ON_CONFLICT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-limit", String.valueOf(RETRY_LIMIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-initial-wait", String.valueOf(RETRY_INITIAL_WAIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-max-wait", String.valueOf(RETRY_MAX_WAIT)). + put("index." + INDEX_NAME + ".elasticsearch.retry-error-codes", "408,429"). build()); assertNotNull(hostsConfigured); @@ -223,6 +235,10 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception { verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH)); verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT)); + verify(restElasticSearchClientMock).setRetryAttemptLimit(eq(RETRY_LIMIT)); + verify(restElasticSearchClientMock).setRetryInitialWaitMs(eq(RETRY_INITIAL_WAIT)); + verify(restElasticSearchClientMock).setRetryMaxWaitMs(eq(RETRY_MAX_WAIT)); + verify(restElasticSearchClientMock).setRetryOnErrorCodes(eq(Stream.of(408, 429).collect(Collectors.toSet()))); }