From 4284b8c41e3b6a3295760fdd024d36866b20e179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Wed, 28 Aug 2019 10:57:58 -0500 Subject: [PATCH] fix: error message with DROP DELETE TOPIC is invalid (#3279) --- .../ksql/topic/TopicDeleteInjector.java | 2 +- .../ksql/topic/TopicDeleteInjectorTest.java | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java index 50ef894b9799..a275b92ac2b2 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java @@ -103,7 +103,7 @@ public ConfiguredStatement inject( ExecutorUtil.RetryBehaviour.ALWAYS); } catch (Exception e) { throw new RuntimeException("Could not delete the corresponding kafka topic: " - + sourceName, e); + + source.getKafkaTopicName(), e); } try { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java index 11fe14df37b7..ba1a4686d01d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java @@ -49,6 +49,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; + +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -219,6 +221,30 @@ public void shouldThrowExceptionIfOtherSourcesUsingTopic() { deleteInjector.inject(dropStatement); } + @Test + public void shouldThrowIfTopicDoesNotExist() { + // Given: + final String STREAM_1 = "stream1"; + final DataSource other1 = givenSource(STREAM_1, "topicName"); + when(metaStore.getSource(STREAM_1)).thenAnswer(inv -> other1); + when(other1.getKafkaTopicName()).thenReturn("topicName"); + final ConfiguredStatement dropStatement = givenStatement( + "DROP stream1 DELETE TOPIC;", + new DropStream(QualifiedName.of("stream1"), + true, + true) + ); + doThrow(RuntimeException.class).when(topicClient).deleteTopics(ImmutableList.of("topicName")); + + // Expect: + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("" + + "Could not delete the corresponding kafka topic: topicName"); + + // When: + deleteInjector.inject(dropStatement); + } + @Test public void shouldNotThrowIfSchemaIsMissing() throws IOException, RestClientException { // Given: