From fac3e48af9f1652c4f88d4d382cb3cadd9c9efdc Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 12 Aug 2020 19:18:13 -0700 Subject: [PATCH 1/4] feat: introduce yet-another-testing-tool (YATT) --- .../ksql/engine/InsertValuesExecutor.java | 2 +- .../io/confluent/ksql/engine/KsqlEngine.java | 18 +- .../ksql/util/PersistentQueryMetadata.java | 4 + .../io/confluent/ksql/util/QueryMetadata.java | 6 +- .../ksql/test/KsqlTestException.java | 117 ++++++ .../ksql/test/driver/AssertExecutor.java | 104 ++++++ .../ksql/test/driver/TestDriverPipeline.java | 25 +- .../ksql/test/parser/SqlTestLoader.java | 143 +++++++ .../ksql/test/parser/SqlTestReader.java | 5 +- .../ksql/test/parser/TestDirective.java | 14 +- .../ksql/test/parser/TestStatement.java | 49 +-- .../ksql/test/driver/KsqlTesterTest.java | 350 ++++++++++++++++++ .../ksql/test/parser/SqlTestReaderTest.java | 16 +- .../ksql/test/parser/TestDirectiveTest.java | 1 - .../query-validation-tests/asserts.json | 14 + .../src/test/resources/sql-tests/test.sql | 119 ++++++ .../confluent/ksql/parser/SqlFormatter.java | 79 +++- 17 files changed, 998 insertions(+), 68 deletions(-) create mode 100644 ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java create mode 100644 ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/AssertExecutor.java create mode 100644 ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java create mode 100644 ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java create mode 100644 ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json create mode 100644 ksqldb-functional-tests/src/test/resources/sql-tests/test.sql diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 7646a81c5bc1..f8ce3b8d5474 100644 --- 
a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -84,7 +84,7 @@ void sendRecord( } @VisibleForTesting - InsertValuesExecutor( + public InsertValuesExecutor( final boolean canBeDisabledByConfig, final RecordProducer producer ) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 43467ac8929b..7932be258851 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -234,13 +234,25 @@ public TransientQueryMetadata executeQuery( } } - @Override - public void close() { - allLiveQueries.forEach(QueryMetadata::stop); + /** + * @param closeQueries whether or not to clean up the local state for any running queries + */ + public void close(final boolean closeQueries) { + if (closeQueries) { + allLiveQueries.forEach(QueryMetadata::close); + } else { + allLiveQueries.forEach(QueryMetadata::stop); + } + engineMetrics.close(); aggregateMetricsCollector.shutdown(); } + @Override + public void close() { + close(false); + } + /** * Determines if a statement is executable by the engine. 
* diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 544f59ada4c5..9e620091d260 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -145,6 +145,10 @@ public ExecutionStep getPhysicalPlan() { return physicalPlan; } + public DataSource getSink() { + return sinkDataSource; + } + @VisibleForTesting Optional getMaterializationProvider() { return materializationProvider; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 222ba724dbbe..c18a714254cf 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -58,7 +58,7 @@ public abstract class QueryMetadata { private final KafkaStreamsBuilder kafkaStreamsBuilder; private final Map streamsProperties; private final Map overriddenProperties; - private final Consumer closeCallback; + private Consumer closeCallback; private final Set sourceNames; private final LogicalSchema logicalSchema; private final Long closeTimeout; @@ -142,6 +142,10 @@ public void setQueryStateListener(final QueryStateListener queryStateListener) { queryStateListener.onChange(kafkaStreams.state(), kafkaStreams.state()); } + public void closeAndThen(final Consumer andThen) { + this.closeCallback = closeCallback.andThen(andThen); + } + private void uncaughtHandler(final Thread t, final Throwable e) { LOG.error("Unhandled exception caught in streams thread {}.", t.getName(), e); final QueryError queryError = diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java new 
file mode 100644 index 000000000000..a854b3341cb1 --- /dev/null +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java @@ -0,0 +1,117 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.test; + +import io.confluent.ksql.parser.KsqlParser.ParsedStatement; +import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.parser.SqlFormatter; +import io.confluent.ksql.parser.tree.AssertStatement; +import io.confluent.ksql.test.model.LocationWithinFile; +import io.confluent.ksql.test.parser.TestDirective; +import io.confluent.ksql.test.parser.TestStatement; +import io.confluent.ksql.util.ParserUtil; +import java.nio.file.Path; +import java.util.Objects; +import java.util.Optional; + +/** + * Indicates a test exception as well as where it occurred. All sql-driven + * tests should throw this exception at the top-level if possible in order + * to automatically populate the statement that produced it as well as the + * location in the file. 
+ */ +public class KsqlTestException extends AssertionError { + + public KsqlTestException( + final TestStatement statement, + final Path file, + final Throwable cause + ) { + super(getMessage(statement, cause.getMessage(), file), cause); + } + + public KsqlTestException( + final TestStatement statement, + final Path file, + final String message + ) { + super(getMessage(statement, message, file)); + } + + private static String getMessage( + final TestStatement stmt, + final String message, + final Path file + ) { + return stmt.apply( + parsed -> engineMessage(parsed, message, file), + assertStatement -> assertMessage(assertStatement, message, file), + directive -> directiveMessage(directive, message, file) + ); + } + + private static String engineMessage( + final ParsedStatement parsedStatement, + final String message, + final Path file + ) { + final Optional loc = ParserUtil.getLocation( + parsedStatement.getStatement()); + + return String.format( + "Test failure for statement `%s` (%s):\n\t%s\n\t%s", + parsedStatement.getStatementText(), + loc.map(NodeLocation::toString).orElse("unknown"), + message, + new LocationWithinFile( + file, + loc.map(NodeLocation::getLineNumber).orElse(1)) + ); + } + + private static String assertMessage( + final AssertStatement assertStatement, + final String message, + final Path file + ) { + return String.format( + "Test failure for assert `%s` (%s):\n\t%s\n\t%s", + SqlFormatter.formatSql(assertStatement), + assertStatement.getLocation().map(Objects::toString).orElse("unknown"), + message, + new LocationWithinFile( + file, + assertStatement.getLocation().map(NodeLocation::getLineNumber).orElse(1)) + ); + } + + private static String directiveMessage( + final TestDirective directive, + final String message, + final Path file + ) { + return String.format( + "Test failure during directive evaluation `%s` (%s):\n\t%s\t%s", + directive, + directive.getLocation(), + message, + new LocationWithinFile( + file, + 
directive.getLocation().getLineNumber()) + ); + } + +} diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/AssertExecutor.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/AssertExecutor.java new file mode 100644 index 000000000000..8b23cabcc2f9 --- /dev/null +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/AssertExecutor.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.test.driver; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.engine.generic.GenericRecordFactory; +import io.confluent.ksql.engine.generic.KsqlGenericRecord; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.parser.AssertTable; +import io.confluent.ksql.parser.tree.AssertStream; +import io.confluent.ksql.parser.tree.AssertValues; +import io.confluent.ksql.parser.tree.InsertValues; +import io.confluent.ksql.schema.ksql.SystemColumns; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import java.util.Iterator; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.test.TestRecord; + +/** + * {@code AssertExecutor} handles the assertion statements for the Sql-based + * testing tool. 
+ */ +public final class AssertExecutor { + + private AssertExecutor() { + } + + public static void assertValues( + final KsqlExecutionContext engine, + final KsqlConfig config, + final AssertValues assertValues, + final TestDriverPipeline driverPipeline + ) { + final InsertValues values = assertValues.getStatement(); + final boolean compareTimestamp = values + .getColumns() + .stream() + .anyMatch(SystemColumns.ROWTIME_NAME::equals); + + final DataSource dataSource = engine.getMetaStore().getSource(values.getTarget()); + final KsqlGenericRecord expected = new GenericRecordFactory( + config, engine.getMetaStore(), System::currentTimeMillis + ).build( + values.getColumns(), + values.getValues(), + dataSource.getSchema(), + dataSource.getDataSourceType() + ); + + final Iterator> records = driverPipeline + .getRecordsForTopic(dataSource.getKafkaTopicName()); + if (!records.hasNext()) { + throw new KsqlException( + String.format( + "Expected another record (%s) for %s but already read all records: %s", + expected, + dataSource.getName(), + driverPipeline.getAllRecordsForTopic(dataSource.getKafkaTopicName()) + ) + ); + } + + final TestRecord actualTestRecord = records.next(); + final KsqlGenericRecord actual = KsqlGenericRecord.of( + actualTestRecord.key(), + actualTestRecord.value(), + compareTimestamp ? actualTestRecord.timestamp() : expected.ts + ); + + if (!actual.equals(expected)) { + throw new KsqlException( + String.format( + "Expected record does not match actual. Expected: %s vs. 
Actual: %s", + expected, + actual + ) + ); + } + } + + public static void assertStream(final AssertStream assertStatement) { + throw new UnsupportedOperationException(); + } + + public static void assertTable(final AssertTable assertStatement) { + throw new UnsupportedOperationException(); + } + +} diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/TestDriverPipeline.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/TestDriverPipeline.java index de4b8eef44e9..e1c9bb9ff1c4 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/TestDriverPipeline.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/driver/TestDriverPipeline.java @@ -111,13 +111,18 @@ public TopicInfo( private final ListMultimap inputs; private final ListMultimap outputs; private final ListMultimap> outputCache; - private final Map>> outputIterators; + + // this map indexes into the outputCache to track which records we've already + // read - we don't need to worry about concurrent modification while iterating + // because appends only happen at the end of the outputCache + private final Map assertPositions; public TestDriverPipeline() { inputs = ArrayListMultimap.create(); outputs = ArrayListMultimap.create(); outputCache = ArrayListMultimap.create(); - outputIterators = new HashMap<>(); + + assertPositions = new HashMap<>(); } public void addDriver( @@ -182,7 +187,7 @@ private void pipeInput( for (final Input input : inputs) { input.topic.pipeInput(key, value, timestampMs); - // handle the fallout of piping in a record (propegation) + // handle the fallout of piping in a record (propagation) for (final Output receiver : input.receivers) { for (final TestRecord record : receiver.topic.readRecordsToList()) { outputCache.put(receiver.name, record); @@ -207,7 +212,19 @@ public List> getAllRecordsForTopic(final String t } public Iterator> getRecordsForTopic(final String topic) { - return 
outputIterators.computeIfAbsent(topic, name -> outputCache.get(topic).iterator()); + return new Iterator>() { + @Override + public boolean hasNext() { + final int idx = assertPositions.getOrDefault(topic, 0); + return outputCache.get(topic).size() > idx; + } + + @Override + public TestRecord next() { + final int idx = assertPositions.merge(topic, 0, (old, zero) -> old + 1); + return outputCache.get(topic).get(idx); + } + }; } } diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java new file mode 100644 index 000000000000..b9539effc39f --- /dev/null +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java @@ -0,0 +1,143 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.test.parser; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.test.KsqlTestException; +import io.confluent.ksql.test.parser.TestDirective.Type; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * The {@code SqlTestLoader} loads the test files that should be run + * by the Ksql testing tool based on a path and optional filters. + */ +public class SqlTestLoader { + + private final Predicate shouldRun; + + public SqlTestLoader() { + this(t -> true); + } + + /** + * @param testFilter filters out which tests to run + */ + public SqlTestLoader(final Predicate testFilter) { + this.shouldRun = Objects.requireNonNull(testFilter, "testFilter"); + } + + /** + * @param path a directory containing all test files to run + * + * @return a list of tests to run + */ + public List loadDirectory(final Path path) throws IOException { + final SqlTestLoader loader = new SqlTestLoader(); + final List files = Files + .find(path, Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) + .collect(Collectors.toList()); + + final ImmutableList.Builder builder = ImmutableList.builder(); + for (final Path file : files) { + builder.addAll(loader.loadTest(file)); + } + + return builder.build(); + } + + /** + * @param path a single sql test file, containing possibly many tests + * + * @return the list of tests to run + */ + public List loadTest(final Path path) throws IOException { + final ImmutableList.Builder builder = ImmutableList.builder(); + + List statements = null; + String name = null; + + final SqlTestReader reader = SqlTestReader.of(path); + while (reader.hasNext()) { + final TestStatement statement = reader.next(); + final Optional nextName = statement.consumeDirective( + directive -> 
directive.getType() == Type.TEST ? directive.getContents() : null + ); + + if (nextName.isPresent()) { + // flush the previous test + if (statements != null) { + builder.add(new Test(path, name, statements)); + } + + statements = new ArrayList<>(); + name = nextName.get(); + } else if (statements == null) { + throw new KsqlTestException(statement, path, "Expected test to start with --@test."); + } + + statements.add(statement); + } + + builder.add(new Test(path, name, statements)); + return builder.build().stream().filter(shouldRun).collect(ImmutableList.toImmutableList()); + } + + /** + * Represents a tuple of (test name, file, test statements) that constitute a ksql + * test. + */ + public static class Test { + + private final Path file; + private final String name; + private final List statements; + + public Test(final Path file, final String name, final List statements) { + this.file = file; + this.name = name; + this.statements = statements; + } + + public Path getFile() { + return file; + } + + public String getName() { + return name; + } + + public List getStatements() { + return statements; + } + + /** + * @return an {@code Object[]} representation of this class used for Parameterized + * JUnit testing. 
The representation is [name, file, statements] + */ + public Object[] asObjectArray() { + return new Object[]{name, file, statements}; + } + } + +} diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestReader.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestReader.java index c61614393d21..d70be7701575 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestReader.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestReader.java @@ -78,7 +78,7 @@ public static SqlTestReader of( } /** - * @param file the test file + * @param file the test file */ public static SqlTestReader of( final Path file @@ -128,7 +128,8 @@ public TestStatement next() { cachedStatement = true; } - // if there's no cachedStatement at this point, we've hit EOF + // if there's no cachedStatement at this point, then all that's left is the directives + // so we can just use EOF (tks.size()) as our stopping point final int currIdx = cachedStatement ? 
testStatement.getStart().getTokenIndex() : tks.size(); while (directiveIdx < currIdx) { final Token tok = tks.get(directiveIdx++); diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestDirective.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestDirective.java index 3133e0f1ee95..531e838ea50f 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestDirective.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestDirective.java @@ -57,6 +57,10 @@ public String getContents() { return contents; } + public NodeLocation getLocation() { + return location; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -79,21 +83,13 @@ public int hashCode() { @Override public String toString() { - return "TestDirective{" - + "type=" + type - + ", contents='" + contents + '\'' - + '}'; - } - - public NodeLocation getLocation() { - return location; + return "--@" + type.getTypeName() + ": " + contents; } public enum Type { TEST("test"), EXPECTED_ERROR("expected.error"), EXPECTED_MESSAGE("expected.message"), - ENABLED("enabled"), UNKNOWN("UNKNOWN") ; diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestStatement.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestStatement.java index 4173d1acd6d1..e40cf5355200 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestStatement.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/TestStatement.java @@ -17,9 +17,10 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.tree.AssertStatement; -import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Stream; /** @@ -71,7 +72,7 @@ private TestStatement( } } - public void 
handle( + public void consume( final Consumer parsedStatementConsumer, final Consumer assertStatementConsumer, final Consumer testDirectiveConsumer @@ -85,40 +86,28 @@ public void handle( } } - public boolean hasEngineStatement() { - return engineStatement != null; - } - - public ParsedStatement getEngineStatement() { - if (!hasEngineStatement()) { - throw new NoSuchElementException("engineStatement"); - } - - return engineStatement; - } - - public boolean hasAssertStatement() { - return assertStatement != null; - } - - public AssertStatement getAssertStatement() { - if (!hasAssertStatement()) { - throw new NoSuchElementException("assertStatement"); + public T apply( + final Function parsedStatementFunction, + final Function assertStatementFunction, + final Function testDirectiveFunction + ) { + if (engineStatement != null) { + return parsedStatementFunction.apply(engineStatement); + } else if (assertStatement != null) { + return assertStatementFunction.apply(assertStatement); + } else if (directive != null) { + return testDirectiveFunction.apply(directive); } - return assertStatement; - } - - public boolean hasDirective() { - return directive != null; + throw new IllegalStateException("Should not happen if TestStatement is properly implemented"); } - public TestDirective getDirective() { - if (!hasDirective()) { - throw new NoSuchElementException("directive"); + public Optional consumeDirective(final Function fun) { + if (directive == null) { + return Optional.empty(); } - return directive; + return Optional.ofNullable(fun.apply(directive)); } @Override diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java new file mode 100644 index 000000000000..af700969ec13 --- /dev/null +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java @@ -0,0 +1,350 @@ +/* + * Copyright 2020 Confluent Inc. 
+ * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.test.driver; + +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; +import io.confluent.ksql.ServiceInfo; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.generic.GenericRecordFactory; +import io.confluent.ksql.engine.generic.KsqlGenericRecord; +import io.confluent.ksql.logging.processing.NoopProcessingLogContext; +import io.confluent.ksql.metastore.MetaStoreImpl; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.parser.AssertTable; +import io.confluent.ksql.parser.KsqlParser.ParsedStatement; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.AssertStatement; +import io.confluent.ksql.parser.tree.AssertStream; +import io.confluent.ksql.parser.tree.AssertValues; +import io.confluent.ksql.parser.tree.CreateAsSelect; +import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.parser.tree.InsertValues; +import io.confluent.ksql.parser.tree.SetProperty; +import io.confluent.ksql.parser.tree.UnsetProperty; +import io.confluent.ksql.properties.PropertyOverrider; +import io.confluent.ksql.query.id.SequentialQueryIdGenerator; +import io.confluent.ksql.schema.ksql.PersistenceSchema; +import 
io.confluent.ksql.schema.utils.FormatOptions; +import io.confluent.ksql.serde.GenericRowSerDe; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.services.FakeKafkaTopicClient; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.services.TestServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.test.KsqlTestException; +import io.confluent.ksql.test.driver.TestDriverPipeline.TopicInfo; +import io.confluent.ksql.test.parser.SqlTestLoader; +import io.confluent.ksql.test.parser.TestDirective; +import io.confluent.ksql.test.parser.TestStatement; +import io.confluent.ksql.test.tools.TestFunctionRegistry; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.PersistentQueryMetadata; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class KsqlTesterTest { + + private static final String TEST_DIR = "/sql-tests"; + + private static final ImmutableMap BASE_CONFIG = ImmutableMap + .builder() + .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:0") + .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0) + .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0) + 
.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L) + .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "some.ksql.service.id") + .build(); + + // parameterized + private final Path file; + private final List statements; + + // initialized in setUp + private ServiceContext serviceContext; + private KsqlEngine engine; + private KsqlConfig config; + private TestDriverPipeline driverPipeline; + private FakeKafkaTopicClient topicClient; + + // populated during run + private Map overrides; + + // populated during execution to handle the expected exception + // scenario - don't use Matchers because they do not create very + // user friendly error messages + private Class expectedException; + private String expectedMessage; + + @Parameterized.Parameters(name = "{0}") + public static Object[][] data() throws IOException { + final Path testDir = Paths.get(KsqlTesterTest.class.getResource(TEST_DIR).getFile()); + final SqlTestLoader loader = new SqlTestLoader(); + return loader.loadDirectory(testDir) + .stream() + .map(SqlTestLoader.Test::asObjectArray) + .toArray(Object[][]::new); + } + + public KsqlTesterTest(final String testCase, final Path file, final List statements) { + this.file = Objects.requireNonNull(file, "file"); + this.statements = statements; + } + + @Before + public void setUp() { + final MockSchemaRegistryClient srClient = new MockSchemaRegistryClient(); + this.topicClient = new FakeKafkaTopicClient(); + this.serviceContext = TestServiceContext.create(topicClient, () -> srClient); + this.config = new KsqlConfig(BASE_CONFIG); + + final MetaStoreImpl metaStore = new MetaStoreImpl(TestFunctionRegistry.INSTANCE.get()); + this.engine = new KsqlEngine( + serviceContext, + NoopProcessingLogContext.INSTANCE, + metaStore, + ServiceInfo.create(config), + new SequentialQueryIdGenerator() + ); + + this.expectedException = null; + this.expectedMessage = null; + + this.overrides = new HashMap<>(); + this.driverPipeline = new TestDriverPipeline(); + } + + @After + public void close() { + 
engine.close(true); + serviceContext.close(); + } + + @Test + public void test() { + for (final TestStatement testStatement : statements) { + try { + testStatement.consume(this::execute, this::doAssert, this::directive); + } catch (final Exception e) { + handleExpectedException(testStatement, e); + return; + } + } + } + + @SuppressWarnings("unchecked") + private void execute(final ParsedStatement parsedStatement) { + final PreparedStatement engineStatement = engine.prepare(parsedStatement); + final ConfiguredStatement configured = ConfiguredStatement.of( + engineStatement, overrides, config); + + createTopics(engineStatement); + + if (engineStatement.getStatement() instanceof InsertValues) { + pipeInput((ConfiguredStatement) configured); + return; + } else if (engineStatement.getStatement() instanceof SetProperty) { + PropertyOverrider.set((ConfiguredStatement) configured, overrides); + return; + } else if (engineStatement.getStatement() instanceof UnsetProperty) { + PropertyOverrider.unset((ConfiguredStatement) configured, overrides); + return; + } + + final ExecuteResult result = engine.execute( + serviceContext, + configured); + + // is DDL statement + if (!result.getQuery().isPresent()) { + return; + } + + final PersistentQueryMetadata query = (PersistentQueryMetadata) result.getQuery().get(); + final Topology topology = query.getTopology(); + final Properties properties = new Properties(); + properties.putAll(query.getStreamsProperties()); + + final TopologyTestDriver driver = new TopologyTestDriver(topology, properties); + query.closeAndThen(qm -> driver.close()); + + final List inputTopics = query + .getSourceNames() + .stream() + .map(sn -> engine.getMetaStore().getSource(sn)) + .map(ds -> new TopicInfo(ds.getKafkaTopicName(), keySerde(ds), valueSerde(ds))) + .collect(Collectors.toList()); + + final DataSource output = engine.getMetaStore().getSource(query.getSinkName()); + final TopicInfo outputInfo = new TopicInfo( + output.getKafkaTopicName(), + 
keySerde(output), + valueSerde(output) + ); + + driverPipeline.addDriver(driver, inputTopics, outputInfo); + } + + private void createTopics(final PreparedStatement engineStatement) { + if (engineStatement.getStatement() instanceof CreateSource) { + final CreateSource statement = (CreateSource) engineStatement.getStatement(); + topicClient.preconditionTopicExists( + statement.getProperties().getKafkaTopic(), + statement.getProperties().getPartitions().orElse(1), + statement.getProperties().getReplicas().orElse((short) 1), + ImmutableMap.of() + ); + } else if (engineStatement.getStatement() instanceof CreateAsSelect) { + final CreateAsSelect statement = (CreateAsSelect) engineStatement.getStatement(); + topicClient.preconditionTopicExists( + statement.getProperties().getKafkaTopic() + .orElse(statement.getName().toString(FormatOptions.noEscape()).toUpperCase()), + statement.getProperties().getPartitions().orElse(1), + statement.getProperties().getReplicas().orElse((short) 1), + ImmutableMap.of() + ); + } + } + + private void pipeInput(final ConfiguredStatement statement) { + final InsertValues insertValues = statement.getStatement(); + final DataSource dataSource = engine.getMetaStore().getSource(insertValues.getTarget()); + if (dataSource == null) { + throw new KsqlException("Unknown data source " + insertValues.getTarget()); + } + + final KsqlGenericRecord record = new GenericRecordFactory( + config, engine.getMetaStore(), System::currentTimeMillis + ).build( + insertValues.getColumns(), + insertValues.getValues(), + dataSource.getSchema(), + dataSource.getDataSourceType() + ); + driverPipeline.pipeInput( + dataSource.getKafkaTopicName(), + record.key, + record.value, + record.ts + ); + } + + private void doAssert(final AssertStatement assertStatement) { + if (assertStatement instanceof AssertValues) { + AssertExecutor.assertValues(engine, config, (AssertValues) assertStatement, driverPipeline); + } else if (assertStatement instanceof AssertStream) { + 
AssertExecutor.assertStream(((AssertStream) assertStatement)); + } else if (assertStatement instanceof AssertTable) { + AssertExecutor.assertTable(((AssertTable) assertStatement)); + } + } + + private Serde keySerde(final DataSource sinkSource) { + return sinkSource.getKsqlTopic().getKeyFormat() + .getFormat() + .getSerdeFactory( + sinkSource.getKsqlTopic().getKeyFormat().getFormatInfo() + ).createSerde( + PersistenceSchema.from( + sinkSource.getSchema().keyConnectSchema(), + sinkSource.getSerdeOptions().contains(SerdeOption.UNWRAP_SINGLE_VALUES)), + config, + serviceContext.getSchemaRegistryClientFactory(), + Struct.class + ); + } + + private Serde valueSerde(final DataSource sinkSource) { + return GenericRowSerDe.from( + sinkSource.getKsqlTopic().getValueFormat().getFormatInfo(), + PersistenceSchema.from( + sinkSource.getSchema().valueConnectSchema(), + sinkSource.getSerdeOptions().contains(SerdeOption.UNWRAP_SINGLE_VALUES)), + config, + serviceContext.getSchemaRegistryClientFactory(), + "", + NoopProcessingLogContext.INSTANCE + ); + } + + private void directive(final TestDirective directive) { + try { + switch (directive.getType()) { + case EXPECTED_ERROR: + handleExpectedClass(directive); + break; + case EXPECTED_MESSAGE: + handleExpectedMessage(directive); + break; + default: + } + } catch (final Exception e) { + throw new KsqlException("Failed to handle directive " + directive, e); + } + } + + private void handleExpectedException(final TestStatement testStatement, final Exception e) { + if (expectedException == null && expectedMessage == null) { + throw new KsqlTestException(testStatement, file, e); + } + + if (!e.getMessage().contains(expectedMessage)) { + throw new KsqlTestException( + testStatement, + file, + "Expected exception with message \"" + expectedMessage + "\" but got " + e); + } + + if (!expectedException.isInstance(e)) { + throw new KsqlTestException( + testStatement, + file, + "Expected exception with class " + expectedException + " but got " + 
e); + } + } + + @SuppressWarnings("unchecked") + private void handleExpectedClass(final TestDirective directive) throws ClassNotFoundException { + expectedException = (Class) Class.forName(directive.getContents()); + } + + private void handleExpectedMessage(final TestDirective directive) { + expectedMessage = directive.getContents(); + } +} diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/SqlTestReaderTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/SqlTestReaderTest.java index 0241dc534998..9e3bf3ac558e 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/SqlTestReaderTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/SqlTestReaderTest.java @@ -50,28 +50,28 @@ public void shouldParseBasicTest() { assertThat(reader.next(), is(TestStatement.of(new TestDirective(Type.TEST, "test1", LOC)))); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM foo")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) ); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM bar")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) ); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("INSERT INTO")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) ); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat("unexpected statement " + s, false), s -> assertThat(s, instanceOf(AssertValues.class)), s -> assertThat("unexpected statement " + s, false) @@ -93,14 +93,14 
@@ public void shouldHandleMultilineStatements() { assertThat(reader.next(), is(TestStatement.of(new TestDirective(Type.TEST, "test1", LOC)))); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM foo")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) ); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM bar")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) @@ -124,7 +124,7 @@ public void shouldReadDirectivesAtEnd() { assertThat(reader.next(), is(TestStatement.of(new TestDirective(Type.TEST, "test1", LOC)))); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM foo")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) @@ -151,7 +151,7 @@ public void shouldIgnoreComments() { assertThat(reader.next(), is(TestStatement.of(new TestDirective(Type.TEST, "test1", LOC)))); assertThat(reader.hasNext(), is(true)); - reader.next().handle( + reader.next().consume( s -> assertThat(s.getStatementText(), containsString("CREATE STREAM foo")), s -> assertThat("unexpected statement " + s, false), s -> assertThat("unexpected statement " + s, false) diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/TestDirectiveTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/TestDirectiveTest.java index 9cb13e69dc3b..2d3ca79b7848 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/TestDirectiveTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/parser/TestDirectiveTest.java @@ -52,7 +52,6 @@ public void 
shouldParseKnownDirectivesAtLocation() { // Then: assertThat(directive, Matchers.is(new TestDirective(Type.TEST, "bar", LOC))); - assertThat(directive.getLocation(), Matchers.is(new NodeLocation(1, 10))); } @Test diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json new file mode 100644 index 000000000000..aee91ece7b08 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json @@ -0,0 +1,14 @@ +{ + "tests": [ + { + "name": "should not parse ASSERT", + "statements": [ + "ASSERT VALUES foo (id INT) VALUES (123);" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'ASSERT'" + } + } + ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/sql-tests/test.sql b/ksqldb-functional-tests/src/test/resources/sql-tests/test.sql new file mode 100644 index 000000000000..341635b02587 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/sql-tests/test.sql @@ -0,0 +1,119 @@ +-- this test file is a "meta-test" that tests the basic functionality of the KsqlTester +-- eventually, we plan to remove this test in favor of migrating the QTTs over to use this format +-- directly + +---------------------------------------------------------------------------------------------------- +--@test: basic test +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic test with tables 
+---------------------------------------------------------------------------------------------------- +CREATE TABLE foo (id INT PRIMARY KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE TABLE bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic test without rowtime comparison +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (id, col1) VALUES (1, 1); +ASSERT VALUES bar (id, col1) VALUES (1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic test with aggregation +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE TABLE bar AS SELECT id, COUNT(*) as count FROM foo GROUP BY id; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, count) VALUES (1, 1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic test (format AVRO) +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='AVRO'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic chained test 
+---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; +CREATE STREAM baz AS SELECT * FROM bar; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES baz (rowtime, id, col1) VALUES (1, 1, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic join test +---------------------------------------------------------------------------------------------------- +CREATE STREAM s (id INT KEY, foo INT) WITH (kafka_topic='s', value_format='JSON'); +CREATE TABLE t (id INT PRIMARY KEY, bar INT) WITH (kafka_topic='t', value_format='JSON'); + +CREATE STREAM j AS SELECT s.id, s.foo, t.bar FROM s JOIN t ON s.id = t.id; + +INSERT INTO t (rowtime, id, bar) VALUES (1, 1, 1); +INSERT INTO s (rowtime, id, foo) VALUES (1, 1, 2); + +ASSERT VALUES j (rowtime, s_id, foo, bar) VALUES (1, 1, 2, 1); + +---------------------------------------------------------------------------------------------------- +--@test: basic create or replace test +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 1); + +SET 'ksql.create.or.replace.enabled'='true'; +CREATE OR REPLACE STREAM bar AS SELECT id, col1 + 1 as col1 FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 2); + +---------------------------------------------------------------------------------------------------- +--@test: make sure properties do not carry over +--@expected.error: io.confluent.ksql.util.KsqlStatementException 
+--@expected.message: Cannot add stream 'BAR': A stream with the same name already exists +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 1); + +CREATE OR REPLACE STREAM bar AS SELECT id, col1 + 1 as col1 FROM foo; + +---------------------------------------------------------------------------------------------------- +--@test: bad assert statement should fail + +--@expected.error: io.confluent.ksql.util.KsqlException +--@expected.message: Expected record does not match actual +---------------------------------------------------------------------------------------------------- +CREATE STREAM foo (id INT KEY, col1 INT) WITH (kafka_topic='foo', value_format='JSON'); +CREATE STREAM bar AS SELECT * FROM foo; + +INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1); +ASSERT VALUES bar (rowtime, id, col1) VALUES (2, 2, 2); + +---------------------------------------------------------------------------------------------------- +--@test: bad engine statement should fail + +--@expected.error: io.confluent.ksql.util.KsqlException +--@expected.message: Exception while preparing statement: FOO does not exist. 
+---------------------------------------------------------------------------------------------------- +CREATE STREAM bar AS SELECT * FROM foo; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 6816112a5979..fad00be142ca 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -20,9 +20,13 @@ import com.google.common.base.Strings; import io.confluent.ksql.execution.expression.formatter.ExpressionFormatter; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.Name; +import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AllColumns; +import io.confluent.ksql.parser.tree.AssertStream; +import io.confluent.ksql.parser.tree.AssertValues; import io.confluent.ksql.parser.tree.AstNode; import io.confluent.ksql.parser.tree.AstVisitor; import io.confluent.ksql.parser.tree.CreateAsSelect; @@ -56,6 +60,7 @@ import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.query.QueryId; @@ -249,6 +254,18 @@ protected Void visitCreateTable(final CreateTable node, final Integer indent) { return null; } + @Override + public Void visitAssertStream(final AssertStream node, final Integer context) { + formatAssertSource(node.getStatement(), "STREAM"); + return null; + } + + @Override + public Void visitAssertTable(final AssertTable node, final Integer context) { + formatAssertSource(node.getStatement(), "TABLE"); + return 
null; + } + @Override protected Void visitExplain(final Explain node, final Integer indent) { builder.append("EXPLAIN "); @@ -314,22 +331,45 @@ protected Void visitInsertValues(final InsertValues node, final Integer context) builder.append(escapedName(node.getTarget())); builder.append(" "); - if (!node.getColumns().isEmpty()) { - builder.append(node.getColumns() + visitColumns(node.getColumns()); + + builder.append("VALUES "); + + visitExpressionList(node.getValues()); + + return null; + } + + private void visitColumns(final List columns) { + if (!columns.isEmpty()) { + builder.append(columns .stream() .map(SqlFormatter::escapedName) .collect(Collectors.joining(", ", "(", ") "))); } + } - builder.append("VALUES "); - + private void visitExpressionList(final List expressions) { builder.append("("); builder.append( - node.getValues() + expressions .stream() .map(SqlFormatter::formatExpression) .collect(Collectors.joining(", "))); builder.append(")"); + } + + @Override + public Void visitAssertValues(final AssertValues node, final Integer context) { + builder.append("ASSERT VALUES "); + builder.append(escapedName(node.getStatement().getTarget())); + builder.append(" "); + + visitColumns(node.getStatement().getColumns()); + + builder.append("VALUES "); + + visitExpressionList(node.getStatement().getValues()); return null; } @@ -469,7 +509,28 @@ private void formatCreate(final CreateSource node, final String type) { builder.append(escapedName(node.getName())); - final String elements = node.getElements().stream() + formatTableElements(node.getElements()); + formatTableProperties(node.getProperties()); + + builder.append(";"); + } + + private void formatAssertSource(final CreateSource node, final String type) { + builder.append("ASSERT "); + + builder.append(type); + builder.append(" "); + + builder.append(escapedName(node.getName())); + + formatTableElements(node.getElements()); + formatTableProperties(node.getProperties()); + + builder.append(";"); + } + + private 
void formatTableElements(final TableElements tableElements) { + final String elements = tableElements.stream() .map(Formatter::formatTableElement) .collect(Collectors.joining(", ")); @@ -479,16 +540,16 @@ private void formatCreate(final CreateSource node, final String type) { .append(elements) .append(")"); } + } - final String tableProps = node.getProperties().toString(); + private void formatTableProperties(final CreateSourceProperties properties) { + final String tableProps = properties.toString(); if (!tableProps.isEmpty()) { builder .append(" WITH (") .append(tableProps) .append(")"); } - - builder.append(";"); } private void formatCreateAs( From 216d8640707cf0ed3e2c08881531730f4507d8c8 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 19 Aug 2020 11:36:35 -0700 Subject: [PATCH 2/4] refactor: use TestLoader --- .../ksql/test/loader/TestLoader.java | 3 +- .../ksql/test/parser/SqlTestLoader.java | 53 +++++++++++-------- .../ksql/test/driver/KsqlTesterTest.java | 8 +-- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/loader/TestLoader.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/loader/TestLoader.java index eaf2f8b0ffaf..2e6fb3386ee0 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/loader/TestLoader.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/loader/TestLoader.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.test.tools.Test; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.stream.Stream; @@ -34,7 +35,7 @@ public interface TestLoader { // mvn test -pl ksql-engine -Dtest=QueryTranslationTest -Dksql.test.files=test1.json,test2,json String KSQL_TEST_FILES = "ksql.test.files"; - Stream load(); + Stream load() throws IOException; static List getWhiteList() { final String ksqlTestFiles = 
System.getProperty(KSQL_TEST_FILES, "").trim(); diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java index b9539effc39f..e422e73dd6c1 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/parser/SqlTestLoader.java @@ -17,7 +17,11 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.test.KsqlTestException; +import io.confluent.ksql.test.loader.TestLoader; +import io.confluent.ksql.test.model.TestLocation; +import io.confluent.ksql.test.parser.SqlTestLoader.SqlTest; import io.confluent.ksql.test.parser.TestDirective.Type; +import io.confluent.ksql.test.tools.Test; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -27,43 +31,45 @@ import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The {@code SqlTestLoader} loads the test files that should be run * by the Ksql testing tool based on a path and optional filters. 
*/ -public class SqlTestLoader { +public class SqlTestLoader implements TestLoader { - private final Predicate shouldRun; + private final Predicate shouldRun; + private final Path path; - public SqlTestLoader() { - this(t -> true); + public SqlTestLoader(final Path path) { + this(t -> true, path); } /** * @param testFilter filters out which tests to run + * @param path the top-level dir to load */ - public SqlTestLoader(final Predicate testFilter) { + public SqlTestLoader(final Predicate testFilter, final Path path) { this.shouldRun = Objects.requireNonNull(testFilter, "testFilter"); + this.path = Objects.requireNonNull(path, "path"); } - /** - * @param path a directory containing all test files to run - * - * @return a list of tests to run - */ - public List loadDirectory(final Path path) throws IOException { - final SqlTestLoader loader = new SqlTestLoader(); + @Override + public Stream load() throws IOException { final List files = Files .find(path, Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) .collect(Collectors.toList()); - final ImmutableList.Builder builder = ImmutableList.builder(); + final ImmutableList.Builder builder = ImmutableList.builder(); + final List whiteList = TestLoader.getWhiteList(); for (final Path file : files) { - builder.addAll(loader.loadTest(file)); + if (whiteList.isEmpty() || whiteList.stream().anyMatch(file::endsWith)) { + builder.addAll(loadTest(file)); + } } - return builder.build(); + return builder.build().stream(); } /** @@ -71,8 +77,8 @@ public List loadDirectory(final Path path) throws IOException { * * @return the list of tests to run */ - public List loadTest(final Path path) throws IOException { - final ImmutableList.Builder builder = ImmutableList.builder(); + public List loadTest(final Path path) throws IOException { + final ImmutableList.Builder builder = ImmutableList.builder(); List statements = null; String name = null; @@ -87,7 +93,7 @@ public List loadTest(final Path path) throws IOException { 
if (nextName.isPresent()) { // flush the previous test if (statements != null) { - builder.add(new Test(path, name, statements)); + builder.add(new SqlTest(path, name, statements)); } statements = new ArrayList<>(); @@ -99,7 +105,7 @@ public List loadTest(final Path path) throws IOException { statements.add(statement); } - builder.add(new Test(path, name, statements)); + builder.add(new SqlTest(path, name, statements)); return builder.build().stream().filter(shouldRun).collect(ImmutableList.toImmutableList()); } @@ -107,13 +113,13 @@ public List loadTest(final Path path) throws IOException { * Represents a tuple of (test name, file, test statements) that constitute a ksql * test. */ - public static class Test { + public static class SqlTest implements Test { private final Path file; private final String name; private final List statements; - public Test(final Path file, final String name, final List statements) { + public SqlTest(final Path file, final String name, final List statements) { this.file = file; this.name = name; this.statements = statements; @@ -127,6 +133,11 @@ public String getName() { return name; } + @Override + public TestLocation getTestLocation() { + return () -> file; + } + public List getStatements() { return statements; } diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java index af700969ec13..cc76e3911ea2 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/driver/KsqlTesterTest.java @@ -50,6 +50,7 @@ import io.confluent.ksql.test.KsqlTestException; import io.confluent.ksql.test.driver.TestDriverPipeline.TopicInfo; import io.confluent.ksql.test.parser.SqlTestLoader; +import io.confluent.ksql.test.parser.SqlTestLoader.SqlTest; import io.confluent.ksql.test.parser.TestDirective; import 
io.confluent.ksql.test.parser.TestStatement; import io.confluent.ksql.test.tools.TestFunctionRegistry; @@ -115,10 +116,9 @@ public class KsqlTesterTest { @Parameterized.Parameters(name = "{0}") public static Object[][] data() throws IOException { final Path testDir = Paths.get(KsqlTesterTest.class.getResource(TEST_DIR).getFile()); - final SqlTestLoader loader = new SqlTestLoader(); - return loader.loadDirectory(testDir) - .stream() - .map(SqlTestLoader.Test::asObjectArray) + final SqlTestLoader loader = new SqlTestLoader(testDir); + return loader.load() + .map(SqlTest::asObjectArray) .toArray(Object[][]::new); } From 96abb8c59f8f9a4d6c8f208659f783bc2d2afd97 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 19 Aug 2020 12:50:28 -0700 Subject: [PATCH 3/4] chore: fix findbugs --- .../main/java/io/confluent/ksql/test/KsqlTestException.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java index a854b3341cb1..e9ebbb2c4901 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java @@ -22,6 +22,7 @@ import io.confluent.ksql.test.model.LocationWithinFile; import io.confluent.ksql.test.parser.TestDirective; import io.confluent.ksql.test.parser.TestStatement; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.ParserUtil; import java.nio.file.Path; import java.util.Objects; @@ -33,7 +34,7 @@ * to automatically populate the statement that produced it as well as the * location in the file. 
*/ -public class KsqlTestException extends AssertionError { +public class KsqlTestException extends KsqlException { public KsqlTestException( final TestStatement statement, @@ -72,7 +73,7 @@ private static String engineMessage( parsedStatement.getStatement()); return String.format( - "Test failure for statement `%s` (%s):\n\t%s\n\t%s", + "Test failure for statement `%s` (%s):%n\t%s%n\t%s", parsedStatement.getStatementText(), loc.map(NodeLocation::toString).orElse("unknown"), message, From 1a16081468fac86ad36891ffd253b6088cc690ea Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 19 Aug 2020 12:51:11 -0700 Subject: [PATCH 4/4] chore: actually fix findbugs --- .../main/java/io/confluent/ksql/test/KsqlTestException.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java index e9ebbb2c4901..c9736ac790d0 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/KsqlTestException.java @@ -89,7 +89,7 @@ private static String assertMessage( final Path file ) { return String.format( - "Test failure for assert `%s` (%s):\n\t%s\n\t%s", + "Test failure for assert `%s` (%s):%n\t%s%n\t%s", SqlFormatter.formatSql(assertStatement), assertStatement.getLocation().map(Objects::toString).orElse("unknown"), message, @@ -105,7 +105,7 @@ private static String directiveMessage( final Path file ) { return String.format( - "Test failure during directive evaluation `%s` (%s):\n\t%s\t%s", + "Test failure during directive evaluation `%s` (%s):%n\t%s%n\t%s", directive, directive.getLocation(), message,