diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 572e628f0137..718bbd7c62f7 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -56,6 +56,7 @@ statement (WITH tableProperties)? #createTable | CREATE TABLE (IF NOT EXISTS)? qualifiedName (WITH tableProperties)? AS query #createTableAs + | CREATE (SINK | SOURCE) CONNECTOR identifier WITH tableProperties #createConnector | INSERT INTO qualifiedName query (PARTITION BY identifier)? #insertInto | INSERT INTO qualifiedName (columns)? VALUES values #insertValues | DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)? #dropStream @@ -87,7 +88,7 @@ tableProperties ; tableProperty - : identifier EQ literal + : (identifier | STRING) EQ literal ; printClause @@ -316,6 +317,7 @@ nonReserved | EXPLAIN | ANALYZE | TYPE | SET | RESET | IF + | CONNECTOR | SOURCE | SINK | KEY ; @@ -428,6 +430,9 @@ RUN: 'RUN'; SCRIPT: 'SCRIPT'; DECIMAL: 'DECIMAL'; KEY: 'KEY'; +CONNECTOR: 'CONNECTOR'; +SINK: 'SINK'; +SOURCE: 'SOURCE'; IF: 'IF'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 6ecb1d18c5ec..d3331fa82bd8 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext; import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.IntervalClauseContext; import io.confluent.ksql.parser.SqlBaseParser.LimitClauseContext; @@ -42,6 +43,8 @@ import io.confluent.ksql.parser.tree.BooleanLiteral; import io.confluent.ksql.parser.tree.Cast; import io.confluent.ksql.parser.tree.ComparisonExpression; +import io.confluent.ksql.parser.tree.CreateConnector; +import io.confluent.ksql.parser.tree.CreateConnector.Type; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; @@ -195,10 +198,16 @@ private Map processTableProperties( final ImmutableMap.Builder properties = ImmutableMap.builder(); if (tablePropertiesContext != null) { for (final TablePropertyContext prop : tablePropertiesContext.tableProperty()) { - properties.put( - ParserUtil.getIdentifierText(prop.identifier()), - (Literal) visit(prop.literal()) - ); + if (prop.identifier() != null) { + properties.put( + ParserUtil.getIdentifierText(prop.identifier()), + (Literal) visit(prop.literal()) + ); + } else { + properties.put( + ParserUtil.unquote(prop.STRING().getText(), "'"), + (Literal) visit(prop.literal())); + } } } return properties.build(); @@ -265,6 +274,20 @@ public Node visitCreateTableAs(final SqlBaseParser.CreateTableAsContext context) ); } + @Override + public Node visitCreateConnector(final CreateConnectorContext context) { + final Map properties = processTableProperties(context.tableProperties()); + final String name = ParserUtil.getIdentifierText(context.identifier()); + final CreateConnector.Type type = context.SOURCE() != null ? Type.SOURCE : Type.SINK; + + return new CreateConnector( + getLocation(context), + name, + properties, + type + ); + } + @Override public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) { diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java new file mode 100644 index 000000000000..c5982d7e95b3 --- /dev/null +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateConnector.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import jdk.nashorn.internal.ir.annotations.Immutable; + +@Immutable +public class CreateConnector extends Statement { + + public enum Type { + SOURCE, + SINK + } + + private final String name; + private final ImmutableMap config; + private final Type type; + + public CreateConnector( + final Optional location, + final String name, + final Map config, + final Type type + ) { + super(location); + this.name = Objects.requireNonNull(name, "name"); + this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config")); + this.type = Objects.requireNonNull(type, "type"); + } + + public CreateConnector( + final String name, + final Map config, + final Type type + ) { + this(Optional.empty(), name, config, type); + } + + + public String getName() { + return name; + } + + public Map getConfig() { + return config; + } + + public Type getType() { + return type; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CreateConnector that = (CreateConnector) o; + return Objects.equals(name, that.name) + && Objects.equals(config, that.config) + && Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, config, type); + } + + @Override + public String toString() { + return "CreateConnector{" + + "name='" + name + '\'' + + ", config=" + config + + ", type=" + type + + '}'; + } +} diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 006344d67847..d66fbf2e5009 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; @@ -43,6 +44,7 @@ import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.ArithmeticUnaryExpression; import io.confluent.ksql.parser.tree.ComparisonExpression; +import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; @@ -66,6 +68,7 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; @@ -1140,6 +1143,32 @@ public void shouldThrowHelpfulErrorMessageIfKeyFieldNotQuoted() { KsqlParserTestUtil.buildSingleAst("CREATE STREAM S (ID INT) WITH (KEY=ID);", metaStore); } + @Test + public void shouldBuildCreateSourceConnectorStatement() { + // When: + final PreparedStatement createExternal = + KsqlParserTestUtil.buildSingleAst( + "CREATE SOURCE CONNECTOR foo WITH ('foo.bar'='foo');", metaStore); + + // Then: + assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo"))); + assertThat(createExternal.getStatement().getName(), is("FOO")); + assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SOURCE)); + } + + @Test + public void shouldBuildCreateSinkConnectorStatement() { + // When: + final PreparedStatement createExternal = + KsqlParserTestUtil.buildSingleAst( + "CREATE SINK CONNECTOR foo WITH (\"foo.bar\"='foo');", metaStore); + + // Then: + assertThat(createExternal.getStatement().getConfig(), hasEntry("foo.bar", new StringLiteral("foo"))); + assertThat(createExternal.getStatement().getName(), is("FOO")); + assertThat(createExternal.getStatement().getType(), is(CreateConnector.Type.SINK)); + } + private static SearchedCaseExpression getSearchedCaseExpressionFromCsas(final Statement statement) { final Query query = ((CreateStreamAsSelect) statement).getQuery(); final Expression caseExpression = ((SingleColumn) query.getSelect().getSelectItems().get(0)).getExpression(); diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java new file mode 100644 index 000000000000..d95843e5338d --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateConnectorTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import java.util.Map; +import java.util.Optional; +import org.junit.Test; + +public class CreateConnectorTest { + + private static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); + private static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); + + private static final String NAME = "foo"; + private static final String OTHER_NAME = "bar"; + + private static final Map CONFIG = ImmutableMap.of("foo", new StringLiteral("bar")); + private static final Map OTHER_CONFIG = ImmutableMap.of("foo", new StringLiteral("baz")); + + @Test + public void testEquals() { + new EqualsTester() + .addEqualityGroup( + new CreateConnector(Optional.of(SOME_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE), + new CreateConnector(Optional.of(OTHER_LOCATION), NAME, CONFIG, CreateConnector.Type.SOURCE), + new CreateConnector(NAME, CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(OTHER_NAME, CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(NAME, OTHER_CONFIG, CreateConnector.Type.SOURCE) + ) + .addEqualityGroup( + new CreateConnector(NAME, CONFIG, CreateConnector.Type.SINK) + ) + .testEquals(); + } + +} \ No newline at end of file