From 443a5896191edbf5652841856f9f534d38665496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 25 Jun 2019 11:52:23 +0200 Subject: [PATCH] Handle thread interruption in credentials refresh service --- .../DefaultCredentialsRefreshService.java | 35 ++++++++++-- .../DefaultCredentialsRefreshServiceTest.java | 57 +++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java b/src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java index ed081017ae..032a94c779 100644 --- a/src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java +++ b/src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java @@ -275,13 +275,38 @@ void maybeSetRefreshTask(Supplier> scheduledFutureSupplier) { } void refresh() { - // FIXME check whether thread has been cancelled or not before refresh() and refreshAction.call() + if (Thread.currentThread().isInterrupted()) { + return; + } - // FIXME protect this call, or at least log some error - this.credentialsProvider.refresh(); + int attemptCount = 0; + boolean refreshSucceeded = false; + while (attemptCount < 3) { + LOGGER.debug("Refreshing token for credentials provider {}", credentialsProvider); + try { + this.credentialsProvider.refresh(); + LOGGER.debug("Token refreshed for credentials provider {}", credentialsProvider); + refreshSucceeded = true; + break; + } catch (Exception e) { + LOGGER.warn("Error while trying to refresh token: {}", e.getMessage()); + } + attemptCount++; + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!refreshSucceeded) { + LOGGER.warn("Token refresh failed after retry, aborting callbacks"); + return; + } Iterator iterator = registrations.values().iterator(); - while (iterator.hasNext()) { + while (iterator.hasNext() && !Thread.currentThread().isInterrupted()) { Registration registration = iterator.next(); // FIXME set a timeout on the call? (needs a separate thread) try { @@ -291,6 +316,8 @@ void refresh() { iterator.remove(); } registration.errorHistory.set(0); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { LOGGER.warn("Error while trying to refresh a connection token", e); registration.errorHistory.incrementAndGet(); diff --git a/src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java b/src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java index 2229c34d52..5a1518610d 100644 --- a/src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java +++ b/src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java @@ -167,6 +167,63 @@ public void refreshActionIsRemovedIfItErrorsTooMuch() throws Exception { verify(refreshAction, times(callsCountBeforeCancellation)).call(); } + @Test + public void errorInRefreshShouldBeRetried() throws Exception { + DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState( + credentialsProvider + ); + doThrow(RuntimeException.class).doThrow(RuntimeException.class) + .doNothing().when(credentialsProvider).refresh(); + + when(refreshAction.call()).thenReturn(true); + + state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction)); + + state.refresh(); + + verify(credentialsProvider, times(3)).refresh(); + verify(refreshAction, times(1)).call(); + } + + @Test + public void callbacksAreNotCalledWhenRetryOnRefreshIsExhausted() throws Exception { + DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState( + credentialsProvider + ); + doThrow(RuntimeException.class).when(credentialsProvider).refresh(); + + state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction)); + + state.refresh(); + + verify(credentialsProvider, times(3)).refresh(); + verify(refreshAction, times(0)).call(); + } + + @Test + public void refreshCanBeInterrupted() throws Exception { + DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState( + credentialsProvider + ); + + AtomicInteger callbackCount = new AtomicInteger(10); + when(refreshAction.call()).thenAnswer(invocation -> { + callbackCount.decrementAndGet(); + Thread.sleep(1000L); + return true; + }); + + IntStream.range(0, callbackCount.get()).forEach(i -> state.add(new DefaultCredentialsRefreshService.Registration(i + "", refreshAction))); + + Thread refreshThread = new Thread(() -> state.refresh()); + refreshThread.start(); + Thread.sleep(1000L); + refreshThread.interrupt(); + refreshThread.join(5000); + assertThat(refreshThread.isAlive()).isFalse(); + assertThat(callbackCount).hasValueGreaterThan(1); // not all the callbacks were called, because thread has been cancelled + } + @Test public void fixedDelayBeforeExpirationRefreshDelayStrategyTest() { Function delayStrategy = fixedDelayBeforeExpirationRefreshDelayStrategy(ofSeconds(20));