-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: drop succeeds even if missing topic or schema #3131
fix: drop succeeds even if missing topic or schema #3131
Conversation
@confluentinc It looks like @stevenpyzhang just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for picking up this bug @stevenpyzhang and the test coverage!
As an implementation detail, I'm not a fan of using exceptions as control flow for normal operations (google this if you're interested, there's lots of literature around it). Since this method returns void
, my preference would be a refactor to return some type of status object that represents whether or not there was a failure.
If these two helper functions were to return a status object, I think the cyclomatic complexity issue would pop up again. I would still need to add in if statements to check the contents of the status objects and respond accordingly. Unless I'm missing something, I'm not sure how to avoid adding any more if statements to inject() with the status objects refactor approach. |
Sorry if I was unclear, I meant that Also we could refactor various other things in that method to reduce the cyclomatic complexity (e.g. moving the checks on whether the source is valid and the first two if conditions to other methods) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in this case it would be ok to just have deleteTopics
check internally for an UnknownTopicOrPartitionException, and consider that a successful delete (e.g. just return). There's no case where the caller cares whether the topic was there before the call to delete or not. If we really want to consider that case an error, I don't see why throwing an exception is an issue.
ksql-engine/src/main/java/io/confluent/ksql/exception/KafkaDeleteTopicsException.java
Outdated
Show resolved
Hide resolved
@@ -99,19 +103,17 @@ public TopicDeleteInjector( | |||
() -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())), | |||
ExecutorUtil.RetryBehaviour.ALWAYS); | |||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just catch KafkaDeleteTopicException
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleteTopics() can also throw TopicDeletionDisabledException
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private void checkSchemaError(final Exception error, final String sourceName) { | ||
if (!error.getMessage().contains("Subject not found")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What kind of exception implementation do you actually get thrown here? There should be a better way to check for a missing subject.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://docs.confluent.io/current/schema-registry/develop/api.html
Some other exceptions besides RestClient can occur such as when the schema registry is down, so I first check to make sure the exception is a RestClient one before checking the error code that corresponds to a missing schema.
ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java
Outdated
Show resolved
Hide resolved
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); | ||
|
||
// When: | ||
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had originally based this off a similar test in this folder, but then I realized that the other test was not in a very good state already. I fixed both of them now so they actually throw the expected errors.
fcd9d3d
to
2219773
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @stevenpyzhang - a few comments inline
import java.util.List; | ||
import javafx.util.Pair; | ||
|
||
public class KafkaDeleteTopicsException extends KafkaTopicClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused. It looks like we now throw this Exception from KafkaTopicCLientImpl
but we don't check for it anywhere (other than unit tests). Do we still need it? If all you want to do is have the ability to throw multiple exceptions, use Exception#addSuppressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the thought was that there could be use for this in the future if when we delete multiple topics at once and there are multiple errors, the details for each topic's deletion error would be available. For example, in ClusterTerminator.java, the error message given in that files deleteTopics() function is very vague and only lists the topic names.
throw new KsqlException("Exception while deleting topics: " + StringUtils.join(topicsToBeDeleted, ", "));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a perfect use case for Exception#addSuppressed
, but that's personal style so feel free to choose what you like more.
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - thanks @stevenpyzhang
2219773
to
ff1628c
Compare
ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
Outdated
Show resolved
Hide resolved
ff1628c
to
d836961
Compare
Description
#3048
Added new exception class that can store corresponding error messages that can occur while trying to delete a topic.
Wrote some helper functions in TopicDeleteInjector in order to reduce cyclomatic complexity.
Testing done
Unit tests
mvn clean install
Testing using KSQL CLI
Deleting subjects with schema registry and using kafka-topics to delete topics.
Reviewer checklist