Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cache transformations in PersistentQueryMetadata #3708

Merged
merged 3 commits into from
Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.execution.context.QueryContext;
import com.google.common.base.Suppliers;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
Expand All @@ -30,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

Expand All @@ -38,13 +40,14 @@
*/
public class PersistentQueryMetadata extends QueryMetadata {

private static final String QUERY_ID_PREFIX = "PULL-query-";
private final QueryId id;
private final KsqlTopic resultTopic;
private final SourceName sinkName;
private final QuerySchemas schemas;
private final PhysicalSchema resultSchema;
private final DataSourceType dataSourceType;
private final Optional<MaterializationProvider> materializationProvider;
private final Supplier<Optional<Materialization>> materializationSupplier;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadata(
Expand Down Expand Up @@ -83,9 +86,11 @@ public PersistentQueryMetadata(
this.sinkName = Objects.requireNonNull(sinkName, "sinkName");
this.schemas = requireNonNull(schemas, "schemas");
this.resultSchema = requireNonNull(schema, "schema");
this.materializationProvider =
requireNonNull(materializationProvider, "materializationProvider");
requireNonNull(materializationProvider, "materializationProvider");
this.dataSourceType = Objects.requireNonNull(dataSourceType, "dataSourceType");
this.materializationSupplier = Suppliers.memoize(() -> materializationProvider
.map(builder -> builder.build(getPullQueryId(sinkName.name()),
new Stacker())));
}

private PersistentQueryMetadata(
Expand All @@ -98,8 +103,8 @@ private PersistentQueryMetadata(
this.sinkName = other.sinkName;
this.schemas = other.schemas;
this.resultSchema = other.resultSchema;
this.materializationProvider = other.materializationProvider;
this.dataSourceType = other.dataSourceType;
this.materializationSupplier = other.materializationSupplier;
}

public PersistentQueryMetadata copyWith(final Consumer<QueryMetadata> closeCallback) {
Expand Down Expand Up @@ -130,10 +135,11 @@ public PhysicalSchema getPhysicalSchema() {
return resultSchema;
}

public Optional<Materialization> getMaterialization(
final QueryId queryId,
final QueryContext.Stacker contextStacker
) {
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
public Optional<Materialization> getMaterialization() {
return materializationSupplier.get();
}

public static QueryId getPullQueryId(String sinkName) {
return new QueryId(QUERY_ID_PREFIX + sinkName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import com.google.common.collect.Range;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializedTable;
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
Expand Down Expand Up @@ -117,7 +116,6 @@ public class KsMaterializationFunctionalTest {

private String output;
private final QueryId queryId = new QueryId("static");
private final QueryContext.Stacker contextStacker = new QueryContext.Stacker();

@BeforeClass
public static void classSetUp() {
Expand Down Expand Up @@ -153,7 +151,7 @@ public void shouldReturnEmptyIfNotMaterializedTable() {
);

// When:
final Optional<Materialization> result = query.getMaterialization(queryId, contextStacker);
final Optional<Materialization> result = query.getMaterialization();

// Then:
assertThat(result, is(Optional.empty()));
Expand All @@ -168,7 +166,7 @@ public void shouldReturnEmptyIfNotMaterializedStream() {
);

// When:
final Optional<Materialization> result = query.getMaterialization(queryId, contextStacker);
final Optional<Materialization> result = query.getMaterialization();

// Then:
assertThat(result, is(Optional.empty()));
Expand All @@ -188,7 +186,7 @@ public void shouldReturnEmptyIfAppServerNotConfigured() {
);

// When:
final Optional<Materialization> result = query.getMaterialization(queryId, contextStacker);
final Optional<Materialization> result = query.getMaterialization();

// Then:
assertThat(result, is(Optional.empty()));
Expand All @@ -209,7 +207,7 @@ public void shouldQueryMaterializedTableForAggregatedTable() {
final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.empty()));
Expand Down Expand Up @@ -243,7 +241,7 @@ public void shouldQueryMaterializedTableForAggregatedStream() {
final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.empty()));
Expand Down Expand Up @@ -279,7 +277,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.TUMBLING)));
Expand Down Expand Up @@ -327,7 +325,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
waitForTableRows(TIME_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.HOPPING)));
Expand Down Expand Up @@ -374,7 +372,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
waitForTableRows(SESSION_WINDOWED_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.of(WindowType.SESSION)));
Expand Down Expand Up @@ -424,7 +422,7 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {


// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.empty()));
Expand Down Expand Up @@ -454,7 +452,7 @@ public void shouldQueryMaterializedTableWitMultipleAggregationColumns() {
final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
assertThat(materialization.windowType(), is(Optional.empty()));
Expand Down Expand Up @@ -484,7 +482,7 @@ public void shouldIgnoreHavingClause() {
final Map<String, GenericRow> rows = waitForTableRows(STRING_DESERIALIZER, schema);

// When:
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
final Materialization materialization = query.getMaterialization().get();

// Then:
final MaterializedTable table = materialization.nonWindowed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class QueryExecutorTest {
private static final Set<SourceName> SOURCES
= ImmutableSet.of(SourceName.of("foo"), SourceName.of("bar"));
private static final SourceName SINK_NAME = SourceName.of("baz");
private static final QueryId PULL_QUERY_ID = PersistentQueryMetadata.getPullQueryId(SINK_NAME.name());
private static final String STORE_NAME = "store";
private static final String SUMMARY = "summary";
private static final Map<String, Object> OVERRIDES = Collections.emptyMap();
Expand Down Expand Up @@ -274,8 +275,7 @@ public void shouldBuildPersistentQueryWithCorrectMaterializationProvider() {
physicalPlan,
SUMMARY
);
final QueryContext.Stacker stacker = new Stacker();
final Optional<Materialization> result = queryMetadata.getMaterialization(QUERY_ID, stacker);
final Optional<Materialization> result = queryMetadata.getMaterialization();

// Then:
assertThat(result.get(), is(materialization));
Expand Down Expand Up @@ -317,13 +317,13 @@ public void shouldMaterializeCorrectly() {
SUMMARY
);
final QueryContext.Stacker stacker = new Stacker();
queryMetadata.getMaterialization(QUERY_ID, stacker);
queryMetadata.getMaterialization();

// Then:
verify(ksqlMaterializationFactory).create(
ksMaterialization,
materializationInfo,
QUERY_ID,
PULL_QUERY_ID,
stacker
);
}
Expand All @@ -342,8 +342,7 @@ public void shouldNotIncludeMaterializationProviderIfNoMaterialization() {
physicalPlan,
SUMMARY
);
final QueryContext.Stacker stacker = new Stacker();
final Optional<Materialization> result = queryMetadata.getMaterialization(QUERY_ID, stacker);
final Optional<Materialization> result = queryMetadata.getMaterialization();

assertThat(result, is(Optional.empty()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.Analysis;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
Expand Down Expand Up @@ -155,11 +153,11 @@ public static TableRowsEntity execute(

final WhereInfo whereInfo = extractWhereInfo(analysis, query);

final QueryId queryId = uniqueQueryId();
final QueryContext.Stacker contextStacker = new Stacker();
final QueryId queryId = PersistentQueryMetadata.getPullQueryId(
getSourceName(analysis).name());

final Materialization mat = query
.getMaterialization(queryId, contextStacker)
.getMaterialization()
.orElseThrow(() -> notMaterializedException(getSourceName(analysis)));

final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema());
Expand Down Expand Up @@ -215,10 +213,6 @@ public static TableRowsEntity execute(
}
}

private static QueryId uniqueQueryId() {
return new QueryId("query_" + System.currentTimeMillis());
}

private static Analysis analyze(
final ConfiguredStatement<Query> statement,
final KsqlExecutionContext executionContext
Expand Down