Skip to content

Commit

Permalink
feat(static): static select support (#3369)
Browse files Browse the repository at this point in the history
* feat(static): static select support

static queries now support arbitrary select expressions.
  • Loading branch information
big-andy-coates authored Sep 19, 2019
1 parent a7a7309 commit e4b3275
Show file tree
Hide file tree
Showing 15 changed files with 1,355 additions and 582 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class SchemaUtil {

public static final String ROWKEY_NAME = "ROWKEY";
public static final String ROWTIME_NAME = "ROWTIME";
public static final String WINDOWSTART_NAME = "WINDOWSTART";

public static final int ROWKEY_INDEX = 1;

Expand Down
22 changes: 17 additions & 5 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
final Visitor visitor = new Visitor();
final Visitor visitor = new Visitor(query.isStatic());
visitor.process(query, null);

sink.ifPresent(visitor::analyzeNonStdOutSink);
Expand All @@ -146,9 +146,14 @@ private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analysis analysis = new Analysis();
private final boolean staticQuery;
private boolean isJoin = false;
private boolean isGroupBy = false;

Visitor(final boolean staticQuery) {
this.staticQuery = staticQuery;
}

private void analyzeNonStdOutSink(final Sink sink) {
analysis.setProperties(sink.getProperties());
sink.getPartitionBy().ifPresent(analysis::setPartitionBy);
Expand Down Expand Up @@ -301,19 +306,26 @@ private void throwOnUnknownColumnReference() {
new ExpressionAnalyzer(analysis.getFromSourceSchemas());

for (final Expression selectExpression : analysis.getSelectExpressions()) {
expressionAnalyzer.analyzeExpression(selectExpression);
expressionAnalyzer.analyzeExpression(selectExpression, false);
}

if (analysis.getWhereExpression() != null) {
expressionAnalyzer.analyzeExpression(analysis.getWhereExpression());
final boolean allowWindowMetaFields = staticQuery
&& analysis.getFromDataSources().get(0)
.getDataSource()
.getKsqlTopic()
.getKeyFormat()
.isWindowed();

expressionAnalyzer.analyzeExpression(analysis.getWhereExpression(), allowWindowMetaFields);
}

for (final Expression expression : analysis.getGroupByExpressions()) {
expressionAnalyzer.analyzeExpression(expression);
expressionAnalyzer.analyzeExpression(expression, false);
}

if (analysis.getHavingExpression() != null) {
expressionAnalyzer.analyzeExpression(analysis.getHavingExpression());
expressionAnalyzer.analyzeExpression(analysis.getHavingExpression(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;

public class ContinuousQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new IllegalArgumentException("static");
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
}

@Override
public void postValidate(final Analysis analysis) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,18 @@ class ExpressionAnalyzer {
this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas");
}

void analyzeExpression(final Expression expression) {
final Visitor visitor = new Visitor();
void analyzeExpression(final Expression expression, final boolean allowWindowMetaFields) {
final Visitor visitor = new Visitor(allowWindowMetaFields);
visitor.process(expression, null);
}

private void throwOnUnknownField(final QualifiedName name) {
final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
if (sourcesWithField.isEmpty()) {
throw new KsqlException("Field '" + name + "' cannot be resolved.");
}
private final class Visitor extends VisitParentExpressionVisitor<Object, Object> {

if (name.qualifier().isPresent()) {
if (!sourcesWithField.contains(name.qualifier().get())) {
throw new KsqlException("Source '" + name.qualifier() + "', "
+ "used in '" + name + "' cannot be resolved.");
}
} else if (sourcesWithField.size() > 1) {
final String possibilities = sourcesWithField.stream()
.sorted()
.map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
.collect(Collectors.joining(","));

throw new KsqlException("Field '" + name + "' is ambiguous. "
+ "Could be any of: " + possibilities);
}
}
private final boolean allowWindowMetaFields;

private class Visitor extends VisitParentExpressionVisitor<Object, Object> {
Visitor(final boolean allowWindowMetaFields) {
this.allowWindowMetaFields = allowWindowMetaFields;
}

public Object visitLikePredicate(final LikePredicate node, final Object context) {
process(node.getValue(), null);
Expand Down Expand Up @@ -138,5 +122,32 @@ public Object visitQualifiedNameReference(
throwOnUnknownField(node.getName());
return null;
}

private void throwOnUnknownField(final QualifiedName name) {
final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
if (sourcesWithField.isEmpty()) {
if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) {
return;
}

throw new KsqlException("Field '" + name + "' cannot be resolved.");
}

if (name.qualifier().isPresent()) {
final String qualifier = name.qualifier().get();
if (!sourcesWithField.contains(qualifier)) {
throw new KsqlException("Source '" + qualifier + "', "
+ "used in '" + name + "' cannot be resolved.");
}
} else if (sourcesWithField.size() > 1) {
final String possibilities = sourcesWithField.stream()
.sorted()
.map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
.collect(Collectors.joining(", "));

throw new KsqlException("Field '" + name + "' is ambiguous. "
+ "Could be any of: " + possibilities);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
Expand All @@ -27,7 +28,6 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.AggregateExpressionRewriter;
Expand All @@ -40,57 +40,70 @@

public class QueryAnalyzer {

private static final String NEW_QUERY_SYNTAX_HELP =
"'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator()
+ "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

private final Analyzer analyzer;
private final MetaStore metaStore;
private final String outputTopicPrefix;
private final Set<SerdeOption> defaultSerdeOptions;
private final QueryValidator continuousValidator;
private final QueryValidator staticValidator;

public QueryAnalyzer(
final MetaStore metaStore,
final String outputTopicPrefix,
final Set<SerdeOption> defaultSerdeOptions
) {
this(
metaStore,
new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions),
new ContinuousQueryValidator(),
new StaticQueryValidator()
);
}

@VisibleForTesting
QueryAnalyzer(
final MetaStore metaStore,
final Analyzer analyzer,
final QueryValidator continuousValidator,
final QueryValidator staticValidator
) {
this.metaStore = requireNonNull(metaStore, "metaStore");
this.outputTopicPrefix = requireNonNull(outputTopicPrefix, "outputTopicPrefix");
this.defaultSerdeOptions = ImmutableSet.copyOf(
requireNonNull(defaultSerdeOptions, "defaultSerdeOptions"));
this.analyzer = requireNonNull(analyzer, "analyzer");
this.continuousValidator = requireNonNull(continuousValidator, "continuousValidator");
this.staticValidator = requireNonNull(staticValidator, "staticValidator");
}

public Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new KsqlException("Static queries are not yet supported. "
+ "Consider adding 'EMIT CHANGES' to any bare query, "
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
staticValidator.preValidate(query, sink);
} else {
continuousValidator.preValidate(query, sink);
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
final Analysis analysis = analyzer.analyze(query, sink);

if (query.isStatic()) {
staticValidator.postValidate(analysis);
} else {
continuousValidator.postValidate(analysis);
}

return new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions)
.analyze(query, sink);
return analysis;
}

public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import java.util.Optional;

/**
* Validator used by {@link QueryAnalyzer}.
*/
interface QueryValidator {

void preValidate(Query query, Optional<Sink> sink);

void postValidate(Analysis analysis);
}
Loading

0 comments on commit e4b3275

Please sign in to comment.