From e0afd4cd02b69af4bc0a57200d58ceadbbdfd6c9 Mon Sep 17 00:00:00 2001 From: Ari Ekmekji Date: Tue, 24 Oct 2023 17:32:46 -0700 Subject: [PATCH] Guarantee in order delivery for partitions --- CHANGELOG.md | 4 + pom.xml | 2 +- .../java/rockset/RocksetConnectorConfig.java | 13 +- src/main/java/rockset/RocksetSinkTask.java | 205 ++++++------------ .../java/rockset/utils/BlockingExecutor.java | 12 +- .../java/rockset/utils/RetriableTask.java | 108 --------- .../java/rockset/RocksetSinkTaskTest.java | 60 ++--- .../rockset/utils/BlockingExecutorTest.java | 22 +- .../java/rockset/utils/RetriableTaskTest.java | 134 ------------ 9 files changed, 107 insertions(+), 453 deletions(-) delete mode 100644 src/main/java/rockset/utils/RetriableTask.java delete mode 100644 src/test/java/rockset/utils/RetriableTaskTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e833008..8d04c49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ # Rockset Kafka Connect Changelog +## v2.0.0 2023-10-30 +- New configuration option `rockset.retry.backoff.ms` +- Bug fix for potential out-of-order message delivery + ## v1.4.3 2023-09-15 - Update rockset-java client dependency diff --git a/pom.xml b/pom.xml index 22db96c..a3e3a40 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ rockset kafka-connect-rockset - 1.4.3 + 2.0.0 jar kafka-connect-rockset diff --git a/src/main/java/rockset/RocksetConnectorConfig.java b/src/main/java/rockset/RocksetConnectorConfig.java index aafdf17..98dc12e 100644 --- a/src/main/java/rockset/RocksetConnectorConfig.java +++ b/src/main/java/rockset/RocksetConnectorConfig.java @@ -23,6 +23,7 @@ public class RocksetConnectorConfig extends AbstractConfig { public static final String ROCKSET_WORKSPACE = "rockset.workspace"; public static final String ROCKSET_TASK_THREADS = "rockset.task.threads"; public static final String ROCKSET_BATCH_SIZE = "rockset.batch.size"; + public static final String ROCKSET_RETRY_BACKOFF_MS = "rockset.retry.backoff.ms"; private RocksetConnectorConfig(ConfigDef config, Map originals) { super(config, originals, true); @@ -44,7 +45,7 @@ public static ConfigDef config() { .documentation("Rockset API Server URL") .importance(Importance.HIGH) .validator(RocksetConnectorConfig::validateApiServer) - .defaultValue("https://api.rs2.usw2.rockset.com") + .defaultValue("https://api.usw2a1.rockset.com") .build()) .define( ConfigKeyBuilder.of(ROCKSET_INTEGRATION_KEY, Type.STRING) @@ -91,6 +92,12 @@ public static ConfigDef config() { "(Deprecated) Rockset workspace that incoming documents will be written to.") .importance(Importance.HIGH) .defaultValue("commons") + .build()) + .define( + ConfigKeyBuilder.of(ROCKSET_RETRY_BACKOFF_MS, Type.INT) + .documentation("How long to backoff in milliseconds between retriable errors") + .importance(Importance.MEDIUM) + .defaultValue(5000) .build()); } @@ -154,4 +161,8 @@ public int getRocksetBatchSize() { public String getFormat() { return this.getString(FORMAT); } + + public int getRetryBackoffMs() { + return this.getInt(ROCKSET_RETRY_BACKOFF_MS); + } } diff --git a/src/main/java/rockset/RocksetSinkTask.java b/src/main/java/rockset/RocksetSinkTask.java index 01547a2..25aee57 100644 --- a/src/main/java/rockset/RocksetSinkTask.java +++ b/src/main/java/rockset/RocksetSinkTask.java @@ -1,19 +1,12 @@ package rockset; import com.github.jcustenborder.kafka.connect.utils.VersionUtil; -import com.google.common.base.Preconditions; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import okhttp3.OkHttpClient; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -28,25 +21,15 @@ import rockset.parser.JsonParser; import rockset.parser.RecordParser; import rockset.utils.BlockingExecutor; -import rockset.utils.RetriableTask; public class RocksetSinkTask extends SinkTask { - private static Logger log = LoggerFactory.getLogger(RocksetSinkTask.class); - private RequestWrapper rw; - - // taskExecutorService is responsible to run the task of sending data - // to Rockset. If the task fails, it submits it to retryExecutorService - // to be submitted back to taskExecutorService after a delay. - private BlockingExecutor taskExecutorService; - - // retryExecutorService scheduled tasks to be retried after a delay and - // submits it to taskExecutorService. If the retryExecutorService is full - // it will fail the task immediately (retrying is best effort). - // Make sure this has more threads than the task executor thread pool always - // otherwise it can lead to deadlock. - private ExecutorService retryExecutorService; - - private Map> futureMap; + + private static final Logger log = LoggerFactory.getLogger(RocksetSinkTask.class); + + private RequestWrapper requestWrapper; + + private BlockingExecutor executorService; + private RocksetConnectorConfig config; private RecordParser recordParser; @@ -71,44 +54,30 @@ public void start(Map settings) { .writeTimeout(1, TimeUnit.MINUTES) .readTimeout(1, TimeUnit.MINUTES) .build(); - this.rw = new RocksetRequestWrapper(config, httpClient); - - int numThreads = this.config.getRocksetTaskThreads(); - this.taskExecutorService = - new BlockingExecutor(numThreads, Executors.newFixedThreadPool(numThreads)); - - this.retryExecutorService = - new ThreadPoolExecutor( - numThreads * 2, - numThreads * 2, - 1, - TimeUnit.MINUTES, - new LinkedBlockingQueue<>(numThreads * 10)); - - this.futureMap = new HashMap<>(); + this.requestWrapper = new RocksetRequestWrapper(config, httpClient); + this.executorService = + new BlockingExecutor( + config.getRocksetTaskThreads(), + Executors.newScheduledThreadPool(config.getRocksetTaskThreads())); this.recordParser = getRecordParser(config.getFormat()); } @TestOnly public void start( Map settings, - RequestWrapper rw, - ExecutorService executorService, - ExecutorService retryExecutorService) { + RequestWrapper requestWrapper, + BlockingExecutor executorService) { this.config = new RocksetConnectorConfig(settings); - this.rw = rw; - this.taskExecutorService = - new BlockingExecutor(config.getRocksetTaskThreads(), executorService); - this.retryExecutorService = retryExecutorService; - this.futureMap = new HashMap<>(); + this.requestWrapper = requestWrapper; + this.executorService = executorService; this.recordParser = getRecordParser(config.getFormat()); } @Override public void stop() { log.info("Stopping Rockset Kafka Connect Plugin, waiting for active tasks to complete"); - if (taskExecutorService != null) { - taskExecutorService.shutdownNow(); + if (executorService != null) { + executorService.shutdownNow(); } log.info("Stopped Rockset Kafka Connect Plugin"); } @@ -116,119 +85,67 @@ public void stop() { // open() will be called for a partition before any put() is called for it @Override public void open(Collection partitions) { - log.debug(String.format("Opening %d partitions: %s", partitions.size(), partitions)); - partitions.forEach( - tp -> { - Preconditions.checkState(!futureMap.containsKey(tp)); - futureMap.put(tp, new ArrayList<>()); - }); + log.debug("Opening {} partitions: {}", partitions.size(), partitions); } @Override public void close(Collection partitions) { - log.debug(String.format("Closing %d partitions: %s", partitions.size(), partitions)); - partitions.forEach( - tp -> { - Preconditions.checkState(futureMap.containsKey(tp)); - futureMap.remove(tp); - }); + log.debug("Closing {} partitions: {}", partitions.size(), partitions); } - // put() doesn't need to block until the writes complete, that is what flush() is for @Override public void put(Collection records) { - if (records.size() == 0) { - log.debug("zero records in put call, returning"); - return; - } - - groupRecordsByTopicPartition(records) - .forEach( - (tp, recordBatch) -> { - try { - RetriableTask task = - new RetriableTask( - taskExecutorService, - retryExecutorService, - () -> addDocs(tp.topic(), recordBatch)); - - // this should only block if all the threads are busy - taskExecutorService.submit(task); - - Preconditions.checkState(futureMap.containsKey(tp)); - futureMap.get(tp).add(task); - } catch (InterruptedException e) { - throw new ConnectException("Failed to put records", e); + Map> futures = + records.stream() + .collect(Collectors.groupingBy(r -> new TopicPartition(r.topic(), r.kafkaPartition()))) + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + executorService.submit( + () -> + requestWrapper.addDoc( + e.getKey().topic(), + e.getValue(), + recordParser, + config.getRocksetBatchSize())))); + futures.forEach( + (tp, f) -> { + try { + f.get(); + } catch (Exception e) { + // Stop all tasks still executing since this put() will be retried anyway + futures.values().forEach(g -> g.cancel(true)); + if (isRetriableException(e)) { + if (context != null) { + context.timeout(config.getRetryBackoffMs()); } - }); + throw new RetriableException( + String.format( + "Encountered a retriable exception for topic %s partition %s", + tp.topic(), tp.partition()), + e); + } + throw new ConnectException( + String.format( + "Encountered an unexpected exception for topic %s partition %s", + tp.topic(), tp.partition()), + e); + } + }); } @Override public void flush(Map map) { - map.forEach( - (tp, offsetAndMetadata) -> { - log.debug( - "Flushing for topic: {}, partition: {}, offset: {}, metadata: {}", - tp.topic(), - tp.partition(), - offsetAndMetadata.offset(), - offsetAndMetadata.metadata()); - checkForFailures(tp); - }); - } - - private Map> groupRecordsByTopicPartition( - Collection records) { - Map> topicPartitionedRecords = new HashMap<>(); - for (SinkRecord record : records) { - TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition()); - topicPartitionedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record); - } - - return topicPartitionedRecords; + // Not needed } private boolean isRetriableException(Throwable e) { return (e.getCause() != null && e.getCause() instanceof RetriableException); } - private void checkForFailures(TopicPartition tp) { - if (futureMap.get(tp) == null) { - return; - } - - List futures = futureMap.get(tp); - Iterator futureIterator = futures.iterator(); - while (futureIterator.hasNext()) { - Future future = futureIterator.next(); - try { - future.get(); - } catch (Exception e) { - if (isRetriableException(e)) { - throw new RetriableException( - String.format( - "Unable to write document for topic: %s, partition: %s, in Rockset," - + " should retry, cause: %s", - tp.topic(), tp.partition(), e.getMessage()), - e); - } - - throw new RuntimeException( - String.format( - "Unable to write document for topic: %s, partition: %s, in Rockset," + " cause: %s", - tp.topic(), tp.partition(), e.getMessage()), - e); - } finally { - futureIterator.remove(); - } - } - } - - private void addDocs(String topic, Collection records) { - log.debug("Adding {} records to Rockset for topic: {}", records.size(), topic); - this.rw.addDoc(topic, records, recordParser, this.config.getRocksetBatchSize()); - } - @Override public String version() { return VersionUtil.version(this.getClass()); diff --git a/src/main/java/rockset/utils/BlockingExecutor.java b/src/main/java/rockset/utils/BlockingExecutor.java index 93c8d56..6ef22f5 100644 --- a/src/main/java/rockset/utils/BlockingExecutor.java +++ b/src/main/java/rockset/utils/BlockingExecutor.java @@ -1,8 +1,10 @@ package rockset.utils; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import org.apache.kafka.connect.errors.RetriableException; /** * Use BlockingExecutor to block threads submitting tasks, when the executor is completely occupied. @@ -19,8 +21,14 @@ public BlockingExecutor(int numThreads, ExecutorService executorService) { // returns immediately if a thread is available to run the task, // else blocks until one of the active tasks completes - public Future submit(RetriableTask task) throws InterruptedException { - semaphore.acquire(); + public Future submit(Runnable task) { + try { + semaphore.acquire(); + } catch (InterruptedException unused) { + CompletableFuture a = new CompletableFuture<>(); + a.completeExceptionally(new RetriableException("Thread interrrupted")); + return a; + } return executorService.submit( () -> { try { diff --git a/src/main/java/rockset/utils/RetriableTask.java b/src/main/java/rockset/utils/RetriableTask.java deleted file mode 100644 index 65e8163..0000000 --- a/src/main/java/rockset/utils/RetriableTask.java +++ /dev/null @@ -1,108 +0,0 @@ -package rockset.utils; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.RetriableException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// -// RetriableTask encapsulates a runnable expression. If the runnable fails -// due to a retriable exception, the run method submits it to another thread pool -// to submit it to task thread pool to run after some time. -// If all the retries expire, run method sets the execution exception -// and fails. -// If the the retry queue is full, it will reject the task (retries are best effort) -// - -public class RetriableTask extends FutureTask { - private static Logger log = LoggerFactory.getLogger(RetriableTask.class); - - private static final int MAX_RETRIES_COUNT = 5; - private static final int INITIAL_DELAY = 250; - private static final double JITTER_FACTOR = 0.2; - - private final Runnable runnable; - private final BlockingExecutor taskExecutorService; - private final ExecutorService retryExecutorService; - - private int numRetries = 0; - private int delay = INITIAL_DELAY; - - public RetriableTask( - BlockingExecutor taskExecutorService, - ExecutorService retryExecutorService, - Runnable runnable) { - - super(runnable, null); - this.taskExecutorService = taskExecutorService; - this.retryExecutorService = retryExecutorService; - this.runnable = runnable; - } - - private void retry(Throwable retryException) { - delay *= 2; - long jitterDelay = jitter(delay); - Long retryTime = System.currentTimeMillis() + jitterDelay; - - log.warn( - String.format( - "Encountered retriable error. Retry count: %s. Retrying in %s ms.", - numRetries, jitterDelay), - retryException); - - Runnable runnable = - () -> { - try { - Long sleepTime = retryTime - System.currentTimeMillis(); - if (sleepTime > 0) { - Thread.sleep(jitterDelay); - } - - taskExecutorService.submit(this); - } catch (InterruptedException e) { - throw new ConnectException("Failed to put records", e); - } - }; - - try { - retryExecutorService.submit(runnable); - } catch (RejectedExecutionException e) { - setException(e); - return; - } - } - - private static long jitter(int delay) { - double rnd = ThreadLocalRandom.current().nextDouble(-1, 1); - return (long) (delay * (1 + JITTER_FACTOR * rnd)); - } - - @Override - public void run() { - try { - runnable.run(); - - // mark completion of the task - set(null); - } catch (Exception e) { - // if not a retriable exception, set the exception and return - if (!(e instanceof RetriableException)) { - setException(e); - return; - } - - ++numRetries; - // if retries exhausted, set the exception and return - if (numRetries > MAX_RETRIES_COUNT) { - setException(e); - return; - } - - retry(e); - } - } -} diff --git a/src/test/java/rockset/RocksetSinkTaskTest.java b/src/test/java/rockset/RocksetSinkTaskTest.java index cb580e5..ac6b1b9 100644 --- a/src/test/java/rockset/RocksetSinkTaskTest.java +++ b/src/test/java/rockset/RocksetSinkTaskTest.java @@ -2,13 +2,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; -import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -21,14 +19,14 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rockset.utils.BlockingExecutor; public class RocksetSinkTaskTest { private static final Logger log = LoggerFactory.getLogger(RocksetSinkTaskTest.class); private void addDoc(String topic, Map settings, Collection records) { RocksetRequestWrapper rr = Mockito.mock(RocksetRequestWrapper.class); - ExecutorService executorService = MoreExecutors.newDirectExecutorService(); - ExecutorService retryExecutorService = MoreExecutors.newDirectExecutorService(); + BlockingExecutor executorService = new BlockingExecutor(1, Executors.newScheduledThreadPool(1)); TopicPartition tp = new TopicPartition(topic, 1); Collection assignedPartitions = Collections.singleton(tp); @@ -36,7 +34,7 @@ private void addDoc(String topic, Map settings, Collection settings = new HashMap<>(); RocksetRequestWrapper rc = Mockito.mock(RocksetRequestWrapper.class); - ExecutorService executorService = MoreExecutors.newDirectExecutorService(); - ExecutorService retryExecutorService = Executors.newFixedThreadPool(2); + BlockingExecutor executorService = new BlockingExecutor(1, Executors.newScheduledThreadPool(1)); RocksetSinkTask rst = new RocksetSinkTask(); - rst.start(settings, rc, executorService, retryExecutorService); + rst.start(settings, rc, executorService); rst.open(Collections.singleton(new TopicPartition(topic, partition))); // Do a put that simulates throwing Retryable exception from apiserver - // The put does not throw, but rather the succeeding flush throws the exception. Mockito.doThrow(new RetriableException("retry")) .when(rc) .addDoc(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt()); - rst.put(records); - Map fmap = new HashMap<>(); fmap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(0)); - assertThrows(RetriableException.class, () -> rst.flush(fmap)); - - // The previous flush has thrown an exception and the exception is cleared. - // New puts should be successful. + assertThrows( + RetriableException.class, + () -> { + rst.put(records); + rst.flush(fmap); + }); + + // Post flush, new puts should be successful. Mockito.doNothing() .when(rc) .addDoc(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt()); @@ -114,34 +112,4 @@ public void testRetriesPut() { rst.close(Collections.singleton(new TopicPartition(topic, partition))); rst.stop(); } - - @Test - public void testRetriesFlush() { - String topic = "testRetries"; - int partition = 1; - SinkRecord sr = new SinkRecord(topic, partition, null, "key", null, "{\"name\":\"johnny\"}", 0); - Collection records = new ArrayList<>(); - records.add(sr); - - Map settings = new HashMap<>(); - - RocksetRequestWrapper rc = Mockito.mock(RocksetRequestWrapper.class); - Mockito.doThrow(new RetriableException("retry")) - .when(rc) - .addDoc(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt()); - - ExecutorService executorService = MoreExecutors.newDirectExecutorService(); - ExecutorService retryExecutorService = Executors.newFixedThreadPool(2); - - RocksetSinkTask rst = new RocksetSinkTask(); - rst.start(settings, rc, executorService, retryExecutorService); - rst.open(Collections.singleton(new TopicPartition(topic, partition))); - rst.put(records); - - Map map = new HashMap<>(); - map.put(new TopicPartition("testRetries", 1), new OffsetAndMetadata(1L)); - assertThrows(RetriableException.class, () -> rst.flush(map)); - rst.close(Collections.singleton(new TopicPartition(topic, partition))); - rst.stop(); - } } diff --git a/src/test/java/rockset/utils/BlockingExecutorTest.java b/src/test/java/rockset/utils/BlockingExecutorTest.java index c06e905..c0516e3 100644 --- a/src/test/java/rockset/utils/BlockingExecutorTest.java +++ b/src/test/java/rockset/utils/BlockingExecutorTest.java @@ -1,14 +1,12 @@ package rockset.utils; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,16 +19,13 @@ public void testBlockingExecutor() throws Exception { log.info("Running testBlockingExecutor"); BlockingExecutor executor = new BlockingExecutor(2, Executors.newFixedThreadPool(2)); - ExecutorService retryExecutorService = Mockito.mock(ExecutorService.class); CountDownLatch start1 = new CountDownLatch(1); CountDownLatch start2 = new CountDownLatch(1); // Both should go through - Future f1 = - executor.submit(new RetriableTask(executor, retryExecutorService, () -> wait(start1))); - Future f2 = - executor.submit(new RetriableTask(executor, retryExecutorService, () -> wait(start2))); + Future f1 = executor.submit(() -> wait(start1)); + Future f2 = executor.submit(() -> wait(start2)); // finish and wait for second task start2.countDown(); @@ -38,8 +33,7 @@ public void testBlockingExecutor() throws Exception { // add another task CountDownLatch start3 = new CountDownLatch(1); - Future f3 = - executor.submit(new RetriableTask(executor, retryExecutorService, () -> wait(start3))); + Future f3 = executor.submit(() -> wait(start3)); // finish and wait for third task start3.countDown(); @@ -56,10 +50,9 @@ public void testBlockingExecutor() throws Exception { public void testBlockingExecutorBlocksWhenFull() throws Exception { log.info("Running testBlockingExecutorBlocksWhenFull"); BlockingExecutor executor = new BlockingExecutor(1, Executors.newFixedThreadPool(1)); - ExecutorService retryExecutorService = Mockito.mock(ExecutorService.class); CountDownLatch waitLatch = new CountDownLatch(1); - executor.submit(new RetriableTask(executor, retryExecutorService, () -> wait(waitLatch))); + executor.submit(() -> wait(waitLatch)); // submitting a new job from a different thread should block the new thread CountDownLatch doneLatch = new CountDownLatch(1); @@ -67,12 +60,7 @@ public void testBlockingExecutorBlocksWhenFull() throws Exception { Executors.newFixedThreadPool(1) .submit( () -> { - try { - executor.submit( - new RetriableTask(executor, retryExecutorService, doneLatch::countDown)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + executor.submit(doneLatch::countDown); }); try { diff --git a/src/test/java/rockset/utils/RetriableTaskTest.java b/src/test/java/rockset/utils/RetriableTaskTest.java deleted file mode 100644 index a996f90..0000000 --- a/src/test/java/rockset/utils/RetriableTaskTest.java +++ /dev/null @@ -1,134 +0,0 @@ -package rockset.utils; - -import static org.junit.jupiter.api.Assertions.assertThrows; - -import com.google.common.util.concurrent.MoreExecutors; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.RetriableException; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RetriableTaskTest { - private static final Logger log = LoggerFactory.getLogger(RetriableTaskTest.class); - - @Test - public void testRetriableTask() throws Exception { - log.info("Running testRetriableTask"); - - BlockingExecutor blockingExecutor = Mockito.mock(BlockingExecutor.class); - ExecutorService retryExecutorService = MoreExecutors.newDirectExecutorService(); - - Runnable runnable = - () -> { - throw new RetriableException("retriable runnable"); - }; - - RetriableTask retriableTask = - new RetriableTask(blockingExecutor, retryExecutorService, runnable); - - Mockito.doAnswer( - new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) { - retriableTask.run(); - return null; - } - }) - .when(blockingExecutor) - .submit(retriableTask); - - retriableTask.run(); - assertThrows(ExecutionException.class, () -> retriableTask.get()); - - // should be retried 5 times - Mockito.verify(blockingExecutor, Mockito.times(5)).submit(retriableTask); - } - - @Test - public void testRetriableTaskNoRetries() throws Exception { - log.info("Running testRetriableTaskNoRetries"); - - BlockingExecutor blockingExecutor = Mockito.mock(BlockingExecutor.class); - ExecutorService retryExecutorService = MoreExecutors.newDirectExecutorService(); - Runnable runnable = - () -> { - log.info("Success!"); - }; - - RetriableTask retriableTask = - new RetriableTask(blockingExecutor, retryExecutorService, runnable); - - Mockito.doAnswer( - new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) { - Assertions.fail("task retried, was expected to succeed in first attempt!"); - return null; - } - }) - .when(blockingExecutor) - .submit(retriableTask); - - retriableTask.run(); - - // zero retries - Mockito.verify(blockingExecutor, Mockito.times(0)).submit(retriableTask); - } - - @Test - public void testRetriableTaskNotRetriable() throws Exception { - log.info("Running testRetriableTaskNotRetriable"); - - BlockingExecutor blockingExecutor = Mockito.mock(BlockingExecutor.class); - ExecutorService retryExecutorService = MoreExecutors.newDirectExecutorService(); - Runnable runnable = - () -> { - throw new ConnectException("non retriable runnable"); - }; - - RetriableTask retriableTask = - new RetriableTask(blockingExecutor, retryExecutorService, runnable); - - retriableTask.run(); - - assertThrows(ExecutionException.class, () -> retriableTask.get()); - - // zero retries - Mockito.verify(blockingExecutor, Mockito.times(0)).submit(retriableTask); - } - - @Test - public void testRetryRejected() throws Exception { - log.info("Running testRetryRejected"); - - BlockingExecutor blockingExecutor = Mockito.mock(BlockingExecutor.class); - ExecutorService retryExecutorService = Mockito.mock(ExecutorService.class); - - Runnable runnable = - () -> { - throw new RetriableException("retriable runnable"); - }; - - Mockito.doThrow(RejectedExecutionException.class) - .when(retryExecutorService) - .submit(Mockito.any(Runnable.class)); - - RetriableTask retriableTask = - new RetriableTask(blockingExecutor, retryExecutorService, runnable); - - retriableTask.run(); - - assertThrows(ExecutionException.class, () -> retriableTask.get()); - - // zero retries - Mockito.verify(blockingExecutor, Mockito.times(0)).submit(retriableTask); - } -}