fix: Use a SandboxedPersistentQueryMetadata to not interact with KafkaStreams
spena committed Aug 20, 2020
1 parent 8f00a5c commit 3a1b5e3
Showing 6 changed files with 230 additions and 10 deletions.
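Overview: the engine validates statements against a sandbox (see EngineContext.createSandbox below). Before this fix the sandbox held plain copyWith copies of the live persistent queries, and those copies shared the real KafkaStreams instance, so closing or cleaning up a sandboxed copy could touch the live runtime. The fix routes every KafkaStreams interaction in QueryMetadata through a protected hook and registers SandboxedPersistentQueryMetadata wrappers that override each hook as a no-op. A minimal sketch of the pattern, with invented names (the real classes are QueryMetadata and SandboxedPersistentQueryMetadata):

// Illustrative sketch of the hook-and-override pattern this commit applies;
// class names are invented for the example, not taken from the ksql sources.
class LiveQuery {
    public void start() {
        startRuntime(); // every runtime interaction goes through a hook...
    }

    public void close() {
        closeRuntime(); // ...so a subclass can intercept all of them
    }

    protected void startRuntime() {
        System.out.println("KafkaStreams.start()"); // stand-in for the real call
    }

    protected void closeRuntime() {
        System.out.println("KafkaStreams.close()");
    }
}

final class SandboxedQuery extends LiveQuery {
    @Override
    protected void startRuntime() {
        // no-op: a sandbox must never touch the shared runtime
    }

    @Override
    protected void closeRuntime() {
        // no-op
    }
}

public class SandboxSketch {
    public static void main(final String[] args) {
        new LiveQuery().start();      // prints "KafkaStreams.start()"
        new SandboxedQuery().start(); // prints nothing
    }
}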
EngineContext.java
@@ -39,6 +39,7 @@
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
+ import io.confluent.ksql.util.SandboxedPersistentQueryMetadata;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -111,7 +112,7 @@ EngineContext createSandbox(final ServiceContext serviceContext) {
persistentQueries.forEach((queryId, query) ->
sandBox.persistentQueries.put(
query.getQueryId(),
- query.copyWith(sandBox::unregisterQuery)));
+ SandboxedPersistentQueryMetadata.of(query, sandBox::unregisterQuery)));

return sandBox;
}
PersistentQueryMetadata.java
@@ -103,7 +103,7 @@ public PersistentQueryMetadata(
.flatMap(builder -> builder.apply(getKafkaStreams()));
}

- private PersistentQueryMetadata(
+ protected PersistentQueryMetadata(
final PersistentQueryMetadata other,
final Consumer<QueryMetadata> closeCallback
) {
@@ -116,10 +116,6 @@ private PersistentQueryMetadata(
this.materializationProviderBuilder = other.materializationProviderBuilder;
}

- public PersistentQueryMetadata copyWith(final Consumer<QueryMetadata> closeCallback) {
-   return new PersistentQueryMetadata(this, closeCallback);
- }

public DataSourceType getDataSourceType() {
return sinkDataSource.getDataSourceType();
}
QueryMetadata.java
@@ -260,10 +260,22 @@ protected void resetKafkaStreams(final KafkaStreams kafkaStreams) {
queryStateListener.ifPresent(this::setQueryStateListener);
}

+ protected void startKafkaStreams() {
+   kafkaStreams.start();
+ }

protected void closeKafkaStreams() {
kafkaStreams.close(Duration.ofMillis(closeTimeout));
}

+ protected void cleanUpKafkaStreams() {
+   kafkaStreams.cleanUp();
+ }

+ protected void closeStateListener() {
+   queryStateListener.ifPresent(QueryStateListener::close);
+ }

protected KafkaStreams buildKafkaStreams() {
return kafkaStreamsBuilder.build(topology, streamsProperties);
}
@@ -297,16 +309,16 @@ protected void doClose(final boolean cleanUp) {
closeKafkaStreams();

if (cleanUp) {
- kafkaStreams.cleanUp();
+ cleanUpKafkaStreams();
}

- queryStateListener.ifPresent(QueryStateListener::close);
+ closeStateListener();
}

public void start() {
LOG.info("Starting query with application id: {}", queryApplicationId);
everStarted = true;
- kafkaStreams.start();
+ startKafkaStreams();
}

public void clearErrors() {
SandboxedPersistentQueryMetadata.java (new file)
@@ -0,0 +1,58 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import java.util.function.Consumer;

/**
* Sandboxed {@link PersistentQueryMetadata} that prevents modifying the state of the internal
* {@link org.apache.kafka.streams.KafkaStreams}.
*/
public final class SandboxedPersistentQueryMetadata extends PersistentQueryMetadata {
public static SandboxedPersistentQueryMetadata of(
final PersistentQueryMetadata queryMetadata,
final Consumer<QueryMetadata> closeCallback
) {
return new SandboxedPersistentQueryMetadata(queryMetadata, closeCallback);
}

private SandboxedPersistentQueryMetadata(
final PersistentQueryMetadata queryMetadata,
final Consumer<QueryMetadata> closeCallback
) {
super(queryMetadata, closeCallback);
}

@Override
protected void startKafkaStreams() {
// no-op
}

@Override
protected void closeKafkaStreams() {
// no-op
}

@Override
protected void cleanUpKafkaStreams() {
// no-op
}

@Override
protected void closeStateListener() {
// no-op
}
}
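A small usage sketch mirroring the EngineContext change in the first hunk; the sandboxCopy helper, runningQuery, and unregister names are stand-ins for engine state, not part of the ksql sources:

// Hypothetical helper mirroring EngineContext.createSandbox above:
// wrap a live query so the sandbox can stop/close it without ever
// touching the shared KafkaStreams instance.
static PersistentQueryMetadata sandboxCopy(
    final PersistentQueryMetadata runningQuery,
    final Consumer<QueryMetadata> unregister
) {
    // Shares the query's metadata, but every KafkaStreams hook is a
    // no-op, so close() is bookkeeping (unregistering) only.
    return SandboxedPersistentQueryMetadata.of(runningQuery, unregister);
}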
SchemaRegisterInjectorTest.java
@@ -192,7 +192,6 @@ public void shouldNotReplaceExistingSchemaForSchemaRegistryEnabledFormatCreateSo
// Given:
givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);");
when(schemaRegistryClient.getAllSubjects()).thenReturn(ImmutableSet.of("expectedName-value"));
- when(schemaRegistryClient.testCompatibility(eq("expectedName-value"), any(ParsedSchema.class))).thenReturn(true);

// When:
injector.inject(statement);
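The removed stubbing above is presumably no longer exercised by this test path; under MockitoJUnitRunner's strict-stubs checking, an unused when(...) would fail the test as an unnecessary stubbing.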
SandboxedPersistentQueryMetadataTest.java (new file)
@@ -0,0 +1,154 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SandboxedPersistentQueryMetadataTest {
private static final String SQL = "sql";
private static final String EXECUTION_PLAN = "execution plan";
private static final QueryId QUERY_ID = new QueryId("queryId");
private static final String APPLICATION_ID = "applicationId";
private static final long CLOSE_TIMEOUT = 10L;

@Mock
private KafkaStreamsBuilder kafkaStreamsBuilder;
@Mock
private KafkaStreams kafkaStreams;
@Mock
private PhysicalSchema physicalSchema;
@Mock
private DataSource sinkDataSource;
@Mock
private MaterializationProviderBuilderFactory.MaterializationProviderBuilder
materializationProviderBuilder;
@Mock
private MaterializationProvider materializationProvider;
@Mock
private Topology topology;
@Mock
private QuerySchemas schemas;
@Mock
private Map<String, Object> props;
@Mock
private Map<String, Object> overrides;
@Mock
private Consumer<QueryMetadata> closeCallback;
@Mock
private QueryErrorClassifier queryErrorClassifier;
@Mock
private ExecutionStep<?> physicalPlan;

private PersistentQueryMetadata query;
private SandboxedPersistentQueryMetadata sandbox;

@Before
public void setUp() {
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams);
when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class));
when(materializationProviderBuilder.apply(kafkaStreams))
.thenReturn(Optional.of(materializationProvider));

query = new PersistentQueryMetadata(
SQL,
physicalSchema,
Collections.emptySet(),
sinkDataSource,
EXECUTION_PLAN,
QUERY_ID,
Optional.of(materializationProviderBuilder),
APPLICATION_ID,
topology,
kafkaStreamsBuilder,
schemas,
props,
overrides,
closeCallback,
CLOSE_TIMEOUT,
queryErrorClassifier,
physicalPlan,
10
);

sandbox = SandboxedPersistentQueryMetadata.of(query, closeCallback);
reset(kafkaStreams);
}

@Test
public void shouldNotStartKafkaStreamsOnStart() {
// When:
sandbox.start();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCloseKafkaStreamsOnStop() {
// When:
sandbox.stop();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCloseKafkaStreamsOnClose() {
// When:
sandbox.close();

// Then:
verifyZeroInteractions(kafkaStreams);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
sandbox.stop();

// Then:
verifyZeroInteractions(closeCallback);
}
}
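Note the reset(kafkaStreams) at the end of setUp(): constructing the PersistentQueryMetadata may already record interactions with the mock (kafkaStreamsBuilder.build(...) returns it, and materializationProviderBuilder is applied to it), so clearing those first is what lets each test assert verifyZeroInteractions(kafkaStreams).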
