fix: Removes orphaned topics from transient queries #6714

Merged
KsqlEngine.java
@@ -70,6 +70,7 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable {
  private final String serviceId;
  private final EngineContext primaryContext;
  private final QueryCleanupService cleanupService;
  private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner;

  public KsqlEngine(
      final ServiceContext serviceContext,
@@ -101,6 +102,7 @@ public KsqlEngine(
      final QueryIdGenerator queryIdGenerator
  ) {
    this.cleanupService = new QueryCleanupService();
    this.orphanedTransientQueryCleaner = new OrphanedTransientQueryCleaner(this.cleanupService);
    this.primaryContext = EngineContext.create(
        serviceContext,
        processingLogContext,
@@ -312,6 +314,14 @@ public void close() {
    close(false);
  }

  public void cleanupOrphanedInternalTopics(
      final ServiceContext serviceContext,
      final Set<String> queryApplicationIds
  ) {
    orphanedTransientQueryCleaner
        .cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
  }

  /**
   * Determines if a statement is executable by the engine.
   *
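
For context on how this new engine method gets used: per the review discussion below, it is intended to be invoked exactly once, on server startup, with the query application ids recovered from the local commands file. A minimal sketch of that wiring, assuming a hypothetical loadQueryApplicationIds() helper on LocalCommands (the real call site is not part of this diff):

// Hypothetical startup hook -- the helper name is illustrative, not from this PR.
private static void cleanupOnStartup(
    final KsqlEngine ksqlEngine,
    final ServiceContext serviceContext,
    final LocalCommands localCommands
) {
  // Recover the application ids of transient queries recorded locally,
  // then ask the engine to clean up any internal topics left behind.
  final Set<String> queryApplicationIds =
      localCommands.loadQueryApplicationIds(); // assumed accessor
  ksqlEngine.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}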
OrphanedTransientQueryCleaner.java (new file)
@@ -0,0 +1,66 @@
/*
 * Copyright 2020 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.engine;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanedTransientQueryCleaner {

  private static final Logger LOG = LoggerFactory.getLogger(OrphanedTransientQueryCleaner.class);

  private final QueryCleanupService cleanupService;

  public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {
    this.cleanupService = requireNonNull(cleanupService);
  }

  public void cleanupOrphanedInternalTopics(
      final ServiceContext serviceContext,
      final Set<String> queryApplicationIds
  ) {
    final KafkaTopicClient topicClient = serviceContext.getTopicClient();
    final Set<String> topicNames;
    try {
      topicNames = topicClient.listTopicNames();

Contributor:
If I recall, this is actually a somewhat expensive metadata operation if we'll be calling it regularly (though we may want to confirm with Kafka experts). Should we flip the semantics (i.e., check whether a topic corresponding to an orphaned query exists) instead of the other way around?

Member Author:
In theory, it should process just one set of commands at a time during startup (it shouldn't be possible for there to be more than one unprocessed file), and when it does that, it should call this just once. So it does this list call once on node startup and uses that list of topics to compare with all of the query application ids in the commands file. It would seem more expensive to check, for every queryApplicationId, whether a given topic exists: not only is that more RPCs, it would also check plenty of completed transient queries that cleaned up without issue. I considered adding a "done cleaning" entry to the commands so I could find the set of only orphaned queries, but that seems potentially error-prone as well and requires writing another entry. This seemed a little simpler.

Tell me if you'd prefer the "done cleaning" approach.

Contributor:
Yeah, I agree the existing PR approach is better. At first I didn't realize it happens just once on startup, but since that's the case, this is easier and less error-prone.

    } catch (KafkaResponseGetFailedException e) {
      LOG.error("Couldn't fetch topic names", e);
      return;
    }
    // Find the application ids of transient queries whose internal topics still exist
    final Set<String> orphanedQueryApplicationIds = topicNames.stream()

Contributor:
Maybe I'm missing something, but shouldn't this filter out any that are currently running? Otherwise, won't we delete topics for queries that are running?

EDIT: I noticed that what's passed in is only ever the orphaned queryIDs. Can we javadoc this method to explain that?

Member Author:
Yeah, I think we're on the same page now. This is only ever run on startup, so by definition the query application ids, if found as topic prefixes, would be orphaned. Of course, that list covers all transient queries that have run, not technically just the orphaned ones.

I'll improve the documentation.

        .map(topicName -> queryApplicationIds.stream().filter(topicName::startsWith).findFirst())

Contributor:
Should we be using startsWith? I thought that in cloud we prefix all the topics with things other than just the transient application id. contains might be a safer bet either way.

Member Author:
As far as I can tell, nothing is very different in the cloud beyond setting some configs differently when building the query application id. It's Streams that creates these topics, and it just appends suffixes to the query application id.

I verified this manually with the transient topic names, so I'm pretty confident startsWith should work.

Contributor:
ack

        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
    for (final String queryApplicationId : orphanedQueryApplicationIds) {
      cleanupService.addCleanupTask(

Contributor:
Since the file is only relevant on a new startup, does it need to be added to the cleanup service, or can we just have it do this cleanup on startup?

Member Author:
We chatted about this offline. This takes advantage of the other cleanup routines done by QueryCleanupService, so it needs access to it. It does this cleanup once on startup.

          new QueryCleanupService.QueryCleanupTask(
              serviceContext,
              queryApplicationId,
              true
          ));
    }
  }
}
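
Following up on the thread above, where the author agreed to improve the documentation, a javadoc along these lines would capture the agreed semantics. This is a sketch of what such documentation could say, not the text that was ultimately committed:

  /**
   * Cleans up internal topics belonging to transient queries that are no
   * longer running.
   *
   * <p>Expected to be called once, on server startup. The given
   * {@code queryApplicationIds} are the application ids of all transient
   * queries recorded locally, not only the orphaned ones; an id is treated
   * as orphaned only if an internal topic with that prefix still exists.
   *
   * @param serviceContext the service context used to list Kafka topics
   * @param queryApplicationIds application ids of transient queries recorded
   *        before this startup
   */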
QueryCleanupService.java
@@ -100,6 +100,10 @@ static class QueryCleanupTask implements Runnable {
      this.isTransient = isTransient;
    }

    public String getAppId() {
      return appId;
    }

    @Override
    public void run() {
      tryRun(
OrphanedTransientQueryCleanerTest.java (new file)
@@ -0,0 +1,112 @@
/*
 * Copyright 2020 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.engine;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class OrphanedTransientQueryCleanerTest {
  private static final String TOPIC1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-GroupBy-repartition";
  private static final String TOPIC2
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718"
      + "-Aggregate-Aggregate-Materialize-changelog";
  private static final String TOPIC3
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345"
      + "-Aggregate-Aggregate-Materialize-changelog";

  private static final String BAD_TOPIC_NAME
      = "_confluent-ksql-default_node0_transient_bad";

  private static final String APP_ID_1
      = "_confluent-ksql-default_transient_932097300573686369_1606940079718";
  private static final String APP_ID_2
      = "_confluent-ksql-default_transient_123497300573686369_1606940012345";

  @Mock
  private QueryCleanupService queryCleanupService;
  @Mock
  private ServiceContext serviceContext;
  @Mock
  private KafkaTopicClient topicClient;
  @Captor
  private ArgumentCaptor<QueryCleanupTask> taskCaptor;

  private OrphanedTransientQueryCleaner cleaner;

  @Before
  public void setUp() {
    when(serviceContext.getTopicClient()).thenReturn(topicClient);
    cleaner = new OrphanedTransientQueryCleaner(queryCleanupService);
  }

  @Test
  public void shouldCleanup_allApplicationIds() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));

Contributor:
nit: can we follow the given/when/then convention? (It's basically already there, but without the comments.)

Member Author:
Done

    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    verify(queryCleanupService, times(2)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
    assertThat(taskCaptor.getAllValues().get(1).getAppId(), is(APP_ID_2));
  }

  @Test
  public void shouldCleanup_someApplicationIds() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
  }

  @Test
  public void skipNonMatchingTopics() {
    when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_2));

    verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
    assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_2));
  }

  @Test
  public void shouldSkip_exception() {
    when(topicClient.listTopicNames())
        .thenThrow(new KafkaResponseGetFailedException("error!", new Exception()));
    cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of());

    verify(queryCleanupService, never()).addCleanupTask(any());
  }
}
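
The constants in this test also illustrate why the startsWith check discussed earlier is sufficient: each internal topic name is the transient query's application id with a Streams-generated suffix appended. A quick illustrative check using the values above (not part of the committed test file):

  @Test
  public void shouldMatchTopicByAppIdPrefix() {
    // TOPIC1 is APP_ID_1 plus a suffix appended by Streams.
    final String suffix = "-Aggregate-GroupBy-repartition";
    assertThat(TOPIC1, is(APP_ID_1 + suffix));
    // So the prefix match alone identifies the owning query.
    assertThat(TOPIC1.startsWith(APP_ID_1), is(true));
  }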
QueryEndpoint.java
@@ -34,6 +34,7 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
@@ -61,21 +62,24 @@ public class QueryEndpoint {
  private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
  private final RateLimiter rateLimiter;
  private final HARouting routing;
  private final Optional<LocalCommands> localCommands;

  public QueryEndpoint(
      final KsqlEngine ksqlEngine,
      final KsqlConfig ksqlConfig,
      final RoutingFilterFactory routingFilterFactory,
      final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
      final RateLimiter rateLimiter,
      final HARouting routing,
      final Optional<LocalCommands> localCommands
  ) {
    this.ksqlEngine = ksqlEngine;
    this.ksqlConfig = ksqlConfig;
    this.routingFilterFactory = routingFilterFactory;
    this.pullQueryMetrics = pullQueryMetrics;
    this.rateLimiter = rateLimiter;
    this.routing = routing;
    this.localCommands = localCommands;
  }

  public QueryPublisher createQueryPublisher(
@@ -110,6 +114,8 @@ private QueryPublisher createPushQueryPublisher(
    final TransientQueryMetadata queryMetadata = ksqlEngine
        .executeQuery(serviceContext, statement, true);

    localCommands.ifPresent(lc -> lc.write(queryMetadata));

    publisher.setQueryHandle(new KsqlQueryHandle(queryMetadata));

    return publisher;
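
The localCommands.ifPresent(lc -> lc.write(queryMetadata)) call above is the other half of the lifecycle: when a push (transient) query starts, it is recorded locally so that a later startup can compare the recorded application ids against the topics that still exist. A rough sketch of the minimum such a record needs to carry, with hypothetical names since the LocalCommands file format is not shown in this diff:

// Hypothetical per-query record -- illustrative, not the actual file format.
final class LocalCommand {
  private final String queryApplicationId;

  LocalCommand(final TransientQueryMetadata queryMetadata) {
    // The application id is all the startup cleanup path needs: internal
    // topics are matched by this prefix on the next node startup.
    this.queryApplicationId = queryMetadata.getQueryApplicationId(); // assumed accessor
  }
}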