From e8222bc6b4768defed6ad32e4c4d7ef831690485 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Fri, 26 Jul 2019 11:58:20 -0700 Subject: [PATCH 1/3] feat(ksql-connect): introduce syntax for CREATE EXTERNAL (syntax only) --- .../io/confluent/ksql/parser/SqlBase.g4 | 5 + .../io/confluent/ksql/parser/AstBuilder.java | 17 ++++ .../ksql/parser/tree/CreateConnector.java | 98 +++++++++++++++++++ .../confluent/ksql/parser/KsqlParserTest.java | 29 ++++++ .../ksql/parser/tree/CreateConnectorTest.java | 55 +++++++++++ 5 files changed, 204 insertions(+) create mode 100644 ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java create mode 100644 ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 572e628f0137..8ef0da321bba 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -56,6 +56,7 @@ statement (WITH tableProperties)? #createTable | CREATE TABLE (IF NOT EXISTS)? qualifiedName (WITH tableProperties)? AS query #createTableAs + | CREATE (SINK | SOURCE) CONNECTOR identifier WITH tableProperties #createConnector | INSERT INTO qualifiedName query (PARTITION BY identifier)? #insertInto | INSERT INTO qualifiedName (columns)? VALUES values #insertValues | DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)? #dropStream @@ -316,6 +317,7 @@ nonReserved | EXPLAIN | ANALYZE | TYPE | SET | RESET | IF + | CONNECTOR | SOURCE | SINK | KEY ; @@ -428,6 +430,9 @@ RUN: 'RUN'; SCRIPT: 'SCRIPT'; DECIMAL: 'DECIMAL'; KEY: 'KEY'; +CONNECTOR: 'CONNECTOR'; +SINK: 'SINK'; +SOURCE: 'SOURCE'; IF: 'IF'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 6ecb1d18c5ec..934c6e12443a 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext; import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext; import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext; @@ -42,6 +43,8 @@ import io.confluent.ksql.parser.tree.BooleanLiteral; import io.confluent.ksql.parser.tree.Cast; import io.confluent.ksql.parser.tree.ComparisonExpression; +import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.parser.tree.CreateConnector.Type; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; @@ -265,6 +268,20 @@ public Node visitCreateTableAs(final SqlBaseParser.CreateTableAsContext context) ); } + @Override + public Node visitCreateConnector(final CreateConnectorContext context) { + final Map properties = processTableProperties(context.tableProperties()); + final String name = ParserUtil.getIdentifierText(context.identifier()); + final CreateConnector.Type type = context.SOURCE() != null ? Type.SOURCE : Type.SINK; + + return new CreateConnector( + getLocation(context), + name, + properties, + type + ); + } + @Override public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) { diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java new file mode 100644 index 000000000000..c5982d7e95b3 --- /dev/null +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import jdk.nashorn.internal.ir.annotations.Immutable; + +@Immutable +public class CreateConnector extends Statement { + + public enum Type { + SOURCE, + SINK + } + + private final String name; + private final ImmutableMap config; + private final Type type; + + public CreateConnector( + final Optional location, + final String name, + final Map config, + final Type type + ) { + super(location); + this.name = Objects.requireNonNull(name, "name"); + this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config")); + this.type = Objects.requireNonNull(type, "type"); + } + + public CreateConnector( + final String name, + final Map config, + final Type type + ) { + this(Optional.empty(), name, config, type); + } + + + public String getName() { + return name; + } + + public Map getConfig() { + return config; + } + + public Type getType() { + return type; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CreateConnector that = (CreateConnector) o; + return Objects.equals(name, that.name) + && Objects.equals(config, that.config) + && Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, config, type); + } + + @Override + public String toString() { + return "CreateConnector{" + + "name='" + name + '\'' + + ", config=" + config + + ", type=" + type + + '}'; + } +} diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 006344d67847..42d3e9fc89c1 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; @@ -43,6 +44,7 @@ import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.ArithmeticUnaryExpression; import io.confluent.ksql.parser.tree.ComparisonExpression; +import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; @@ -66,6 +68,7 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; @@ -1140,6 +1143,32 @@ public void shouldThrowHelpfulErrorMessageIfKeyFieldNotQuoted() { KsqlParserTestUtil.buildSingleAst("CREATE STREAM S (ID INT) WITH (KEY=ID);", metaStore); } + @Test + public void shouldBuildCreateExternalSourceStatement() { + // When: + final PreparedStatement createExternal = + KsqlParserTestUtil.buildSingleAst( + "CREATE SOURCE CONNECTOR foo WITH (\"foo.bar\"='foo');", metaStore); + + // Then: + assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo"))); + assertThat(createExternal.getStatement().getName(), is("FOO")); + assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SOURCE)); + } + + @Test + public void shouldBuildCreateExternalSinkStatement() { + // When: + final PreparedStatement createExternal = + KsqlParserTestUtil.buildSingleAst( + "CREATE SINK CONNECTOR foo WITH (\"foo.bar\"='foo');", metaStore); + + // Then: + assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo"))); + assertThat(createExternal.getStatement().getName(), is("FOO")); + assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SINK)); + } + private static SearchedCaseExpression getSearchedCaseExpressionFromCsas(final Statement statement) { final Query query = ((CreateStreamAsSelect) statement).getQuery(); final Expression caseExpression = ((SingleColumn) query.getSelect().getSelectItems().get(0)).getExpression(); diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java new file mode 100644 index 000000000000..d95843e5338d --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import java.util.Map; +import java.util.Optional; +import org.junit.Test; + +public class CreateConnectorTest { + + private static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); + private static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); + + private static final String NAME = "foo"; + private static final String OTHER_NAME = "bar"; + + private static final Map CONFIG = ImmutableMap.of("foo", new StringLiteral("bar")); + private static final Map OTHER_CONFIG = ImmutableMap.of("foo", new StringLiteral("baz")); + + @Test + public void testEquals() { + new EqualsTester() + .addEqualityGroup( + new CreateConnector(Optional.of(SOME_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE), + new CreateConnector(Optional.of(OTHER_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE), + new CreateConnector(NAME, CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(OTHER_NAME, CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(NAME, OTHER_CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(NAME, CONFIG, CreateConnector.Type.SINK) + ) + .testEquals(); + } + +} \ No newline at end of file From 7ad3f2ff2522e610554b6e0c2c2ae6a11358f89e Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Fri, 26 Jul 2019 16:20:43 -0700 Subject: [PATCH 2/3] feat: allow single quotes for quoted identifiers --- .../src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 | 1 + .../src/main/java/io/confluent/ksql/util/ParserUtil.java | 3 +++ .../src/test/java/io/confluent/ksql/parser/KsqlParserTest.java | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 8ef0da321bba..d6ca667dd7b3 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -292,6 +292,7 @@ qualifiedName identifier : IDENTIFIER #unquotedIdentifier | QUOTED_IDENTIFIER #quotedIdentifierAlternative + | STRING #stringIdentifier | nonReserved #unquotedIdentifier | BACKQUOTED_IDENTIFIER #backQuotedIdentifier | DIGIT_IDENTIFIER #digitIdentifier diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java index 1f8b826b29e9..f2b1a63c33eb 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java @@ -27,6 +27,7 @@ import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext; import io.confluent.ksql.parser.SqlBaseParser.NumberContext; +import io.confluent.ksql.parser.SqlBaseParser.StringIdentifierContext; import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.DoubleLiteral; import io.confluent.ksql.parser.tree.IntegerLiteral; @@ -90,6 +91,8 @@ public static String getIdentifierText(final SqlBaseParser.IdentifierContext con return unquote(context.getText(), "\""); } else if (context instanceof SqlBaseParser.BackQuotedIdentifierContext) { return unquote(context.getText(), "`"); + } else if (context instanceof StringIdentifierContext) { + return unquote(context.getText(), "'"); } else { return context.getText().toUpperCase(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 42d3e9fc89c1..1319a8106887 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -1148,7 +1148,7 @@ public void shouldBuildCreateExternalSourceStatement() { // When: final PreparedStatement createExternal = KsqlParserTestUtil.buildSingleAst( - "CREATE SOURCE CONNECTOR foo WITH (\"foo.bar\"='foo');", metaStore); + "CREATE SOURCE CONNECTOR foo WITH ('foo.bar'='foo');", metaStore); // Then: assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo"))); From 2849b1bafed60e1f0d34f7f11939ede8d6f9c4f0 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Mon, 29 Jul 2019 13:08:49 -0700 Subject: [PATCH 3/3] fix: change way we parse strings in table properties --- .../antlr4/io/confluent/ksql/parser/SqlBase.g4 | 3 +-- .../java/io/confluent/ksql/parser/AstBuilder.java | 14 ++++++++++---- .../java/io/confluent/ksql/util/ParserUtil.java | 3 --- .../io/confluent/ksql/parser/KsqlParserTest.java | 4 ++-- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index d6ca667dd7b3..718bbd7c62f7 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -88,7 +88,7 @@ tableProperties ; tableProperty - : identifier EQ literal + : (identifier | STRING) EQ literal ; printClause @@ -292,7 +292,6 @@ qualifiedName identifier : IDENTIFIER #unquotedIdentifier | QUOTED_IDENTIFIER #quotedIdentifierAlternative - | STRING #stringIdentifier | nonReserved #unquotedIdentifier | BACKQUOTED_IDENTIFIER #backQuotedIdentifier | DIGIT_IDENTIFIER #digitIdentifier diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 934c6e12443a..d3331fa82bd8 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -198,10 +198,16 @@ private Map processTableProperties( final ImmutableMap.Builder properties = ImmutableMap.builder(); if (tablePropertiesContext != null) { for (final TablePropertyContext prop : tablePropertiesContext.tableProperty()) { - properties.put( - ParserUtil.getIdentifierText(prop.identifier()), - (Literal) visit(prop.literal()) - ); + if (prop.identifier() != null) { + properties.put( + ParserUtil.getIdentifierText(prop.identifier()), + (Literal) visit(prop.literal()) + ); + } else { + properties.put( + ParserUtil.unquote(prop.STRING().getText(), "'"), + (Literal) visit(prop.literal())); + } } } return properties.build(); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java index f2b1a63c33eb..1f8b826b29e9 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java @@ -27,7 +27,6 @@ import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext; import io.confluent.ksql.parser.SqlBaseParser.NumberContext; -import io.confluent.ksql.parser.SqlBaseParser.StringIdentifierContext; import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.DoubleLiteral; import io.confluent.ksql.parser.tree.IntegerLiteral; @@ -91,8 +90,6 @@ public static String getIdentifierText(final SqlBaseParser.IdentifierContext con return unquote(context.getText(), "\""); } else if (context instanceof SqlBaseParser.BackQuotedIdentifierContext) { return unquote(context.getText(), "`"); - } else if (context instanceof StringIdentifierContext) { - return unquote(context.getText(), "'"); } else { return context.getText().toUpperCase(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 1319a8106887..d66fbf2e5009 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -1144,7 +1144,7 @@ public void shouldThrowHelpfulErrorMessageIfKeyFieldNotQuoted() { } @Test - public void shouldBuildCreateExternalSourceStatement() { + public void shouldBuildCreateSourceConnectorStatement() { // When: final PreparedStatement createExternal = KsqlParserTestUtil.buildSingleAst( @@ -1157,7 +1157,7 @@ public void shouldBuildCreateExternalSourceStatement() { } @Test - public void shouldBuildCreateExternalSinkStatement() { + public void shouldBuildCreateSinkConnectorStatement() { // When: final PreparedStatement createExternal = KsqlParserTestUtil.buildSingleAst(