From e000b419ac84d7ee25f47aae62758dce01fbe3de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 25 Aug 2020 11:48:07 -0500 Subject: [PATCH] fix: Use a SandboxedPersistentQueryMetadata to not interact with KafkaStreams (#6066) --- .../confluent/ksql/engine/EngineContext.java | 3 +- .../ksql/util/PersistentQueryMetadata.java | 6 +- .../io/confluent/ksql/util/QueryMetadata.java | 4 +- .../SandboxedPersistentQueryMetadata.java | 54 ++++++ .../confluent/ksql/engine/KsqlEngineTest.java | 44 +++++ .../inference/SchemaRegisterInjectorTest.java | 1 - .../SandboxedPersistentQueryMetadataTest.java | 154 ++++++++++++++++++ .../ksql/rest/server/TestKsqlRestApp.java | 53 ++++-- 8 files changed, 293 insertions(+), 26 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadata.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java index 1239488ba1b2..c07ce18db4d9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java @@ -42,6 +42,7 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.SandboxedPersistentQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; import java.util.List; @@ -115,7 +116,7 @@ EngineContext createSandbox(final ServiceContext serviceContext) { persistentQueries.forEach((queryId, query) -> sandBox.persistentQueries.put( query.getQueryId(), - query.copyWith(sandBox::closeQuery))); + SandboxedPersistentQueryMetadata.of(query, sandBox::closeQuery))); return sandBox; } diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 1644bea07078..dd6ee72a3321 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -103,7 +103,7 @@ public PersistentQueryMetadata( .flatMap(builder -> builder.apply(getKafkaStreams())); } - private PersistentQueryMetadata( + protected PersistentQueryMetadata( final PersistentQueryMetadata other, final Consumer closeCallback ) { @@ -116,10 +116,6 @@ private PersistentQueryMetadata( this.materializationProviderBuilder = other.materializationProviderBuilder; } - public PersistentQueryMetadata copyWith(final Consumer closeCallback) { - return new PersistentQueryMetadata(this, closeCallback); - } - public DataSourceType getDataSourceType() { return sinkDataSource.getDataSourceType(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index ad5255530dff..558da91b27ca 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -58,7 +58,7 @@ public abstract class QueryMetadata { private final KafkaStreamsBuilder kafkaStreamsBuilder; private final Map streamsProperties; private final Map overriddenProperties; - private final Consumer closeCallback; + protected final Consumer closeCallback; private final Set sourceNames; private final LogicalSchema logicalSchema; private final Long closeTimeout; @@ -68,7 +68,7 @@ public abstract class QueryMetadata { private Optional queryStateListener = Optional.empty(); private boolean everStarted = false; - private boolean closed = false; + protected boolean closed = false; private UncaughtExceptionHandler uncaughtExceptionHandler = 
this::uncaughtHandler; private KafkaStreams kafkaStreams; private Consumer onStop = (ignored) -> { }; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadata.java new file mode 100644 index 000000000000..a7f9a6fe8e5a --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadata.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import java.util.function.Consumer; + +/** + * Sandboxed {@link PersistentQueryMetadata} that prevents modifying the state of the internal + * {@link org.apache.kafka.streams.KafkaStreams}. 
+ */ +public final class SandboxedPersistentQueryMetadata extends PersistentQueryMetadata { + public static SandboxedPersistentQueryMetadata of( + final PersistentQueryMetadata queryMetadata, + final Consumer closeCallback + ) { + return new SandboxedPersistentQueryMetadata(queryMetadata, closeCallback); + } + + private SandboxedPersistentQueryMetadata( + final PersistentQueryMetadata queryMetadata, + final Consumer closeCallback + ) { + super(queryMetadata, closeCallback); + } + + @Override + public void close() { + closed = true; + closeCallback.accept(this); + } + + @Override + public void start() { + // no-op + } + + @Override + protected void closeKafkaStreams() { + // no-op + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 61baedcebeab..3f34092103f2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -1351,6 +1351,50 @@ public void shouldRegisterPersistentQueriesOnlyInSandbox() { is(Optional.empty())); } + @Test + public void shouldNotStartKafkaStreamsInSandbox() { + // Given: + givenSqlAlreadyExecuted("create table bar as select * from test2;"); + final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0); + + // When: + sandbox.getPersistentQuery(query.getQueryId()).get().start(); + + // Then: + assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(), + is(KafkaStreams.State.CREATED)); + } + + @Test + public void shouldNotStopKafkaStreamsInSandbox() { + // Given: + givenSqlAlreadyExecuted("create table bar as select * from test2;"); + final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0); + ksqlEngine.getPersistentQuery(query.getQueryId()).get().start(); + + // When: + sandbox.getPersistentQuery(query.getQueryId()).get().stop(); + + // Then: + 
assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(), + is(KafkaStreams.State.REBALANCING)); + } + + @Test + public void shouldNotCloseKafkaStreamsInSandbox() { + // Given: + givenSqlAlreadyExecuted("create table bar as select * from test2;"); + final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0); + ksqlEngine.getPersistentQuery(query.getQueryId()).get().start(); + + // When: + sandbox.getPersistentQuery(query.getQueryId()).get().close(); + + // Then: + assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(), + is(KafkaStreams.State.REBALANCING)); + } + @Test public void shouldExecuteDdlStatement() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java index 5184cb68a5bb..9ce4ed58dee1 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java @@ -192,7 +192,6 @@ public void shouldNotReplaceExistingSchemaForSchemaRegistryEnabledFormatCreateSo // Given: givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);"); when(schemaRegistryClient.getAllSubjects()).thenReturn(ImmutableSet.of("expectedName-value")); - when(schemaRegistryClient.testCompatibility(eq("expectedName-value"), any(ParsedSchema.class))).thenReturn(true); // When: injector.inject(statement); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java new file mode 100644 index 000000000000..bd389b35389a --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java @@ 
-0,0 +1,154 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.streams.materialization.MaterializationProvider; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.query.KafkaStreamsBuilder; +import io.confluent.ksql.query.MaterializationProviderBuilderFactory; +import io.confluent.ksql.query.QueryErrorClassifier; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SandboxedPersistentQueryMetadataTest { + private static final String SQL = "sql"; + 
private static final String EXECUTION_PLAN = "execution plan"; + private static final QueryId QUERY_ID = new QueryId("queryId"); + private static final String APPLICATION_ID = "applicationId"; + private static final long CLOSE_TIMEOUT = 10L; + + @Mock + private KafkaStreamsBuilder kafkaStreamsBuilder; + @Mock + private KafkaStreams kafkaStreams; + @Mock + private PhysicalSchema physicalSchema; + @Mock + private DataSource sinkDataSource; + @Mock + private MaterializationProviderBuilderFactory.MaterializationProviderBuilder + materializationProviderBuilder; + @Mock + private MaterializationProvider materializationProvider; + @Mock + private Topology topology; + @Mock + private QuerySchemas schemas; + @Mock + private Map props; + @Mock + private Map overrides; + @Mock + private Consumer closeCallback; + @Mock + private QueryErrorClassifier queryErrorClassifier; + @Mock + private ExecutionStep physicalPlan; + + private PersistentQueryMetadata query; + private SandboxedPersistentQueryMetadata sandbox; + + @Before + public void setUp() { + when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams); + when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class)); + when(materializationProviderBuilder.apply(kafkaStreams)) + .thenReturn(Optional.of(materializationProvider)); + + query = new PersistentQueryMetadata( + SQL, + physicalSchema, + Collections.emptySet(), + sinkDataSource, + EXECUTION_PLAN, + QUERY_ID, + Optional.of(materializationProviderBuilder), + APPLICATION_ID, + topology, + kafkaStreamsBuilder, + schemas, + props, + overrides, + closeCallback, + CLOSE_TIMEOUT, + queryErrorClassifier, + physicalPlan, + 10 + ); + + sandbox = SandboxedPersistentQueryMetadata.of(query, closeCallback); + reset(kafkaStreams); + } + + @Test + public void shouldNotStartKafkaStreamsOnStart() { + // When: + sandbox.start(); + + // Then: + verifyZeroInteractions(kafkaStreams); + } + + @Test + public void shouldNotCloseKafkaStreamsOnStop() { + // When: + 
sandbox.stop(); + + // Then: + verifyZeroInteractions(kafkaStreams); + } + + @Test + public void shouldNotCloseKafkaStreamsOnClose() { + // When: + sandbox.close(); + + // Then: + verifyZeroInteractions(kafkaStreams); + } + + @Test + public void shouldNotCloseStateListenerOnClose() { + // When: + sandbox.stop(); + + // Then: + verifyZeroInteractions(closeCallback); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java index 6d0578a00bf8..c3dd38f26c3f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java @@ -25,6 +25,7 @@ import io.confluent.ksql.rest.client.BasicCredentials; import io.confluent.ksql.rest.client.KsqlRestClient; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.Queries; @@ -93,6 +94,7 @@ public class TestKsqlRestApp extends ExternalResource { protected Optional internalListener; protected KsqlExecutionContext ksqlEngine; protected KsqlRestApplication ksqlRestApplication; + protected long lastCommandSequenceNumber = -1L; static { // Increase the default - it's low (100) @@ -332,16 +334,16 @@ private URI getInternalListener(final String protocol) { } } - private static Set getPersistentQueries(final KsqlRestClient client) { + private Set getPersistentQueries(final KsqlRestClient client) { return getQueries(client, KsqlQueryType.PERSISTENT); } - private static Set getTransientQueries(final KsqlRestClient client) { + private Set getTransientQueries(final KsqlRestClient client) { return getQueries(client, KsqlQueryType.PUSH); } - private static Set getQueries(final KsqlRestClient client, final 
KsqlQueryType queryType) { - final RestResponse response = client.makeKsqlRequest("SHOW QUERIES;"); + private Set getQueries(final KsqlRestClient client, final KsqlQueryType queryType) { + final RestResponse response = makeKsqlRequest(client, "SHOW QUERIES;"); if (response.isErroneous()) { throw new AssertionError("Failed to get persistent queries." + " msg:" + response.getErrorMessage()); @@ -355,15 +357,15 @@ private static Set getQueries(final KsqlRestClient client, final KsqlQue .collect(Collectors.toSet()); } - private static void terminateQueries(final Set queryIds, final KsqlRestClient client) { + private void terminateQueries(final Set queryIds, final KsqlRestClient client) { final HashSet remaining = new HashSet<>(queryIds); while (!remaining.isEmpty()) { KsqlErrorMessage lastError = null; final Set toRemove = new HashSet<>(); for (final String queryId : remaining) { - final RestResponse response = client - .makeKsqlRequest("TERMINATE " + queryId + ";"); + final RestResponse response = + makeKsqlRequest(client, "TERMINATE " + queryId + ";"); if (response.isSuccessful()) { toRemove.add(queryId); @@ -380,8 +382,8 @@ private static void terminateQueries(final Set queryIds, final KsqlRestC } } - private static Set getStreams(final KsqlRestClient client) { - final RestResponse res = client.makeKsqlRequest("SHOW STREAMS;"); + private Set getStreams(final KsqlRestClient client) { + final RestResponse res = makeKsqlRequest(client, "SHOW STREAMS;"); if (res.isErroneous()) { throw new AssertionError("Failed to get streams." 
+ " msg:" + res.getErrorMessage()); @@ -392,8 +394,8 @@ private static Set getStreams(final KsqlRestClient client) { .collect(Collectors.toSet()); } - private static Set getTables(final KsqlRestClient client) { - final RestResponse res = client.makeKsqlRequest("SHOW TABLES;"); + private Set getTables(final KsqlRestClient client) { + final RestResponse res = makeKsqlRequest(client, "SHOW TABLES;"); if (res.isErroneous()) { throw new AssertionError("Failed to get tables." + " msg:" + res.getErrorMessage()); @@ -404,10 +406,10 @@ private static Set getTables(final KsqlRestClient client) { .collect(Collectors.toSet()); } - private static void dropStreams(final Set streams, final KsqlRestClient client) { + private void dropStreams(final Set streams, final KsqlRestClient client) { for (final String stream : streams) { - final RestResponse res = client - .makeKsqlRequest("DROP STREAM `" + stream + "`;"); + final RestResponse res = + makeKsqlRequest(client, "DROP STREAM `" + stream + "`;"); if (res.isErroneous()) { throw new AssertionError("Failed to drop stream " + stream + "." @@ -416,10 +418,10 @@ private static void dropStreams(final Set streams, final KsqlRestClient } } - private static void dropTables(final Set tables, final KsqlRestClient client) { + private void dropTables(final Set tables, final KsqlRestClient client) { for (final String table : tables) { - final RestResponse res = client - .makeKsqlRequest("DROP TABLE `" + table + "`;"); + final RestResponse res = + makeKsqlRequest(client, "DROP TABLE `" + table + "`;"); if (res.isErroneous()) { throw new AssertionError("Failed to drop table " + table + "." 
@@ -428,6 +430,23 @@ private static void dropTables(final Set tables, final KsqlRestClient cl } } + private RestResponse makeKsqlRequest( + final KsqlRestClient client, + final String request + ) { + final RestResponse response = + client.makeKsqlRequest(request, lastCommandSequenceNumber); + + lastCommandSequenceNumber = response.getResponse().stream() + .filter(entity -> entity instanceof CommandStatusEntity) + .map(entity -> (CommandStatusEntity)entity) + .mapToLong(CommandStatusEntity::getCommandSequenceNumber) + .max() + .orElse(lastCommandSequenceNumber); + + return response; + } + private static KsqlRestConfig buildConfig( final Supplier bootstrapServers, final Map baseConfig) {