Skip to content

Commit

Permalink
Writes transient queries to disk so they can be cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Dec 8, 2020
1 parent d256fec commit 79fdf06
Show file tree
Hide file tree
Showing 21 changed files with 736 additions and 167 deletions.
14 changes: 0 additions & 14 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,6 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC
= "Enable variable substitution on SQL statements.";

public static final String KSQL_NODE_ID_CONFIG = "ksql.node.id";
public static final String KSQL_NODE_ID_DEFAULT = "";
public static final String KSQL_NODE_ID_DOC
= "An identifier for this ksql application node."
+ "Setting this property makes it easier to automatically clean up resources."
+ "Concurrently running applications should all have unique ids.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -849,13 +842,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC
)
.define(
KSQL_NODE_ID_CONFIG,
Type.STRING,
KSQL_NODE_ID_DEFAULT,
Importance.LOW,
KSQL_NODE_ID_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

package io.confluent.ksql.util;

import com.google.common.base.Strings;
import io.confluent.ksql.query.QueryId;
import java.util.Optional;

/**
* Util to build query application ids.
Expand All @@ -27,44 +25,22 @@ public final class QueryApplicationId {
private QueryApplicationId() {
}

/**
* Builds the portion of the id unique to this service, node, and query type, but does not include
* an identifiers for an individual query.
* @param config The ksql configuration
* @param persistent If the query is persistent or not
* @return
*/
public static String buildPrefix(
public static String build(
final KsqlConfig config,
final boolean persistent
final boolean persistent,
final QueryId queryId
) {
final String serviceId = config.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);

// If node id is set and this is a transient query, include it as part of the application id
final String nodeId = Optional.ofNullable(
Strings.emptyToNull(config.getString(KsqlConfig.KSQL_NODE_ID_CONFIG)))
.filter(id -> !persistent)
.map(id -> id.endsWith("_") ? id : id + "_")
.orElse("");

final String configName = persistent
? KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG
: KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG;

final String queryPrefix = config.getString(configName);

return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
final String queryAppId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ serviceId
+ nodeId
+ queryPrefix;
}

public static String build(
final KsqlConfig config,
final boolean persistent,
final QueryId queryId
) {
final String queryAppId = buildPrefix(config, persistent)
+ queryPrefix
+ queryId;
if (persistent) {
return queryAppId;
Expand Down
22 changes: 0 additions & 22 deletions ksqldb-common/src/test/java/QueryApplicationIdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,10 @@ public void shouldBuildPersistentQueryApplicationId() {
assertEquals("_confluent-ksql-s1query_q1", queryAppId);
}

@Test
public void shouldBuildPersistentQueryApplicationId_ignoreNodeId() {
final Properties props = new Properties();
props.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "s1");
props.put(KsqlConfig.KSQL_NODE_ID_CONFIG, "node0");
config = new KsqlConfig(props);
final String queryAppId =
QueryApplicationId.build(config, true, new QueryId("q1"));
assertEquals("_confluent-ksql-s1query_q1", queryAppId);
}

@Test
public void shouldBuildTransientQueryApplicationId() {
final String queryAppId =
QueryApplicationId.build(config, false, new QueryId("q1"));
assertTrue(queryAppId.startsWith("_confluent-ksql-s1transient_q1_"));
}

@Test
public void shouldBuildTransientQueryApplicationId_nodeId() {
final Properties props = new Properties();
props.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "s1");
props.put(KsqlConfig.KSQL_NODE_ID_CONFIG, "node0");
config = new KsqlConfig(props);
final String queryAppId =
QueryApplicationId.build(config, false, new QueryId("q1"));
assertTrue(queryAppId.startsWith("_confluent-ksql-s1node0_transient_q1_"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.FunctionRegistry;
Expand Down Expand Up @@ -314,9 +313,9 @@ public void close() {

public void cleanupOrphanedInternalTopics(
final ServiceContext serviceContext,
final SessionConfig config
final Set<String> queryApplicationIds
) {
orphanedTransientQueryCleaner.cleanupOrphanedInternalTopics(serviceContext, config);
orphanedTransientQueryCleaner.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryApplicationId;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
Expand All @@ -38,8 +33,6 @@ public class OrphanedTransientQueryCleaner {

private static final Logger LOG = LoggerFactory.getLogger(OrphanedTransientQueryCleaner.class);

private static Pattern TRANSIENT_QUERY_APP_ID = Pattern.compile("(.*\\d+_\\d+)(-[a-zA-Z]+)+");

private final QueryCleanupService cleanupService;

public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {
Expand All @@ -48,14 +41,8 @@ public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {

public void cleanupOrphanedInternalTopics(
final ServiceContext serviceContext,
final SessionConfig config
final Set<String> queryApplicationIds
) {
final KsqlConfig ksqlConfig = config.getConfig(false);
final String nodeId = ksqlConfig.getString(KsqlConfig.KSQL_NODE_ID_CONFIG);
if (Strings.isNullOrEmpty(nodeId)) {
// If no node id is set, there are no internal topics to clean up
return;
}
final KafkaTopicClient topicClient = serviceContext.getTopicClient();
final Set<String> topicNames;
try {
Expand All @@ -65,23 +52,12 @@ public void cleanupOrphanedInternalTopics(
return;
}
// Find any transient query topics
final String queryApplicationIdPrefix = QueryApplicationId.buildPrefix(ksqlConfig, false);
final List<String> orphanedTopics = topicNames.stream()
.filter(topicName -> topicName.startsWith(queryApplicationIdPrefix))
.collect(Collectors.toList());
final Set<String> queryApplicationIds = orphanedTopics.stream()
.map(topicName -> {
final Optional<String> queryApplicationId
= parseTransientQueryApplicationIdFromTopicName(topicName);
if (!queryApplicationId.isPresent()) {
LOG.error("Couldn't parse application id from topic " + topicName);
}
return queryApplicationId;
})
final Set<String> orphanedQueryApplicationIds = topicNames.stream()
.map(topicName -> queryApplicationIds.stream().filter(topicName::startsWith).findFirst())
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
for (final String queryApplicationId : queryApplicationIds) {
for (final String queryApplicationId : orphanedQueryApplicationIds) {
cleanupService.addCleanupTask(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
Expand All @@ -90,13 +66,4 @@ public void cleanupOrphanedInternalTopics(
));
}
}

@VisibleForTesting
static Optional<String> parseTransientQueryApplicationIdFromTopicName(final String topicName) {
final Matcher matcher = TRANSIENT_QUERY_APP_ID.matcher(topicName);
if (matcher.matches()) {
return Optional.of(matcher.group(1));
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.engine;

import static io.confluent.ksql.engine.OrphanedTransientQueryCleaner.parseTransientQueryApplicationIdFromTopicName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -24,14 +23,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -43,22 +39,22 @@
@RunWith(MockitoJUnitRunner.class)
public class OrphanedTransientQueryCleanerTest {
private static final String TOPIC1
= "_confluent-ksql-default_node0_transient_932097300573686369_1606940079718"
= "_confluent-ksql-default_transient_932097300573686369_1606940079718"
+ "-Aggregate-GroupBy-repartition";
private static final String TOPIC2
= "_confluent-ksql-default_node0_transient_932097300573686369_1606940079718"
= "_confluent-ksql-default_transient_932097300573686369_1606940079718"
+ "-Aggregate-Aggregate-Materialize-changelog";
private static final String TOPIC3
= "_confluent-ksql-default_node0_transient_123497300573686369_1606940012345"
= "_confluent-ksql-default_transient_123497300573686369_1606940012345"
+ "-Aggregate-Aggregate-Materialize-changelog";

private static final String BAD_TOPIC_NAME
= "_confluent-ksql-default_node0_transient_bad";

private static final String APP_ID_1
= "_confluent-ksql-default_node0_transient_932097300573686369_1606940079718";
= "_confluent-ksql-default_transient_932097300573686369_1606940079718";
private static final String APP_ID_2
= "_confluent-ksql-default_node0_transient_123497300573686369_1606940012345";
= "_confluent-ksql-default_transient_123497300573686369_1606940012345";

@Mock
private QueryCleanupService queryCleanupService;
Expand All @@ -78,52 +74,38 @@ public void setUp() {
}

@Test
public void shouldParseAppId() {
// Then:
assertThat(parseTransientQueryApplicationIdFromTopicName(TOPIC1).get(), is(APP_ID_1));
assertThat(parseTransientQueryApplicationIdFromTopicName(TOPIC2).get(), is(APP_ID_1));
assertThat(parseTransientQueryApplicationIdFromTopicName(TOPIC3).get(), is(APP_ID_2));
}

@Test
public void shouldCleanup() {
public void shouldCleanup_allApplicationIds() {
when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
cleaner.cleanupOrphanedInternalTopics(serviceContext, SessionConfig.of(
new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_NODE_ID_CONFIG, "node0")),
ImmutableMap.of()));
cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

verify(queryCleanupService, times(2)).addCleanupTask(taskCaptor.capture());
assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
assertThat(taskCaptor.getAllValues().get(1).getAppId(), is(APP_ID_2));
}

@Test
public void shouldSkip_noNodeId() {
cleaner.cleanupOrphanedInternalTopics(serviceContext, SessionConfig.of(
new KsqlConfig(ImmutableMap.of()),
ImmutableMap.of()));
public void shouldCleanup_someApplicationIds() {
when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2));
cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_1, APP_ID_2));

verify(queryCleanupService, never()).addCleanupTask(any());
verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
}

@Test
public void shouldSkip_badTopicName() {
when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, BAD_TOPIC_NAME));
cleaner.cleanupOrphanedInternalTopics(serviceContext, SessionConfig.of(
new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_NODE_ID_CONFIG, "node0")),
ImmutableMap.of()));
public void skipNonMatchingTopics() {
when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of(APP_ID_2));

verify(queryCleanupService, times(1)).addCleanupTask(taskCaptor.capture());
assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_1));
assertThat(taskCaptor.getAllValues().get(0).getAppId(), is(APP_ID_2));
}

@Test
public void shouldSkip_exception() {
when(topicClient.listTopicNames())
.thenThrow(new KafkaResponseGetFailedException("error!", new Exception()));
cleaner.cleanupOrphanedInternalTopics(serviceContext, SessionConfig.of(
new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_NODE_ID_CONFIG, "node0")),
ImmutableMap.of()));
cleaner.cleanupOrphanedInternalTopics(serviceContext, ImmutableSet.of());

verify(queryCleanupService, never()).addCleanupTask(any());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
Expand All @@ -59,19 +60,22 @@ public class QueryEndpoint {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final Optional<LocalCommands> localCommands;

public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter
final RateLimiter rateLimiter,
final Optional<LocalCommands> localCommands
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.localCommands = localCommands;
}

public QueryPublisher createQueryPublisher(
Expand Down Expand Up @@ -106,6 +110,8 @@ private QueryPublisher createPushQueryPublisher(
final TransientQueryMetadata queryMetadata = ksqlEngine
.executeQuery(serviceContext, statement, true);

localCommands.ifPresent(lc -> lc.write(queryMetadata));

publisher.setQueryHandle(new KsqlQueryHandle(queryMetadata));

return publisher;
Expand Down
Loading

0 comments on commit 79fdf06

Please sign in to comment.