diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/Retry.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/Retry.java index 37ee83b10563..d2023eb7c0bc 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/Retry.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/Retry.java @@ -16,9 +16,12 @@ package io.confluent.ksql.integration; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The {@code Retry} rule allows you to retry a test that @@ -74,6 +77,8 @@ */ public class Retry implements TestRule { + private static final Logger LOG = LoggerFactory.getLogger(Retry.class); + public static Retry none() { return new Retry(); } @@ -123,9 +128,11 @@ public void evaluate() throws Throwable { return; } catch (final Throwable e) { retries++; - if (!(exception.isInstance(e)) || retries > maxRetries) { + if (!(exception.isInstance(e) || exception.isInstance(ExceptionUtils.getRootCause(e))) + || retries > maxRetries) { throw e; } + LOG.warn("Retrying test after {} {} due to: {}", delay, unit, e.getMessage()); unit.sleep(delay); } } diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java index 331761093c92..c87843c166a0 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestQueryTranslationTest.java @@ -36,9 +36,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.streams.StreamsConfig; import org.junit.After; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.RuleChain; import org.junit.runner.RunWith; @@ -78,6 +80,12 @@ public class RestQueryTranslationTest { .around(TEST_HARNESS) .around(REST_APP); + // there's a race condition with Kafka and deleting topics since topic deletion is asynchronous. + // if we fail due to TopicExistsException, just retry since it is likely that the topic was just + // slow to be removed + @Rule + public final Retry RETRY = Retry.of(5, TopicExistsException.class, 5, TimeUnit.SECONDS); + @Parameterized.Parameters(name = "{0}") public static Collection data() { return JsonTestLoader.of(TEST_DIR, RqttTestFile.class)