diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index a7eafd544..9e31537bf 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -120,6 +120,11 @@ http-client-spi ${awssdk.version} + + software.amazon.awssdk + dynamodb-enhanced + ${awssdk.version} + software.amazon.glue diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java index c4aecdda2..e9740ccac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java @@ -223,6 +223,9 @@ void shutdown() { workerMetricsThreadPool.shutdown(); try { if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "LamThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); lamThreadPool.shutdownNow(); } } catch (final InterruptedException e) { @@ -232,6 +235,9 @@ void shutdown() { try { if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); workerMetricsThreadPool.shutdownNow(); } } catch (final InterruptedException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java index 6235c5a93..912f0dc9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java @@ -87,7 +87,7 @@ public synchronized void enter(final ClientVersion fromClientVersion) throws Dep } @Override - public void leave() { + public synchronized void leave() { if (entered && !left) { log.info("Leaving {}", this); cancelRollbackMonitor(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java index 96e16a0f5..ad744bfa9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java @@ -126,7 +126,7 @@ public void shutdown() { if (!stateMachineThreadPool.isShutdown()) { stateMachineThreadPool.shutdown(); try { - if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.info( "StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down", THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);