Skip to content

Commit

Permalink
feat(static): initial syntax for static queries (#3300)
Browse files Browse the repository at this point in the history
* feat(static): initial syntax for static queries

The proposed syntax includes allowing queries to have one of the following:
 * EMIT CHANGES - marking it as continuous, outputting intermediate results
 * EMIT FINAL   - marking it as continuous, outputting final results
 * WITH CHANGES - marking it as static, outputting intermediate results
 * WITH FINAL   - marking it as static, outputting final results

Where:
* Persistent queries, (CSAS, CTAS + INSERT INTO), default to `EMIT CHANGES`.
* Bare queries, (SELECT * FROM X;), default to `WITH FINAL`.

However, this change introduces the minimum set of changes needed for KLIP-8, which is the addition of `EMIT CHANGES` only.

* Bare queries without `EMIT CHANGES` will be static queries
* Bare queries with `EMIT CHANGES` will be continuous queries
* Persistent queries will implicitly be `EMIT CHANGES` until the next major release. The `EMIT CHANGES` is optional, but encouraged as it provides future proofing.
  • Loading branch information
big-andy-coates authored Sep 10, 2019
1 parent 92b03ec commit 8917e48
Show file tree
Hide file tree
Showing 51 changed files with 706 additions and 347 deletions.
6 changes: 3 additions & 3 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ public void testTransientSelect() {
final List<Object> row3 = streamData.get("3").getColumns();

selectWithLimit(
"SELECT ORDERID, ITEMID FROM " + orderDataProvider.kstreamName(),
"SELECT ORDERID, ITEMID FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES",
3,
containsRows(
row(row1.get(1).toString(), row1.get(2).toString()),
Expand All @@ -601,7 +601,7 @@ public void testTransientSelectStar() {
final List<Object> row3 = streamData.get("3").getColumns();

selectWithLimit(
"SELECT * FROM " + orderDataProvider.kstreamName(),
"SELECT * FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES",
3,
containsRows(
row(prependWithRowTimeAndKey(row1)),
Expand All @@ -614,7 +614,7 @@ public void testTransientSelectStar() {
public void testTransientHeader() {
// When:
rowCaptor.resetTestResult();
run("SELECT * FROM " + orderDataProvider.kstreamName() + " LIMIT 1", localCli);
run("SELECT * FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES LIMIT 1", localCli);

// Then: (note that some of these are truncated because of header wrapping)
assertThat(terminal.getOutputString(), containsString("ROWTIME"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.AggregateExpressionRewriter;
Expand All @@ -37,6 +38,21 @@
import java.util.stream.Collectors;

public class QueryAnalyzer {

private static final String NEW_QUERY_SYNTAX_HELP =
"'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

private final MetaStore metaStore;
private final String outputTopicPrefix;
private final Set<SerdeOption> defaultSerdeOptions;
Expand All @@ -56,6 +72,22 @@ public Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new KsqlException("Static queries are not yet supported. "
+ "Consider adding 'EMIT CHANGES' to any bare query, "
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
}

return new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions)
.analyze(query, sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.analyzer.Analysis.Into;
import io.confluent.ksql.analyzer.Analysis.JoinInfo;
import io.confluent.ksql.analyzer.Analyzer.SerdeOptionsSupplier;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.BooleanLiteral;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
Expand All @@ -40,7 +41,6 @@
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.parser.ExpressionFormatterUtil;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.KsqlParserTestUtil;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void init() {

@Test
public void testSimpleQueryAnalysis() {
final String simpleQuery = "SELECT col0, col2, col3 FROM test1 WHERE col0 > 100;";
final String simpleQuery = "SELECT col0, col2, col3 FROM test1 WHERE col0 > 100 EMIT CHANGES;";
final Analysis analysis = analyzeQuery(simpleQuery, jsonMetaStore);
Assert.assertNotNull("INTO is null", analysis.getInto());
Assert.assertNotNull("SELECT is null", analysis.getSelectExpressions());
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testSimpleLeftJoinAnalysis() {
final Analysis analysis = analyzeQuery(
"SELECT t1.col1, t2.col1, t2.col4, col5, t2.col2 "
+ "FROM test1 t1 LEFT JOIN test2 t2 "
+ "ON t1.col1 = t2.col1;", jsonMetaStore);
+ "ON t1.col1 = t2.col1 EMIT CHANGES;", jsonMetaStore);

// Then:
assertThat(analysis.getFromDataSources(), hasSize(2));
Expand All @@ -194,7 +194,7 @@ public void testSimpleLeftJoinAnalysis() {
public void shouldHandleJoinOnRowKey() {
// When:
final Optional<JoinInfo> join = analyzeQuery(
"SELECT * FROM test1 t1 LEFT JOIN test2 t2 ON t1.ROWKEY = t2.ROWKEY;",
"SELECT * FROM test1 t1 LEFT JOIN test2 t2 ON t1.ROWKEY = t2.ROWKEY EMIT CHANGES;",
jsonMetaStore)
.getJoin();

Expand All @@ -207,7 +207,7 @@ public void shouldHandleJoinOnRowKey() {

@Test
public void testBooleanExpressionAnalysis() {
final String queryStr = "SELECT col0 = 10, col2, col3 > col1 FROM test1;";
final String queryStr = "SELECT col0 = 10, col2, col3 > col1 FROM test1 EMIT CHANGES;";
final Analysis analysis = analyzeQuery(queryStr, jsonMetaStore);

Assert.assertNotNull("INTO is null", analysis.getInto());
Expand Down Expand Up @@ -236,7 +236,7 @@ public void testBooleanExpressionAnalysis() {

@Test
public void testFilterAnalysis() {
final String queryStr = "SELECT col0 = 10, col2, col3 > col1 FROM test1 WHERE col0 > 20;";
final String queryStr = "SELECT col0 = 10, col2, col3 > col1 FROM test1 WHERE col0 > 20 EMIT CHANGES;";
final Analysis analysis = analyzeQuery(queryStr, jsonMetaStore);

Assert.assertNotNull("INTO is null", analysis.getInto());
Expand Down
Loading

0 comments on commit 8917e48

Please sign in to comment.