Skip to content

Commit

Permalink
Handle thread interruption in credentials refresh service
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Jun 25, 2019
1 parent 97c8700 commit 443a589
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,38 @@ void maybeSetRefreshTask(Supplier<ScheduledFuture<?>> scheduledFutureSupplier) {
}

void refresh() {
// FIXME check whether thread has been cancelled or not before refresh() and refreshAction.call()
if (Thread.currentThread().isInterrupted()) {
return;
}

// FIXME protect this call, or at least log some error
this.credentialsProvider.refresh();
int attemptCount = 0;
boolean refreshSucceeded = false;
while (attemptCount < 3) {
LOGGER.debug("Refreshing token for credentials provider {}", credentialsProvider);
try {
this.credentialsProvider.refresh();
LOGGER.debug("Token refreshed for credentials provider {}", credentialsProvider);
refreshSucceeded = true;
break;
} catch (Exception e) {
LOGGER.warn("Error while trying to refresh token: {}", e.getMessage());
}
attemptCount++;
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}

if (!refreshSucceeded) {
LOGGER.warn("Token refresh failed after retry, aborting callbacks");
return;
}

Iterator<Registration> iterator = registrations.values().iterator();
while (iterator.hasNext()) {
while (iterator.hasNext() && !Thread.currentThread().isInterrupted()) {
Registration registration = iterator.next();
// FIXME set a timeout on the call? (needs a separate thread)
try {
Expand All @@ -291,6 +316,8 @@ void refresh() {
iterator.remove();
}
registration.errorHistory.set(0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.warn("Error while trying to refresh a connection token", e);
registration.errorHistory.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,63 @@ public void refreshActionIsRemovedIfItErrorsTooMuch() throws Exception {
verify(refreshAction, times(callsCountBeforeCancellation)).call();
}

@Test
public void errorInRefreshShouldBeRetried() throws Exception {
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
credentialsProvider
);
doThrow(RuntimeException.class).doThrow(RuntimeException.class)
.doNothing().when(credentialsProvider).refresh();

when(refreshAction.call()).thenReturn(true);

state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction));

state.refresh();

verify(credentialsProvider, times(3)).refresh();
verify(refreshAction, times(1)).call();
}

@Test
public void callbacksAreNotCalledWhenRetryOnRefreshIsExhausted() throws Exception {
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
credentialsProvider
);
doThrow(RuntimeException.class).when(credentialsProvider).refresh();

state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction));

state.refresh();

verify(credentialsProvider, times(3)).refresh();
verify(refreshAction, times(0)).call();
}

@Test
public void refreshCanBeInterrupted() throws Exception {
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
credentialsProvider
);

AtomicInteger callbackCount = new AtomicInteger(10);
when(refreshAction.call()).thenAnswer(invocation -> {
callbackCount.decrementAndGet();
Thread.sleep(1000L);
return true;
});

IntStream.range(0, callbackCount.get()).forEach(i -> state.add(new DefaultCredentialsRefreshService.Registration(i + "", refreshAction)));

Thread refreshThread = new Thread(() -> state.refresh());
refreshThread.start();
Thread.sleep(1000L);
refreshThread.interrupt();
refreshThread.join(5000);
assertThat(refreshThread.isAlive()).isFalse();
assertThat(callbackCount).hasValueGreaterThan(1); // not all the callbacks were called, because thread has been cancelled
}

@Test
public void fixedDelayBeforeExpirationRefreshDelayStrategyTest() {
Function<Duration, Duration> delayStrategy = fixedDelayBeforeExpirationRefreshDelayStrategy(ofSeconds(20));
Expand Down

0 comments on commit 443a589

Please sign in to comment.