Skip to content

Commit

Permalink
Moved retry configurations into constructor and removed setters
Browse files Browse the repository at this point in the history
  • Loading branch information
criminosis committed Apr 25, 2024
1 parent 13e65bf commit c3961fc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,20 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll keep-alive should be greater than or equal to 1");
final boolean useMappingTypesForES7 = config.get(ElasticSearchIndex.USE_MAPPING_FOR_ES7);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
int retryLimit = config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT);
long retryInitialWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT);
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}

Integer retryOnConflict = config.has(ElasticSearchIndex.RETRY_ON_CONFLICT) ? config.get(ElasticSearchIndex.RETRY_ON_CONFLICT) : null;
client.setRetryOnConflict(retryOnConflict);

client.setRetryAttemptLimit(config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT));
client.setRetryInitialWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT));
client.setRetryMaxWaitMs(config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT));
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES)).mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
client.setRetryOnErrorCodes(errorCodesToRetry);

return client;
}

Expand All @@ -113,8 +113,11 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {
return RestClient.builder(hosts);
}

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -113,21 +113,27 @@ public class RestElasticSearchClient implements ElasticSearchClient {

private final String retryOnConflictKey;

private int retryAttemptLimit;
private final int retryAttemptLimit;

private final Set<Integer> retryOnErrorCodes = new HashSet<>();
private final Set<Integer> retryOnErrorCodes;

private long retryInitialWaitMs;
private final long retryInitialWaitMs;

private long retryMaxWaitMs;
private final long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {
public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
esVersion7 = ElasticMajorVersion.SEVEN.equals(majorVersion);
useMappingTypes = majorVersion.getValue() < 7 || (useMappingTypesForES7 && esVersion7);
retryOnConflictKey = majorVersion.getValue() >= 7 ? "retry_on_conflict" : "_retry_on_conflict";
this.retryAttemptLimit = retryAttemptLimit;
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
}

@Override
Expand Down Expand Up @@ -429,22 +435,6 @@ public void setRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
}

public void setRetryAttemptLimit(int retryAttemptLimit) {
this.retryAttemptLimit = retryAttemptLimit;
}

public void setRetryInitialWaitMs(long retryInitialWaitMs) {
this.retryInitialWaitMs = retryInitialWaitMs;
}

public void setRetryMaxWaitMs(long retryMaxWaitMs) {
this.retryMaxWaitMs = retryMaxWaitMs;
}

public void setRetryOnErrorCodes(Set<Integer> retryOnErrorCodes) {
this.retryOnErrorCodes.addAll(retryOnErrorCodes);
}

@Override
public long countTotal(String indexName, Map<String, Object> requestData) throws IOException {

Expand Down Expand Up @@ -582,7 +572,7 @@ private Response performRequestWithRetry(Request request) throws IOException {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount - 1)), retryMaxWaitMs);
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
Expand All @@ -32,6 +31,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
Expand All @@ -57,16 +57,15 @@ public class RestClientRetryTest {
@Captor
private ArgumentCaptor<Request> requestCaptor;

private RestElasticSearchClient restClientUnderTest;

@BeforeEach
public void setUp() throws IOException {
RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryErrorCodes) throws IOException {
//Just throw an exception when there's an attempt to look up the ES version during instantiation
when(restClientMock.performRequest(any())).thenThrow(new IOException());
restClientUnderTest = new RestElasticSearchClient(restClientMock, 0, false);
restClientUnderTest.setRetryMaxWaitMs(0); //don't actually slow the test down

RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
retryAttemptLimit, retryErrorCodes, 0, 0);
//There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
Mockito.reset(restClientMock);
return clientUnderTest;
}

@Test
Expand All @@ -79,18 +78,17 @@ public void testRetryOnConfiguredErrorStatus() throws IOException {
//Just throw an expected exception the second time to confirm the retry occurred
//rather than mock out a parsable response
IOException expectedFinalException = new IOException("Expected");
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);


restClientUnderTest.setRetryAttemptLimit(expectedNumberOfRequestAttempts - 1);
restClientUnderTest.setRetryOnErrorCodes(Sets.newHashSet(retryCode));
try {
try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(e, expectedFinalException);
} catch (Exception actualException) {
Assertions.assertSame(expectedFinalException, actualException);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}
Expand All @@ -104,20 +102,21 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
doReturn(response).when(responseException).getResponse();
ResponseException initialRetryException = mock(ResponseException.class);
doReturn(response).when(initialRetryException).getResponse();
when(restClientMock.performRequest(any()))
//first throw a different retry exception instance, then make sure it's the latter one
//that was retained and then thrown
.thenThrow(initialRetryException)
.thenThrow(responseException);

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
//first throw a different retry exception instance, then make sure it's the latter one
//that was retained and then thrown
.thenThrow(initialRetryException)
.thenThrow(responseException);


restClientUnderTest.setRetryAttemptLimit(expectedNumberOfRequestAttempts - 1);
restClientUnderTest.setRetryOnErrorCodes(Sets.newHashSet(retryCode));
try {
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(e, responseException);
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}
Expand All @@ -127,15 +126,16 @@ public void testNonRetryErrorCodeException() throws IOException {
doReturn(503).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
//Other retry error code is configured
restClientUnderTest.setRetryOnErrorCodes(Sets.newHashSet(429));
try {
try (RestElasticSearchClient restClientUnderTest = createClient(0,
//Other retry error code is configured
Sets.newHashSet(429))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(e, responseException);
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}
Expand All @@ -145,11 +145,11 @@ public void testNonResponseExceptionErrorThrown() throws IOException {
IOException differentExceptionType = new IOException();
when(restClientMock.performRequest(any()))
.thenThrow(differentExceptionType);
try {
try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(e, differentExceptionType);
Assertions.assertSame(differentExceptionType, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -62,6 +63,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -108,6 +111,18 @@ public class RestClientSetupTest {
@Captor
ArgumentCaptor<Integer> scrollKACaptor;

@Captor
ArgumentCaptor<Integer> retryAttemptLimitCaptor;

@Captor
ArgumentCaptor<Long> retryInitialWaitCaptor;

@Captor
ArgumentCaptor<Long> retryMaxWaitCaptor;

@Captor
ArgumentCaptor<Set<Integer>> retryErrorCodesCaptor;

@Spy
private RestClientSetup restClientSetup = new RestClientSetup();

Expand Down Expand Up @@ -142,7 +157,8 @@ private ElasticSearchClient baseConfigTest(Map<String, String> extraConfigValues
doReturn(restClientBuilderMock).
when(restClientSetup).getRestClientBuilder(any());
doReturn(restElasticSearchClientMock).when(restClientSetup).
getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean());
getElasticSearchClient(any(RestClient.class), anyInt(), anyBoolean(),
anyInt(), anySet(), anyLong(), anyLong());

return restClientSetup.connect(config.restrictTo(INDEX_NAME));
}
Expand Down Expand Up @@ -173,7 +189,8 @@ public void testConnectBasicHttpConfigurationSingleHost() throws Exception {
assertEquals(SCHEME_HTTP, host0.getSchemeName());
assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean());
verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
anyInt(), anySet(), anyLong(), anyLong());
assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
scrollKACaptor.getValue().intValue());

Expand All @@ -199,7 +216,8 @@ public void testConnectBasicHttpConfigurationMultiHost() throws Exception {
assertEquals(SCHEME_HTTP, host1.getSchemeName());
assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host1.getPort());

verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean());
verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
anyInt(), anySet(), anyLong(), anyLong());
assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
scrollKACaptor.getValue().intValue());

Expand Down Expand Up @@ -229,17 +247,22 @@ public void testConnectBasicHttpConfigurationAllOptions() throws Exception {
assertEquals(SCHEME_HTTP, host0.getSchemeName());
assertEquals(ES_PORT, host0.getPort());

verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean());
verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
retryAttemptLimitCaptor.capture(), retryErrorCodesCaptor.capture(), retryInitialWaitCaptor.capture(),
retryMaxWaitCaptor.capture());
assertEquals(ES_SCROLL_KA,
scrollKACaptor.getValue().intValue());
assertEquals(RETRY_LIMIT,
retryAttemptLimitCaptor.getValue().intValue());
assertEquals(Stream.of(408, 429).collect(Collectors.toSet()),
retryErrorCodesCaptor.capture());
assertEquals(RETRY_INITIAL_WAIT,
retryInitialWaitCaptor.getValue().longValue());
assertEquals(RETRY_MAX_WAIT,
retryMaxWaitCaptor.getValue().longValue());

verify(restElasticSearchClientMock).setBulkRefresh(eq(ES_BULK_REFRESH));
verify(restElasticSearchClientMock).setRetryOnConflict(eq(RETRY_ON_CONFLICT));
verify(restElasticSearchClientMock).setRetryAttemptLimit(eq(RETRY_LIMIT));
verify(restElasticSearchClientMock).setRetryInitialWaitMs(eq(RETRY_INITIAL_WAIT));
verify(restElasticSearchClientMock).setRetryMaxWaitMs(eq(RETRY_MAX_WAIT));
verify(restElasticSearchClientMock).setRetryOnErrorCodes(eq(Stream.of(408, 429).collect(Collectors.toSet())));

}

@Test
Expand All @@ -258,7 +281,8 @@ public void testConnectBasicHttpsConfigurationSingleHost() throws Exception {
assertEquals(SCHEME_HTTPS, host0.getSchemeName());
assertEquals(ElasticSearchIndex.HOST_PORT_DEFAULT, host0.getPort());

verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean());
verify(restClientSetup).getElasticSearchClient(same(restClientMock), scrollKACaptor.capture(), anyBoolean(),
anyInt(), anySet(), anyLong(), anyLong());
assertEquals(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE.getDefaultValue().intValue(),
scrollKACaptor.getValue().intValue());

Expand Down

0 comments on commit c3961fc

Please sign in to comment.