Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-640 : Fix URL failover mechanism in Schema Registry Client #664

Merged
merged 11 commits into from
Feb 19, 2020

Conversation

raju-saravanan
Copy link
Collaborator

@raju-saravanan raju-saravanan commented Feb 6, 2020

Fixes #640

Copy link
Contributor

@guruchai guruchai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put a doc stitching up the URLSelectors, RetryPolicy and what it means to use of them?


import java.util.Objects;

public class RetryContext<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java Doc please

Copy link
Contributor

@kamalcph kamalcph Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a private constructor to make sure that the instance can only be created through the builder.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed this. Thanks for pointing out.

Class<? extends RuntimeException> exceptionClass = retryContext.retryOnException();
RuntimeException exception = null;

System.out.println("Policy : " + policy);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be deleted.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep


import java.util.Map;

public class ExponentialBackoffRetryPolicy extends RetryPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaDoc please


public class NOOPRetryPolicy extends RetryPolicy {

public NOOPRetryPolicy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOOPRetryPolicy is effectively no retry, if you are doing right away. Isnt it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

private static final String RETRY_STRATEGY_CONFIG_KEY = "config";

private static final RetryManager RETRY_MANAGER = new RetryManager();
private static final String DEFAULT_RETRY_STRATEGY_CLASS = NOOPRetryPolicy.class.getCanonicalName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should have FixedRetry/Exponential as a default policy.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many of the existing customers would be handling retry on the application side, in order to changes backward compatible, I have kept NOOPRetry as the default policy.

throw retryableException;
}
try {
LOG.info("Calling the body : " + targets.rootTarget);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it debug

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

return retryBody.run(targets);
} catch (RegistryRetryableException e) {
urlSelector.urlWithError(targets.rootTarget.getUri().toString(), e);
retryableException = e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of making it with error, if we are retry it?

@guruchai guruchai requested a review from kamalcph February 11, 2020 14:06
Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Left some comments to address.

.name())).longValue(),
TimeUnit.SECONDS)
.build();
.maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's best practice to avoid other changes when fixing the bugs. It helps in cherry-pick the fix to other branches and to do quick code-review.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I understand, sorry about that.

ClassLoader classLoader = this.getClass().getClassLoader();
Class<? extends RetryPolicy> clazz = null;
try {
clazz = (Class<? extends RetryPolicy>) Class.forName(retryPolicyClass, true, classLoader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the classLoader? Do you expect the user to load their own RetryPolicy?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not quite sure. At the time when I plugged this in, it made sense, now I am not quite sure. Do you guys think this could be a real requirement? @guruchai

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary. It picks current class's classloader anyway.

try {
clazz = (Class<? extends RetryPolicy>) Class.forName(retryPolicyClass, true, classLoader);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to Class : " + retryPolicyClass, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Unable to initiate the class:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm"))
.keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider"));
.keyStoreFile(sslConfigurations.get(SSL_KEY_STORE_PATH))
.keyStorePassword(sslConfigurations.get("keyStorePassword"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert unnecessary changes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
} while (policy.mayBeSleep(++iteration, System.currentTimeMillis() - startTime));

LOG.debug("Reached the limit of retries for the request after iteration : " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Use {} instead of string concatenation for readability.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure


import java.util.Objects;

public class RetryContext<T> {
Copy link
Contributor

@kamalcph kamalcph Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a private constructor to make sure that the instance can only be created through the builder.


public class RetryContext<T> {

private RetryPolicy policy = new NOOPRetryPolicy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The builder should set the defaults in the build method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep,agree.


public static class Builder<T> {

private RetryContext<T> retryContext = new RetryContext<T>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One can reuse the same Builder instance in multiple places. Better to create the RetryContext inside the build method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, makes sense.

} catch (LoginException e) {
throw new RegistryRetryableException(e);
}
Response response = RETRY_MANAGER.execute(new RetryContext.Builder<Response>().policy(retryPolicy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Building the RetryContext with same policy and retryOnException for all the requests is redundant. Can you extract it in a method?

(eg)

   @Override
    public Collection<SchemaVersionKey> findSchemasByFields(SchemaFieldQuery schemaFieldQuery) {
        return invoke((SchemaRegistryTargets targets) -> {
            WebTarget target = targets.searchFieldsTarget;
            for (Map.Entry<String, String> entry : schemaFieldQuery.toQueryMap().entrySet()) {
                target = target.queryParam(entry.getKey(), entry.getValue());
            }
            return getEntities(target, SchemaVersionKey.class);
        });
    }

    private <T> T invoke(RetryBody<T> retryBody) {
        return RETRY_MANAGER.execute(new RetryContext.Builder<T>()
                .policy(retryPolicy)
                .retryOnException(RegistryRetryableException.class)
                .build(() -> runWithFailoverContext(retryBody)));
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about removing Request class from RetryContext, then retryManager can be invoked as
retryManager.execute(retryContext, () -> {});
This translates to execute this block code against this retryContext, which makes sense.
That along with your invoke method would clean up the code nicely.

if (initialWebTarget == null) {
initialWebTarget = targets.rootTarget;
} else if (initialWebTarget.equals(targets.rootTarget)) {
throw retryableException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you throwing exception here? Please explain it in a comment so that others can understand.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will add more comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part of the code that prompted me in asking you to document how URLSelector and RetryPolicy work together.

@@ -8,4 +8,11 @@ schema.registry.client.schema.metadata.cache.expiry.interval : 300
schema.registry.client.schema.text.cache.size : 1024
schema.registry.client.schema.text.cache.expiry.interval : 300
schema.registry.client.url.selector : "com.hortonworks.registries.schemaregistry.client.FailoverUrlSelector"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the URL selector come under schema.registry.client.retry.strategy config?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't move this URL selector config under the retry strategy config, as it will break backward compatibility.


public abstract void init(Map<String, Object> properties);

public boolean mayBeSleep(int iteration, long timeElapsed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please rename the iteration to attempt.

Copy link
Collaborator Author

@raju-saravanan raju-saravanan Feb 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename this to attemptNumber.

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
throw new RetryPolicyException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we masking the InterruptedException as RetryPolicyException? Can it be handled in RetryManager itself?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set the interrupt flag if you want to swallow the Interrupted exception and throw it as RuntimeException.
Thread.currentThread().interrupt();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should be RuntimeException instead.

@raju-saravanan
Copy link
Collaborator Author

@kamalcph and @guruchai : Addressed PR comments.

@raju-saravanan
Copy link
Collaborator Author

@guruchai and @kamalcph: Can you guys review the PR ? I want to merge this by end of day (18 Feb).

Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some minor comments to address.

});
}

private RetryExecutor createRetryExecutor(long sleepMs, int iteration, long maxSleepMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: refactor the variable names w.r.t src code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Map<String, Object> props = new HashMap<>();
switch (retryPolicyType) {
case FIXED:
props.put(BackoffPolicy.TIMEOUT_MS, maxSleepMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: props.put can be placed outside of switch statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}));

Assert.assertEquals(2, iteration.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxAttempts is set to 3. Why the expected value in assert statement is 2? (change the iteration to attempt and initialise it to 1).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
});

Assert.assertTrue((stopTime.get() - startTime.get() < 2000) && iteration.get() > 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should assert that the retry gets halted when the #attempts exceed the timeout ((stop - start > timeout)). But, the above statement assert for something else. Please refactor this test case.

nit: Use java.time.Instant for timing operations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this test, I don't see any point to it.

}

@Test(expected = RuntimeException.class)
public void testExceptionMaxIteration() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give a meaningful name to the test. (eg) testThrowExceptionOnExceedingMaxAttempts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test(expected = RuntimeException.class)
public void testExceptionForMaxSleep() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Test(expected = RuntimeException.class)
public void testExceptionForMaxSleep() {
createRetryExecutor(100, 1000, 300).execute(() -> {
throw new RuntimeException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, test for the state of interrupt flag.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will come back for more tests in another PR as we are pressed on time.

float exponent = 2;
ExponentialBackoffPolicy exponentialBackoffRetryPolicy = new ExponentialBackoffPolicy(sleepMs, exponent, 10, 10000L);
for (int i = 1; i <= 10; i++) {
long sleep = (long) (sleepMs * Math.pow(exponent, (i - 1)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repeating the src formula isn't a good approach. If the formula has error, the test case won't detect it. Expected output for a given input would be good.


}

public ExponentialBackoffPolicy(Long sleepTimeMs, Float exponent, Integer maxAttempts, Long maxSleepTimeMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exponent parameter is unused.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

@raju-saravanan
Copy link
Collaborator Author

@guruchai: Can I go ahead and merge the PR ?

@raju-saravanan raju-saravanan merged commit ce7be7e into master Feb 19, 2020
raju-saravanan added a commit that referenced this pull request Feb 19, 2020
*   Back up

*   Back up tests

*   Back up

*   Revert schema registry changes

*   Plugin retry manager code in schema registry client

*   Plug in failover mechanism with retry in schema registry client

*   Remove retry manager exception from the code

*   Add support for retry based on exceptions

*   Fix all APIs to remove calls to currentSchemaRegistryTargets

*   Address PR comments

*   Refactor tests
sipeti pushed a commit that referenced this pull request Sep 23, 2020
*   Back up

*   Back up tests

*   Back up

*   Revert schema registry changes

*   Plugin retry manager code in schema registry client

*   Plug in failover mechanism with retry in schema registry client

*   Remove retry manager exception from the code

*   Add support for retry based on exceptions

*   Fix all APIs to remove calls to currentSchemaRegistryTargets

*   Address PR comments

*   Refactor tests
sipeti pushed a commit that referenced this pull request Sep 23, 2020
*   Back up

*   Back up tests

*   Back up

*   Revert schema registry changes

*   Plugin retry manager code in schema registry client

*   Plug in failover mechanism with retry in schema registry client

*   Remove retry manager exception from the code

*   Add support for retry based on exceptions

*   Fix all APIs to remove calls to currentSchemaRegistryTargets

*   Address PR comments

*   Refactor tests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SchemaRegistryClient doesn't take advantage of the UrlSelector for failover scenarios
3 participants