fix: Removes orphaned topics from transient queries #6714
New file (+66 lines): `OrphanedTransientQueryCleaner.java`
```java
/*
 * Copyright 2020 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.engine;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanedTransientQueryCleaner {

  private static final Logger LOG = LoggerFactory.getLogger(OrphanedTransientQueryCleaner.class);

  private final QueryCleanupService cleanupService;

  public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {
    this.cleanupService = requireNonNull(cleanupService);
  }

  public void cleanupOrphanedInternalTopics(
      final ServiceContext serviceContext,
      final Set<String> queryApplicationIds
  ) {
    final KafkaTopicClient topicClient = serviceContext.getTopicClient();
    final Set<String> topicNames;
    try {
      topicNames = topicClient.listTopicNames();
    } catch (KafkaResponseGetFailedException e) {
      LOG.error("Couldn't fetch topic names", e);
      return;
    }
    // Find any transient query topics
    final Set<String> orphanedQueryApplicationIds = topicNames.stream()
        .map(topicName -> queryApplicationIds.stream().filter(topicName::startsWith).findFirst())
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
    for (final String queryApplicationId : orphanedQueryApplicationIds) {
      cleanupService.addCleanupTask(
          new QueryCleanupService.QueryCleanupTask(
              serviceContext,
              queryApplicationId,
              true
          ));
    }
  }
}
```

Review thread on the prefix-matching stream:

> **Reviewer:** Maybe I'm missing something, but shouldn't this filter out any that are currently running? Otherwise, won't we delete topics for queries that are running? EDIT: I noticed that what's passed in is only ever the orphaned query IDs. Can we javadoc this method to explain that?
>
> **Author:** Yeah, I think we're on the same page now. This is only ever run on startup, and so by definition the query application ids, if found as topic prefixes, would be orphaned. Of course, that list is all transient queries run, not technically just the orphaned ones. I'll improve the documentation.

Review thread on `topicName::startsWith`:

> **Reviewer:** should we be using […]
>
> **Author:** So far as I can tell, nothing is very different in the cloud beyond setting some configs differently for making the query application id. After all, so far as I can tell it's Streams that is creating these topics, and they just append to the query application id. I verified this manually with the transient topic names, so I'm pretty confident `startsWith` should work.
>
> **Reviewer:** ack

Review thread on `addCleanupTask`:

> **Reviewer:** Since the file is only relevant on a new startup, does it need to be added to the cleanup service, or can we just have it do this cleanup on startup?
>
> **Author:** We chatted about this offline. This takes advantage of the other cleanup routines done by […]
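To make the prefix matching discussed in the thread above concrete, here is a standalone sketch. The `findOrphanedAppIds` helper is a hypothetical extraction of the stream pipeline (it is not a method in the PR), and the sample names are taken from this PR's test file; the javadoc illustrates the caller contract the reviewer asked to have documented:

```java
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PrefixMatchSketch {

  /**
   * Illustrative extraction of the stream pipeline: returns every application
   * id that appears as a prefix of at least one listed topic name. Callers are
   * expected to pass only ids of queries that may be orphaned (on startup,
   * every transient query id replayed from the command topic qualifies).
   */
  static Set<String> findOrphanedAppIds(
      final Set<String> topicNames,
      final Set<String> queryApplicationIds
  ) {
    return topicNames.stream()
        .map(topicName -> queryApplicationIds.stream()
            .filter(topicName::startsWith)
            .findFirst())
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    // Sample names taken from OrphanedTransientQueryCleanerTest.
    final String appId =
        "_confluent-ksql-default_transient_932097300573686369_1606940079718";
    final Set<String> topics = Set.of(
        appId + "-Aggregate-GroupBy-repartition",
        "_confluent-ksql-default_node0_transient_bad");
    // Only the topic prefixed by the known application id matches.
    System.out.println(findOrphanedAppIds(topics, Set.of(appId)));
  }
}
```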
New file (+112 lines): `OrphanedTransientQueryCleanerTest.java`
```java
/*
 * Copyright 2020 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.engine;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class OrphanedTransientQueryCleanerTest {
  private static final String TOPIC1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-GroupBy-repartition";
  private static final String TOPIC2
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-Aggregate-Materialize-changelog";
  private static final String TOPIC3
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345"
      + "-Aggregate-Aggregate-Materialize-changelog";

  private static final String BAD_TOPIC_NAME
      = "_confluent-ksql-default_node0_transient_bad";

  private static final String APP_ID_1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718";
  private static final String APP_ID_2
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345";

  @Mock
  private QueryCleanupService queryCleanupService;
  @Mock
  private ServiceContext serviceContext;
  @Mock
  private KafkaTopicClient topicClient;
  @Captor
  private ArgumentCaptor<QueryCleanupTask> taskCaptor;

  private OrphanedTransientQueryCleaner cleaner;

  @Before
  public void setUp() {
    when(serviceContext.getTopicClient()).thenReturn(topicClient);
    cleaner = new OrphanedTransientQueryCleaner(queryCleanupService);
  }

  @Test
  public void shouldCleanup_allApplicationIds() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));

    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    verify(queryCleanupService, times(2)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
    assertThat(taskCaptor.getAllValues().get(1).getAppId(), is(APP_ID_2));
  }

  @Test
  public void shouldCleanup_someApplicationIds() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
  }

  @Test
  public void skipNonMatchingTopics() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_2));

    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_2));
  }

  @Test
  public void shouldSkip_exception() {
    when(topicClient.listTopicNames())
        .thenThrow(new KafkaResponseGetFailedException("error!", new Exception()));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of());

    verify(queryCleanupService, never()).addCleanupTask(any());
  }
}
```

Review thread on `shouldCleanup_allApplicationIds`:

> **Reviewer:** nit: can we follow the given/when/then convention? (It's basically already there, but without the comments.)
>
> **Author:** Done
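The given/when/then restructuring the reviewer asked for can be illustrated without Mockito. The sketch below mirrors the `shouldCleanup_someApplicationIds` scenario against a plain reimplementation of the prefix match; the class and helper names are hypothetical, not code from the PR:

```java
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class GivenWhenThenSketch {

  // Hypothetical stand-in for the production prefix-matching logic.
  static Set<String> orphanedAppIds(Set<String> topicNames, Set<String> appIds) {
    return topicNames.stream()
        .map(t -> appIds.stream().filter(t::startsWith).findFirst())
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    // Given: topics exist for only one of the two known application ids.
    final String appId1 =
        "_confluent-ksql-default_transient_932097300573686369_1606940079718";
    final String appId2 =
        "_confluent-ksql-default_transient_123497300573686369_1606940012345";
    final Set<String> topics = Set.of(
        appId1 + "-Aggregate-GroupBy-repartition",
        appId1 + "-Aggregate-Aggregate-Materialize-changelog");

    // When: the cleaner scans the listed topic names.
    final Set<String> orphaned = orphanedAppIds(topics, Set.of(appId1, appId2));

    // Then: only the application id with surviving topics needs cleanup.
    System.out.println(orphaned.equals(Set.of(appId1)));  // true
  }
}
```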
Review thread on the cost of `listTopicNames()`:

> **Reviewer:** If I recall, this is actually a somewhat expensive metadata operation (though we may want to confirm with Kafka experts) if we'll be calling it regularly. Should we flip the semantics (i.e. check whether a topic corresponding to an orphaned query exists) instead of the other way around?
>
> **Author:** In theory, it should process just one set of commands at a time during startup (it shouldn't be possible for there to be more than one unprocessed file). When it does that, it should call this just once. So it does this list call on node startup and uses that list of topics to compare with all of the query application ids in the commands file. It would seem to be more expensive to, for every queryApplicationId, check the existence of a given topic. Not only is that more RPCs, it also might check plenty of completed transient queries that cleaned up with no issue. I considered adding a "done cleaning" entry to the commands so I could find the set of only orphaned queries, but that seems potentially error-prone as well and requires writing another entry. This seemed a little simpler.
>
> Tell me if you'd prefer the "done cleaning" approach.
>
> **Reviewer:** Yeah, I agree the existing PR approach is better. At first I didn't realize it happens just once on startup, but since that's the case this is easier and less error-prone.
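The trade-off this thread settles on (one list call versus one existence probe per application id) can be made concrete with a counting fake. `FakeTopicClient` below is a hypothetical, simplified stand-in, not the real `KafkaTopicClient` API:

```java
import java.util.Set;

public class RpcCountSketch {

  // Hypothetical stand-in for a topic client, counting metadata round trips.
  static final class FakeTopicClient {
    final Set<String> topics;
    int rpcs = 0;

    FakeTopicClient(Set<String> topics) {
      this.topics = topics;
    }

    Set<String> listTopicNames() {          // one RPC, regardless of topic count
      rpcs++;
      return topics;
    }

    boolean topicExistsWithPrefix(String prefix) {  // one RPC per probe
      rpcs++;
      return topics.stream().anyMatch(t -> t.startsWith(prefix));
    }
  }

  // PR approach: a single list call, then purely local prefix comparisons.
  static int rpcsListOnce(Set<String> topics, Set<String> appIds) {
    FakeTopicClient client = new FakeTopicClient(topics);
    Set<String> names = client.listTopicNames();
    names.forEach(t -> appIds.forEach(t::startsWith));
    return client.rpcs;
  }

  // Flipped approach the reviewer floated: one existence probe per app id.
  static int rpcsPerId(Set<String> topics, Set<String> appIds) {
    FakeTopicClient client = new FakeTopicClient(topics);
    appIds.forEach(client::topicExistsWithPrefix);
    return client.rpcs;
  }

  public static void main(String[] args) {
    Set<String> topics = Set.of("app1-changelog", "app1-repartition");
    Set<String> appIds = Set.of("app1", "app2", "app3");
    System.out.println(rpcsListOnce(topics, appIds)); // 1
    System.out.println(rpcsPerId(topics, appIds));    // 3
  }
}
```

With N application ids replayed from the command topic, the per-id approach costs N round trips while the PR's approach costs one, which matches the author's reasoning above.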