Skip to content

Commit

Permalink
fix: Use a SandboxedPersistentQueryMetadata to not interact with Kafk…
Browse files Browse the repository at this point in the history
…Streams (#6066)
  • Loading branch information
spena authored Aug 25, 2020
1 parent 8ab98a5 commit e000b41
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SandboxedPersistentQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -115,7 +116,7 @@ EngineContext createSandbox(final ServiceContext serviceContext) {
persistentQueries.forEach((queryId, query) ->
sandBox.persistentQueries.put(
query.getQueryId(),
query.copyWith(sandBox::closeQuery)));
SandboxedPersistentQueryMetadata.of(query, sandBox::closeQuery)));

return sandBox;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public PersistentQueryMetadata(
.flatMap(builder -> builder.apply(getKafkaStreams()));
}

private PersistentQueryMetadata(
protected PersistentQueryMetadata(
final PersistentQueryMetadata other,
final Consumer<QueryMetadata> closeCallback
) {
Expand All @@ -116,10 +116,6 @@ private PersistentQueryMetadata(
this.materializationProviderBuilder = other.materializationProviderBuilder;
}

public PersistentQueryMetadata copyWith(final Consumer<QueryMetadata> closeCallback) {
return new PersistentQueryMetadata(this, closeCallback);
}

public DataSourceType getDataSourceType() {
return sinkDataSource.getDataSourceType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class QueryMetadata {
private final KafkaStreamsBuilder kafkaStreamsBuilder;
private final Map<String, Object> streamsProperties;
private final Map<String, Object> overriddenProperties;
private final Consumer<QueryMetadata> closeCallback;
protected final Consumer<QueryMetadata> closeCallback;
private final Set<SourceName> sourceNames;
private final LogicalSchema logicalSchema;
private final Long closeTimeout;
Expand All @@ -68,7 +68,7 @@ public abstract class QueryMetadata {

private Optional<QueryStateListener> queryStateListener = Optional.empty();
private boolean everStarted = false;
private boolean closed = false;
protected boolean closed = false;
private UncaughtExceptionHandler uncaughtExceptionHandler = this::uncaughtHandler;
private KafkaStreams kafkaStreams;
private Consumer<Boolean> onStop = (ignored) -> { };
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import java.util.function.Consumer;

/**
* Sandboxed {@link PersistentQueryMetadata} that prevents to modify the state of the internal
* {@link org.apache.kafka.streams.KafkaStreams}.
*/
public final class SandboxedPersistentQueryMetadata extends PersistentQueryMetadata {
public static SandboxedPersistentQueryMetadata of(
final PersistentQueryMetadata queryMetadata,
final Consumer<QueryMetadata> closeCallback
) {
return new SandboxedPersistentQueryMetadata(queryMetadata, closeCallback);
}

private SandboxedPersistentQueryMetadata(
final PersistentQueryMetadata queryMetadata,
final Consumer<QueryMetadata> closeCallback
) {
super(queryMetadata, closeCallback);
}

@Override
public void close() {
closed = true;
closeCallback.accept(this);
}

@Override
public void start() {
// no-op
}

@Override
protected void closeKafkaStreams() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,50 @@ public void shouldRegisterPersistentQueriesOnlyInSandbox() {
is(Optional.empty()));
}

@Test
public void shouldNotStartKafkaStreamsInSandbox() {
// Given:
givenSqlAlreadyExecuted("create table bar as select * from test2;");
final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0);

// When:
sandbox.getPersistentQuery(query.getQueryId()).get().start();

// Then:
assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(),
is(KafkaStreams.State.CREATED));
}

@Test
public void shouldNotStopKafkaStreamsInSandbox() {
// Given:
givenSqlAlreadyExecuted("create table bar as select * from test2;");
final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0);
ksqlEngine.getPersistentQuery(query.getQueryId()).get().start();

// When:
sandbox.getPersistentQuery(query.getQueryId()).get().stop();

// Then:
assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(),
is(KafkaStreams.State.REBALANCING));
}

@Test
public void shouldNotCloseKafkaStreamsInSandbox() {
// Given:
givenSqlAlreadyExecuted("create table bar as select * from test2;");
final QueryMetadata query = ksqlEngine.getPersistentQueries().get(0);
ksqlEngine.getPersistentQuery(query.getQueryId()).get().start();

// When:
sandbox.getPersistentQuery(query.getQueryId()).get().close();

// Then:
assertThat(ksqlEngine.getPersistentQuery(query.getQueryId()).get().getState(),
is(KafkaStreams.State.REBALANCING));
}

@Test
public void shouldExecuteDdlStatement() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ public void shouldNotReplaceExistingSchemaForSchemaRegistryEnabledFormatCreateSo
// Given:
givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);");
when(schemaRegistryClient.getAllSubjects()).thenReturn(ImmutableSet.of("expectedName-value"));
when(schemaRegistryClient.testCompatibility(eq("expectedName-value"), any(ParsedSchema.class))).thenReturn(true);

// When:
injector.inject(statement);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SandboxedPersistentQueryMetadataTest {
private static final String SQL = "sql";
private static final String EXECUTION_PLAN = "execution plan";
private static final QueryId QUERY_ID = new QueryId("queryId");
private static final String APPLICATION_ID = "applicationId";
private static final long CLOSE_TIMEOUT = 10L;

@Mock
private KafkaStreamsBuilder kafkaStreamsBuilder;
@Mock
private KafkaStreams kafkaStreams;
@Mock
private PhysicalSchema physicalSchema;
@Mock
private DataSource sinkDataSource;
@Mock
private MaterializationProviderBuilderFactory.MaterializationProviderBuilder
materializationProviderBuilder;
@Mock
private MaterializationProvider materializationProvider;
@Mock
private Topology topology;
@Mock
private QuerySchemas schemas;
@Mock
private Map<String, Object> props;
@Mock
private Map<String, Object> overrides;
@Mock
private Consumer<QueryMetadata> closeCallback;
@Mock
private QueryErrorClassifier queryErrorClassifier;
@Mock
private ExecutionStep<?> physicalPlan;

private PersistentQueryMetadata query;
private SandboxedPersistentQueryMetadata sandbox;

@Before
public void setUp() {
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams);
when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class));
when(materializationProviderBuilder.apply(kafkaStreams))
.thenReturn(Optional.of(materializationProvider));

query = new PersistentQueryMetadata(
SQL,
physicalSchema,
Collections.emptySet(),
sinkDataSource,
EXECUTION_PLAN,
QUERY_ID,
Optional.of(materializationProviderBuilder),
APPLICATION_ID,
topology,
kafkaStreamsBuilder,
schemas,
props,
overrides,
closeCallback,
CLOSE_TIMEOUT,
queryErrorClassifier,
physicalPlan,
10
);

sandbox = SandboxedPersistentQueryMetadata.of(query, closeCallback);
reset(kafkaStreams);
}

@Test
public void shouldNotStartKafkaStreamsOnStart() {
// When:
sandbox.start();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCloseKafkaStreamsOnStop() {
// When:
sandbox.stop();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCloseKafkaStreamsOnClose() {
// When:
sandbox.close();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCloseStateListenerOnClose() {
// When:
sandbox.stop();

// Then:
verifyZeroInteractions(closeCallback);
}
}
Loading

0 comments on commit e000b41

Please sign in to comment.