ISSUE-640 : Fix URL failover mechanism in Schema Registry Client #664
Can you add a doc tying together the URLSelectors and RetryPolicy, and explaining what it means to use them?
import java.util.Objects;

public class RetryContext<T> {
Java Doc please
Create a private constructor to make sure that the instance can only be created through the builder.
Ah, I missed this. Thanks for pointing it out.
Class<? extends RuntimeException> exceptionClass = retryContext.retryOnException();
RuntimeException exception = null;

System.out.println("Policy : " + policy);
This should be deleted.
Yep
import java.util.Map;

public class ExponentialBackoffRetryPolicy extends RetryPolicy {
JavaDoc please
public class NOOPRetryPolicy extends RetryPolicy {

    public NOOPRetryPolicy() {
NOOPRetryPolicy is effectively no retry if you return right away, isn't it?
Yep.
private static final String RETRY_STRATEGY_CONFIG_KEY = "config";

private static final RetryManager RETRY_MANAGER = new RetryManager();
private static final String DEFAULT_RETRY_STRATEGY_CLASS = NOOPRetryPolicy.class.getCanonicalName();
I think you should have FixedRetry/Exponential as a default policy.
Many existing customers are likely handling retries on the application side. To keep the changes backward compatible, I have kept NOOPRetry as the default policy.
    throw retryableException;
}
try {
    LOG.info("Calling the body : " + targets.rootTarget);
Make it debug
Ok.
    return retryBody.run(targets);
} catch (RegistryRetryableException e) {
    urlSelector.urlWithError(targets.rootTarget.getUri().toString(), e);
    retryableException = e;
What's the point of marking it with an error if we are going to retry it?
Overall LGTM. Left some comments to address.
.name())).longValue(),
TimeUnit.SECONDS)
.build();
.maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
It's best practice to avoid unrelated changes when fixing bugs. It makes it easier to cherry-pick the fix to other branches and speeds up code review.
Yeah I understand, sorry about that.
ClassLoader classLoader = this.getClass().getClassLoader();
Class<? extends RetryPolicy> clazz = null;
try {
    clazz = (Class<? extends RetryPolicy>) Class.forName(retryPolicyClass, true, classLoader);
Why do you need the classLoader? Do you expect the user to load their own RetryPolicy?
I am not quite sure. It made sense when I plugged this in, but now I am not so sure. Do you think this could be a real requirement? @guruchai
Not necessary. It picks the current class's classloader anyway.
try {
    clazz = (Class<? extends RetryPolicy>) Class.forName(retryPolicyClass, true, classLoader);
} catch (ClassNotFoundException e) {
    throw new RuntimeException("Unable to Class : " + retryPolicyClass, e);
nit: Unable to initiate the class:
Done
.keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm"))
.keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider"));
.keyStoreFile(sslConfigurations.get(SSL_KEY_STORE_PATH))
.keyStorePassword(sslConfigurations.get("keyStorePassword"))
revert unnecessary changes.
Done
    }
} while (policy.mayBeSleep(++iteration, System.currentTimeMillis() - startTime));

LOG.debug("Reached the limit of retries for the request after iteration : " +
nit: Use {} instead of string concatenation for readability.
Sure
public class RetryContext<T> {

    private RetryPolicy policy = new NOOPRetryPolicy();
The builder should set the defaults in the build method.
Yep, agree.
public static class Builder<T> {

    private RetryContext<T> retryContext = new RetryContext<T>();
One can reuse the same Builder instance in multiple places. Better to create the RetryContext inside the build method.
Yep, makes sense.
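For reference, the builder suggestions in this thread (private constructor, defaults applied in build, a fresh instance per build call) could look roughly like the sketch below. SketchRetryContext, its String-valued policy and the "NOOP" default are hypothetical stand-ins chosen to keep the example self-contained; they are not the PR's actual RetryContext.

```java
import java.util.Objects;

/** Hypothetical stand-in for RetryContext; field names and types are assumptions. */
final class SketchRetryContext {
    private final String policyName;
    private final Class<? extends RuntimeException> retryOn;

    // Private: instances can only be created through the Builder.
    private SketchRetryContext(String policyName, Class<? extends RuntimeException> retryOn) {
        this.policyName = policyName;
        this.retryOn = retryOn;
    }

    String policyName() { return policyName; }

    Class<? extends RuntimeException> retryOn() { return retryOn; }

    static final class Builder {
        // Left unset until configured; defaults are resolved in build().
        private String policyName;
        private Class<? extends RuntimeException> retryOn;

        Builder policy(String policyName) {
            this.policyName = Objects.requireNonNull(policyName);
            return this;
        }

        Builder retryOnException(Class<? extends RuntimeException> retryOn) {
            this.retryOn = Objects.requireNonNull(retryOn);
            return this;
        }

        // Applies defaults and returns a new instance on every call,
        // so one Builder can be reused in multiple places.
        SketchRetryContext build() {
            String resolvedPolicy = policyName != null ? policyName : "NOOP";
            Class<? extends RuntimeException> resolvedRetryOn =
                    retryOn != null ? retryOn : RuntimeException.class;
            return new SketchRetryContext(resolvedPolicy, resolvedRetryOn);
        }
    }
}
```

Because build() never hands out a shared instance, changing the builder after one build call cannot affect a context that was built earlier.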
} catch (LoginException e) {
    throw new RegistryRetryableException(e);
}
Response response = RETRY_MANAGER.execute(new RetryContext.Builder<Response>().policy(retryPolicy)
Building the RetryContext with the same policy and retryOnException for every request is redundant. Can you extract it into a method?
(e.g.)
@Override
public Collection<SchemaVersionKey> findSchemasByFields(SchemaFieldQuery schemaFieldQuery) {
    return invoke((SchemaRegistryTargets targets) -> {
        WebTarget target = targets.searchFieldsTarget;
        for (Map.Entry<String, String> entry : schemaFieldQuery.toQueryMap().entrySet()) {
            target = target.queryParam(entry.getKey(), entry.getValue());
        }
        return getEntities(target, SchemaVersionKey.class);
    });
}

private <T> T invoke(RetryBody<T> retryBody) {
    return RETRY_MANAGER.execute(new RetryContext.Builder<T>()
            .policy(retryPolicy)
            .retryOnException(RegistryRetryableException.class)
            .build(() -> runWithFailoverContext(retryBody)));
}
I was thinking about removing the Request class from RetryContext, so that retryManager can be invoked as
retryManager.execute(retryContext, () -> {});
This reads as "execute this block of code against this retryContext", which makes sense.
That, along with your invoke method, would clean up the code nicely.
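To make the proposed call shape concrete, here is a minimal sketch of an execute(context, body) style retry manager. SketchRetryManager and its Context class are hypothetical and much simpler than the PR's RetryManager and RetryContext: there is no backoff sleep, and the context only carries a maximum attempt count and the retryable exception type.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified retry manager; not the PR's RetryManager. */
final class SketchRetryManager {

    /** Minimal context: attempt limit and the exception type that is retryable. */
    static final class Context {
        final int maxAttempts;
        final Class<? extends RuntimeException> retryOn;

        Context(int maxAttempts, Class<? extends RuntimeException> retryOn) {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be >= 1");
            }
            this.maxAttempts = maxAttempts;
            this.retryOn = retryOn;
        }
    }

    /** Runs body against the context; rethrows the last retryable failure when attempts run out. */
    <T> T execute(Context context, Supplier<T> body) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= context.maxAttempts; attempt++) {
            try {
                return body.get();
            } catch (RuntimeException e) {
                if (!context.retryOn.isInstance(e)) {
                    throw e; // not a retryable type: fail immediately
                }
                last = e; // retryable: remember it and try again
            }
        }
        throw last; // non-null here because maxAttempts >= 1
    }
}
```

With this shape, a shared context can be built once and each request only supplies its own lambda, which is what the extracted invoke method relies on.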
if (initialWebTarget == null) {
    initialWebTarget = targets.rootTarget;
} else if (initialWebTarget.equals(targets.rootTarget)) {
    throw retryableException;
Why are you throwing an exception here? Please explain it in a comment so that others can understand.
Ok, will add more comments.
This is the part of the code that prompted me to ask you to document how URLSelector and RetryPolicy work together.
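One plausible reading of that interaction, offered only as an assumption: a single failover pass tries each configured URL once, and when the pass comes back around with every URL having failed, the last retryable exception is thrown so that the retry policy can decide whether to start another pass. The sketch below illustrates that reading; FailoverSketch, callWithFailover and the example URLs are hypothetical and do not reproduce the PR's URL selector logic.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of one failover pass; not the PR's implementation. */
final class FailoverSketch {

    /**
     * Calls each URL at most once, in order. A failing URL is skipped in favor of
     * the next one; once every URL has failed, the last failure is rethrown so an
     * outer retry policy can decide whether to start another pass.
     */
    static <T> T callWithFailover(List<String> urls, Function<String, T> call) {
        if (urls.isEmpty()) {
            throw new IllegalArgumentException("no URLs configured");
        }
        RuntimeException last = null;
        for (String url : urls) {
            try {
                return call.apply(url);
            } catch (RuntimeException e) {
                last = e; // remember the failure and fail over to the next URL
            }
        }
        throw last; // every URL failed in this pass
    }
}
```

Under this reading, failover answers "which URL next?" within a pass, while the retry policy answers "is another pass allowed, and after how long?".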
@@ -8,4 +8,11 @@ schema.registry.client.schema.metadata.cache.expiry.interval : 300
schema.registry.client.schema.text.cache.size : 1024
schema.registry.client.schema.text.cache.expiry.interval : 300
schema.registry.client.url.selector : "com.hortonworks.registries.schemaregistry.client.FailoverUrlSelector"
Should the URL selector come under the schema.registry.client.retry.strategy config?
We can't move this URL selector config under the retry strategy config, as it will break backward compatibility.
public abstract void init(Map<String, Object> properties);

public boolean mayBeSleep(int iteration, long timeElapsed) {
Please rename iteration to attempt.
I will rename this to attemptNumber.
try {
    Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
    throw new RetryPolicyException(e);
Why are we masking the InterruptedException as RetryPolicyException? Can it be handled in RetryManager itself?
Set the interrupt flag if you want to swallow the InterruptedException and rethrow it as a RuntimeException:
Thread.currentThread().interrupt();
Yeah, this should be RuntimeException instead.
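A minimal sketch of the pattern discussed here: restore the interrupt flag before converting the InterruptedException into an unchecked exception. InterruptAwareSleep is a hypothetical helper used only for illustration; the exception message is also an assumption.

```java
/** Hypothetical helper showing interrupt-safe sleeping between retries. */
final class InterruptAwareSleep {

    /** Sleeps for the given time; on interruption, restores the flag and fails fast. */
    static void sleep(long sleepTimeMs) {
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws, so set it
            // again before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to retry", e);
        }
    }
}
```

Callers further up the stack can then still observe the interruption through Thread.currentThread().isInterrupted(), which is also the state a unit test can assert on.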
Left some minor comments to address.
    });
}

private RetryExecutor createRetryExecutor(long sleepMs, int iteration, long maxSleepMs) {
nit: Rename the variables to match the source code.
Done
Map<String, Object> props = new HashMap<>();
switch (retryPolicyType) {
    case FIXED:
        props.put(BackoffPolicy.TIMEOUT_MS, maxSleepMs);
nit: props.put can be placed outside of the switch statement.
Done
    }
}));

Assert.assertEquals(2, iteration.get());
maxAttempts is set to 3. Why is the expected value in the assert statement 2? (Rename iteration to attempt and initialise it to 1.)
Done
    }
});

Assert.assertTrue((stopTime.get() - startTime.get() < 2000) && iteration.get() > 1);
This test should assert that retrying halts once the attempts exceed the timeout (stop - start > timeout), but the statement above asserts something else. Please refactor this test case.
nit: Use java.time.Instant for timing operations.
We can remove this test, I don't see any point to it.
}

@Test(expected = RuntimeException.class)
public void testExceptionMaxIteration() {
Give the test a meaningful name, e.g. testThrowExceptionOnExceedingMaxAttempts.
Done
}

@Test(expected = RuntimeException.class)
public void testExceptionForMaxSleep() {
same as above.
Done
@Test(expected = RuntimeException.class)
public void testExceptionForMaxSleep() {
    createRetryExecutor(100, 1000, 300).execute(() -> {
        throw new RuntimeException();
Also, test for the state of the interrupt flag.
I will come back for more tests in another PR as we are pressed for time.
float exponent = 2;
ExponentialBackoffPolicy exponentialBackoffRetryPolicy = new ExponentialBackoffPolicy(sleepMs, exponent, 10, 10000L);
for (int i = 1; i <= 10; i++) {
    long sleep = (long) (sleepMs * Math.pow(exponent, (i - 1)));
Repeating the source formula in the test isn't a good approach: if the formula has an error, the test case won't detect it. Asserting a known expected output for a given input would be better.
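A rough illustration of that suggestion: compare the computed sleep times against hard-coded values rather than re-deriving them with the same formula. BackoffSketch is a hypothetical stand-in for the policy under test, and the base sleep of 100 ms with doubling per attempt is an assumption made for the example. The sketch does not guard against overflow for very large attempt numbers.

```java
/** Hypothetical backoff calculator; a stand-in for the policy under test. */
final class BackoffSketch {

    /** Sleep before the given 1-based attempt: base * 2^(attempt - 1). */
    static long sleepTimeMs(long baseSleepMs, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt is 1-based");
        }
        return baseSleepMs << (attempt - 1);
    }

    /** Compares against hard-coded expectations instead of re-deriving them with the formula. */
    static void checkAgainstKnownValues() {
        long[] expected = {100, 200, 400, 800, 1600};
        for (int attempt = 1; attempt <= expected.length; attempt++) {
            long actual = sleepTimeMs(100, attempt);
            if (actual != expected[attempt - 1]) {
                throw new AssertionError("attempt " + attempt + ": expected "
                        + expected[attempt - 1] + " but got " + actual);
            }
        }
    }
}
```

If the implementation's formula were wrong (say, an off-by-one in the exponent), the literal table would catch it, whereas a test that recomputes the same expression would pass.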
}

public ExponentialBackoffPolicy(Long sleepTimeMs, Float exponent, Integer maxAttempts, Long maxSleepTimeMs) {
exponent parameter is unused.
Removed
@guruchai: Can I go ahead and merge the PR?
* Back up
* Back up tests
* Back up
* Revert schema registry changes
* Plugin retry manager code in schema registry client
* Plug in failover mechanism with retry in schema registry client
* Remove retry manager exception from the code
* Add support for retry based on exceptions
* Fix all APIs to remove calls to currentSchemaRegistryTargets
* Address PR comments
* Refactor tests
Fixes #640