diff --git a/ksqldb-parser/pom.xml b/ksqldb-parser/pom.xml index 7d66f222a4f4..234a2454ada4 100644 --- a/ksqldb-parser/pom.xml +++ b/ksqldb-parser/pom.xml @@ -85,6 +85,12 @@ test + + com.approvaltests + approvaltests + 9.5.0 + + diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java index 9cc3b5755cd1..0d62d7df9c3d 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java @@ -82,7 +82,7 @@ public PreparedStatement prepare( } } - private static SqlBaseParser.StatementsContext getParseTree(final String sql) { + public static SqlBaseParser.StatementsContext getParseTree(final String sql) { final SqlBaseLexer sqlBaseLexer = new SqlBaseLexer( new CaseInsensitiveStream(CharStreams.fromString(sql))); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java b/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java new file mode 100644 index 000000000000..633a14d6482d --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java @@ -0,0 +1,802 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.metastore.TypeRegistry; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.parser.AstBuilder; +import io.confluent.ksql.parser.DefaultKsqlParser; +import io.confluent.ksql.parser.SqlBaseBaseVisitor; +import io.confluent.ksql.parser.SqlBaseParser.AlterOptionContext; +import io.confluent.ksql.parser.SqlBaseParser.AlterSourceContext; +import io.confluent.ksql.parser.SqlBaseParser.BooleanDefaultContext; +import io.confluent.ksql.parser.SqlBaseParser.BooleanLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext; +import io.confluent.ksql.parser.SqlBaseParser.CreateStreamAsContext; +import io.confluent.ksql.parser.SqlBaseParser.CreateStreamContext; +import io.confluent.ksql.parser.SqlBaseParser.CreateTableAsContext; +import io.confluent.ksql.parser.SqlBaseParser.CreateTableContext; +import io.confluent.ksql.parser.SqlBaseParser.DefineVariableContext; +import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext; +import io.confluent.ksql.parser.SqlBaseParser.DescribeFunctionContext; +import io.confluent.ksql.parser.SqlBaseParser.DescribeStreamsContext; +import io.confluent.ksql.parser.SqlBaseParser.DropConnectorContext; +import io.confluent.ksql.parser.SqlBaseParser.DropStreamContext; +import io.confluent.ksql.parser.SqlBaseParser.DropTableContext; +import io.confluent.ksql.parser.SqlBaseParser.DropTypeContext; +import io.confluent.ksql.parser.SqlBaseParser.ExplainContext; +import io.confluent.ksql.parser.SqlBaseParser.ExpressionContext; +import io.confluent.ksql.parser.SqlBaseParser.GroupByContext; +import io.confluent.ksql.parser.SqlBaseParser.InsertIntoContext; +import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext; +import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext; +import io.confluent.ksql.parser.SqlBaseParser.ListFunctionsContext; +import io.confluent.ksql.parser.SqlBaseParser.ListPropertiesContext; +import io.confluent.ksql.parser.SqlBaseParser.ListQueriesContext; +import io.confluent.ksql.parser.SqlBaseParser.ListStreamsContext; +import io.confluent.ksql.parser.SqlBaseParser.ListTopicsContext; +import io.confluent.ksql.parser.SqlBaseParser.ListTypesContext; +import io.confluent.ksql.parser.SqlBaseParser.ListVariablesContext; +import io.confluent.ksql.parser.SqlBaseParser.LogicalBinaryContext; +import io.confluent.ksql.parser.SqlBaseParser.NumericLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.PartitionByContext; +import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext; +import io.confluent.ksql.parser.SqlBaseParser.QueryContext; +import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext; +import io.confluent.ksql.parser.SqlBaseParser.SelectItemContext; +import io.confluent.ksql.parser.SqlBaseParser.SelectSingleContext; +import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext; +import io.confluent.ksql.parser.SqlBaseParser.ShowColumnsContext; +import io.confluent.ksql.parser.SqlBaseParser.SingleExpressionContext; +import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; +import io.confluent.ksql.parser.SqlBaseParser.StatementsContext; +import io.confluent.ksql.parser.SqlBaseParser.StringLiteralContext; +import io.confluent.ksql.parser.SqlBaseParser.TableElementContext; +import io.confluent.ksql.parser.SqlBaseParser.TableElementsContext; +import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext; +import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext; +import io.confluent.ksql.parser.SqlBaseParser.TerminateQueryContext; +import io.confluent.ksql.parser.SqlBaseParser.TypeContext; +import io.confluent.ksql.parser.SqlBaseParser.UndefineVariableContext; +import io.confluent.ksql.parser.SqlBaseParser.UnquotedIdentifierContext; +import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext; +import io.confluent.ksql.parser.SqlBaseParser.ValueExpressionContext; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.stream.Collectors; +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.tree.TerminalNode; +import org.apache.commons.lang3.StringUtils; + +public class QueryAnonymizer { + + public String anonymize(final ParserRuleContext tree) { + return build(tree); + } + + public String anonymize(final String query) { + final ParserRuleContext tree = DefaultKsqlParser.getParseTree(query); + return build(tree); + } + + private String build(final ParserRuleContext parseTree) { + return new Visitor().visit(parseTree); + } + + private static final class Visitor extends SqlBaseBaseVisitor { + private int streamCount = 1; + private int columnCount = 1; + private int tableCount = 1; + private int udfCount = 1; + private final Hashtable anonTable = new Hashtable<>(); + + @Override + public String visitStatements(final StatementsContext context) { + final List statementList = new ArrayList<>(); + for (final SingleStatementContext stmtContext : context.singleStatement()) { + final String statement = visitSingleStatement(stmtContext); + statementList.add(statement); + } + return StringUtils.join(statementList, ""); + } + + @Override + public String visitSingleStatement(final SingleStatementContext context) { + return visit(context.statement()); + } + + @Override + public String visitType(final TypeContext context) { + if (context.type().isEmpty()) { + return context.getText(); + } + final List typeList = new ArrayList<>(); + for (TypeContext typeContext : context.type()) { + typeList.add(visit(typeContext)); + } + + return String.format("STRUCT<%s>", StringUtils.join(typeList, ", ")); + } + + @Override + public String visitAlterSource(final AlterSourceContext context) { + final StringBuilder stringBuilder = new StringBuilder("ALTER"); + + // anonymize stream or table name + final String streamTable = ParserUtil.getIdentifierText(context.sourceName().identifier()); + if (context.STREAM() != null) { + stringBuilder.append(String.format(" STREAM %s", getAnonStreamName(streamTable))); + } else { + stringBuilder.append(String.format(" TABLE %s", getAnonTableName(streamTable))); + } + + // alter option + final List alterOptions = new ArrayList<>(); + for (AlterOptionContext alterOption : context.alterOption()) { + alterOptions.add(visit(alterOption)); + } + stringBuilder.append(String.format(" (%s)", StringUtils.join(alterOptions, ", "))); + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitAlterOption(final AlterOptionContext context) { + final String columnName = context.identifier().getText(); + final String anonColumnName = getAnonColumnName(columnName); + final String typeName = context.type().getText(); + + return String.format("ADD COLUMN %1$s %2$s", anonColumnName, typeName); + } + + @Override + public String visitRegisterType(final RegisterTypeContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE TYPE"); + + // optional if not exists + if (context.EXISTS() != null) { + stringBuilder.append(" IF NOT EXISTS"); + } + + // anonymize type + stringBuilder.append(String.format(" type AS %s", visit(context.type()))); + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitCreateConnector(final CreateConnectorContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE"); + + if (context.SOURCE() != null) { + stringBuilder.append(" SOURCE"); + } else if (context.SINK() != null) { + stringBuilder.append(" SINK"); + } + + stringBuilder.append(" CONNECTOR"); + + // optional if not exists + if (context.EXISTS() != null) { + stringBuilder.append(" IF NOT EXISTS"); + } + + stringBuilder.append(" connector "); + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitInsertInto(final InsertIntoContext context) { + final StringBuilder stringBuilder = new StringBuilder("INSERT INTO "); + + // anonymize stream name + final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonStreamName(streamName)); + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + // anonymize with query + if (context.query() != null) { + stringBuilder.append(String.format(" SELECT %s", getQuery(context.query(), true))); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitInsertValues(final InsertValuesContext context) { + final StringBuilder stringBuilder = new StringBuilder("INSERT INTO "); + + // anonymize stream name + final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonStreamName(streamName)); + + // visit columns + if (context.columns() != null) { + final List columns = context.columns().identifier() + .stream() + .map(ParserUtil::getIdentifierText) + .map(ColumnName::of) + .map(ColumnName::toString) + .map(this::getAnonColumnName) + .collect(Collectors.toList()); + stringBuilder.append(String.format(" (%s)", StringUtils.join(columns, ", "))); + } + + // visit values + final List values = new ArrayList<>(); + for (ValueExpressionContext value : context.values().valueExpression()) { + values.add(visit(value)); + } + stringBuilder.append(String.format(" VALUES (%s)", StringUtils.join(values, " ,"))); + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitListConnectors(final ListConnectorsContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + final StringBuilder stringBuilder = new StringBuilder(listOrVisit.toString()); + + if (context.SOURCE() != null) { + stringBuilder.append(" SOURCE"); + } else if (context.SINK() != null) { + stringBuilder.append(" SINK"); + } + + stringBuilder.append(" CONNECTORS"); + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitListStreams(final ListStreamsContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + final StringBuilder stringBuilder = new StringBuilder(listOrVisit.toString() + " STREAMS"); + + if (context.EXTENDED() != null) { + stringBuilder.append(" EXTENDED"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitListFunctions(final ListFunctionsContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + return String.format("%s FUNCTIONS;", listOrVisit.toString()); + } + + @Override + public String visitListProperties(final ListPropertiesContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + return String.format("%s PROPERTIES;", listOrVisit.toString()); + } + + @Override + public String visitListTypes(final ListTypesContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + return String.format("%s TYPES;", listOrVisit.toString()); + } + + @Override + public String visitListVariables(final ListVariablesContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + return String.format("%s VARIABLES;", listOrVisit.toString()); + } + + @Override + public String visitListQueries(final ListQueriesContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + final StringBuilder stringBuilder = new StringBuilder(listOrVisit.toString() + " QUERIES"); + + if (context.EXTENDED() != null) { + stringBuilder.append(" EXTENDED"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitListTopics(final ListTopicsContext context) { + final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW(); + final StringBuilder stringBuilder = new StringBuilder(listOrVisit.toString()); + + if (context.ALL() != null) { + stringBuilder.append(" ALL"); + } + + stringBuilder.append(" TOPICS"); + + if (context.EXTENDED() != null) { + stringBuilder.append(" EXTENDED"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitDescribeFunction(final DescribeFunctionContext context) { + return "DESCRIBE FUNCTION function;"; + } + + @Override + public String visitDescribeConnector(final DescribeConnectorContext context) { + return "DESCRIBE CONNECTOR connector;"; + } + + @Override + public String visitPrintTopic(final PrintTopicContext context) { + final StringBuilder stringBuilder = new StringBuilder("PRINT topic"); + + if (context.printClause().FROM() != null) { + stringBuilder.append(" FROM BEGINNING"); + } + + if (context.printClause().intervalClause() != null) { + stringBuilder.append(" INTERVAL '0'"); + } + + if (context.printClause().limitClause() != null) { + stringBuilder.append(" LIMIT '0'"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitTerminateQuery(final TerminateQueryContext context) { + if (context.ALL() != null) { + return "TERMINATE ALL;"; + } + return "TERMINATE query;"; + } + + + @Override + public String visitDescribeStreams(final DescribeStreamsContext context) { + final StringBuilder stringBuilder = new StringBuilder("DESCRIBE STREAMS "); + + if (context.EXTENDED() != null) { + stringBuilder.append("EXTENDED"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitShowColumns(final ShowColumnsContext context) { + final StringBuilder stringBuilder = new StringBuilder("DESCRIBE "); + + final String streamTable = ParserUtil.getIdentifierText(context.sourceName().identifier()); + if (context.sourceName().identifier() instanceof UnquotedIdentifierContext + && context.sourceName().getText().equalsIgnoreCase("TABLES")) { + stringBuilder.append(getAnonTableName(streamTable)); + } else { + stringBuilder.append(getAnonStreamName(streamTable)); + } + + if (context.EXTENDED() != null) { + stringBuilder.append(" EXTENDED"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitSetProperty(final SetPropertyContext context) { + final String propertyName = context.STRING(0).getText(); + return String.format("SET %s='[string]';", propertyName); + } + + @Override + public String visitUnsetProperty(final UnsetPropertyContext context) { + final String propertyName = context.STRING().getText(); + return String.format("UNSET %s;", propertyName); + } + + @Override + public String visitDefineVariable(final DefineVariableContext context) { + return "DEFINE variable='[string]';"; + } + + @Override + public String visitUndefineVariable(final UndefineVariableContext context) { + return "UNDEFINE variable;"; + } + + @Override + public String visitExplain(final ExplainContext context) { + return "EXPLAIN query"; + } + + @Override + public String visitExpression(final ExpressionContext context) { + final String columnName = context.getText(); + + // check if it's an udf + if (new AstBuilder(TypeRegistry.EMPTY).buildExpression(context) instanceof FunctionCall) { + return getAnonUdfName(columnName); + } + + return getAnonColumnName(columnName); + } + + @Override + public String visitSelectSingle(final SelectSingleContext context) { + return visit(context.expression()); + } + + @Override + public String visitSingleExpression(final SingleExpressionContext context) { + return visit(context.expression()); + } + + @Override + public String visitBooleanDefault(final BooleanDefaultContext context) { + final String columnName = context.getChild(0).getChild(0).getText(); + final String anonColumnName = getAnonColumnName(columnName); + final String anonValue = visit(context.getChild(0).getChild(1)); + return String.format("%1$s=%2$s", anonColumnName, anonValue); + } + + @Override + public String visitLogicalBinary(final LogicalBinaryContext context) { + return String.format("%1$s %2$s %3$s", + visit(context.left), context.operator.getText(), visit(context.right)); + } + + @Override + public String visitPartitionBy(final PartitionByContext context) { + final String columnName = context.getText(); + return getAnonColumnName(columnName); + } + + @Override + public String visitGroupBy(final GroupByContext context) { + final String columnName = context.getText(); + return getAnonColumnName(columnName); + } + + @Override + public String visitStringLiteral(final StringLiteralContext context) { + return "'[string]'"; + } + + @Override + public String visitIntegerLiteral(final IntegerLiteralContext context) { + return "'0'"; + } + + @Override + public String visitNumericLiteral(final NumericLiteralContext context) { + return "'0'"; + } + + @Override + public String visitBooleanLiteral(final BooleanLiteralContext context) { + return "'false'"; + } + + @Override + public String visitCreateStreamAs(final CreateStreamAsContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE "); + + // optional replace + if (context.OR() != null && context.REPLACE() != null) { + stringBuilder.append("OR REPLACE "); + } + + stringBuilder.append("STREAM "); + + // optional if not exists + if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) { + stringBuilder.append("IF NOT EXISTS "); + } + + // anonymize stream name + final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonStreamName(streamName)); + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + // rest of query + if (context.query() != null) { + stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), true))); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitCreateStream(final CreateStreamContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE "); + + // optional replace + if (context.OR() != null && context.REPLACE() != null) { + stringBuilder.append("OR REPLACE "); + } + + stringBuilder.append("STREAM "); + + // optional if not exists + if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) { + stringBuilder.append("IF NOT EXISTS "); + } + + // anonymize stream name + final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(String.format("%s ", getAnonStreamName(streamName))); + + // anonymize table elements + if (context.tableElements() != null) { + stringBuilder.append(visit(context.tableElements())); + } + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitCreateTableAs(final CreateTableAsContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE "); + + // optional replace + if (context.OR() != null && context.REPLACE() != null) { + stringBuilder.append("OR REPLACE "); + } + + stringBuilder.append("TABLE "); + + // optional if not exists + if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) { + stringBuilder.append("IF NOT EXISTS "); + } + + // anonymize table name + final String tableName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonTableName(tableName)); + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + // rest of query + if (context.query() != null) { + stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), false))); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitCreateTable(final CreateTableContext context) { + final StringBuilder stringBuilder = new StringBuilder("CREATE "); + + // optional replace + if (context.OR() != null && context.REPLACE() != null) { + stringBuilder.append("OR REPLACE "); + } + + stringBuilder.append("TABLE "); + + // optional if not exists + if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) { + stringBuilder.append("IF NOT EXISTS "); + } + + // anonymize table name + final String tableName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(String.format("%s ", getAnonTableName(tableName))); + + // anonymize table elements + if (context.tableElements() != null) { + stringBuilder.append(visit(context.tableElements())); + } + + // anonymize properties + if (context.tableProperties() != null) { + stringBuilder.append(visit(context.tableProperties())); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitTableElements(final TableElementsContext context) { + final List tableElements = new ArrayList<>(); + for (final TableElementContext tableContext : context.tableElement()) { + tableElements.add(visit(tableContext)); + } + + return String.format("(%s) ", StringUtils.join(tableElements, ", ")); + } + + @Override + public String visitTableElement(final TableElementContext context) { + final String columnName = ParserUtil.getIdentifierText(context.identifier()); + final String newName = getAnonColumnName(columnName); + + return String.format("%1$s %2$s", newName, context.type().getText()); + } + + @Override + public String visitTableProperties(final TablePropertiesContext context) { + final List tableProperties = new ArrayList<>(); + for (final TablePropertyContext prop : context.tableProperty()) { + final StringBuilder formattedProp = new StringBuilder(); + if (prop.identifier() != null) { + formattedProp.append(ParserUtil.getIdentifierText(prop.identifier())); + } else { + formattedProp.append(prop.STRING().getText()); + } + formattedProp.append("=").append(visit(prop.literal())); + tableProperties.add(formattedProp.toString()); + } + + return String.format("WITH (%s)", StringUtils.join(tableProperties, ", ")); + } + + @Override + public String visitDropTable(final DropTableContext context) { + final StringBuilder stringBuilder = new StringBuilder("DROP TABLE "); + + if (context.EXISTS() != null) { + stringBuilder.append("IF EXISTS "); + } + + // anonymize table name + final String tableName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonTableName(tableName)); + + if (context.DELETE() != null) { + stringBuilder.append(" DELETE TOPIC"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitDropStream(final DropStreamContext context) { + final StringBuilder stringBuilder = new StringBuilder("DROP STREAM "); + + if (context.EXISTS() != null) { + stringBuilder.append("IF EXISTS "); + } + + // anonymize stream name + final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier()); + stringBuilder.append(getAnonStreamName(streamName)); + + if (context.DELETE() != null) { + stringBuilder.append(" DELETE TOPIC"); + } + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitDropConnector(final DropConnectorContext context) { + final StringBuilder stringBuilder = new StringBuilder("DROP CONNECTOR "); + + if (context.EXISTS() != null) { + stringBuilder.append("IF EXISTS "); + } + + stringBuilder.append("connector"); + + return String.format("%s;", stringBuilder.toString()); + } + + @Override + public String visitDropType(final DropTypeContext context) { + final StringBuilder stringBuilder = new StringBuilder("DROP TYPE "); + + if (context.EXISTS() != null) { + stringBuilder.append("IF EXISTS "); + } + + stringBuilder.append("type"); + + return String.format("%s;", stringBuilder.toString()); + } + + private String getQuery(final QueryContext context, final boolean isStream) { + final StringBuilder stringBuilder = new StringBuilder(); + + // visit as select items + final List selectItemList = new ArrayList<>(); + for (SelectItemContext selectItem : context.selectItem()) { + selectItemList.add(visit(selectItem)); + } + stringBuilder.append(StringUtils.join(selectItemList, ", ")); + + // visit from statement + final String streamTableName = context.from.getText(); + final String anonStreamTableName = + !isStream ? getAnonTableName(streamTableName) : getAnonStreamName(streamTableName); + stringBuilder.append(String.format(" FROM %s", anonStreamTableName)); + + // visit where statement + if (context.where != null) { + stringBuilder.append(String.format(" WHERE %s", visit(context.where))); + } + + // visit partition by + if (context.partitionBy() != null) { + stringBuilder.append(String.format(" PARTITION BY %s", visit(context.partitionBy()))); + } + + // visit group by + if (context.groupBy() != null) { + stringBuilder.append(String.format(" GROUP BY %s", visit(context.groupBy()))); + } + + // visit emit changes + if (context.EMIT() != null) { + stringBuilder.append(" EMIT CHANGES"); + } + + return stringBuilder.toString(); + } + + private String getAnonUdfName(final String originName) { + return getAnonName(originName, "udf", udfCount++); + } + + private String getAnonStreamName(final String originName) { + return getAnonName(originName, "stream", streamCount++); + } + + private String getAnonColumnName(final String originName) { + return getAnonName(originName, "column", columnCount++); + } + + private String getAnonTableName(final String originName) { + return getAnonName(originName, "table", tableCount++); + } + + private String getAnonName(final String originName, final String genericName, final int count) { + if (anonTable.containsKey(originName)) { + return anonTable.get(originName); + } + + final String newName = String.format("%s%d", genericName, count); + anonTable.put(originName, newName); + return newName; + } + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java new file mode 100644 index 000000000000..fa7a566dd81a --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java @@ -0,0 +1,230 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.parser.DefaultKsqlParser; +import org.antlr.v4.runtime.ParserRuleContext; +import org.approvaltests.Approvals; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class QueryAnonymizerTest { + + private QueryAnonymizer anon; + + @Before + public void setUp() { + anon = new QueryAnonymizer(); + } + + @Test + public void shouldWorkAsExpectedWhenPassedAParseTreeInsteadOfString() { + // Given: + final ParserRuleContext tree = + DefaultKsqlParser.getParseTree("DESCRIBE my_stream EXTENDED;"); + + // Then: + Assert.assertEquals("DESCRIBE stream1 EXTENDED;", + anon.anonymize(tree)); + } + + @Test + public void shouldLeaveListAndPrintStatementsAsTheyAre() { + Assert.assertEquals("LIST PROPERTIES;", anon.anonymize("LIST PROPERTIES;")); + Assert.assertEquals("SHOW PROPERTIES;", anon.anonymize("SHOW PROPERTIES;")); + Assert.assertEquals("LIST TOPICS;", anon.anonymize("LIST TOPICS;")); + Assert.assertEquals("SHOW ALL TOPICS EXTENDED;", + anon.anonymize("SHOW ALL TOPICS EXTENDED;")); + Assert.assertEquals("LIST STREAMS EXTENDED;", + anon.anonymize("LIST STREAMS EXTENDED;")); + Assert.assertEquals("LIST FUNCTIONS;", anon.anonymize("LIST FUNCTIONS;")); + Assert.assertEquals("LIST CONNECTORS;", anon.anonymize("LIST CONNECTORS;")); + Assert.assertEquals("LIST SOURCE CONNECTORS;", + anon.anonymize("LIST SOURCE CONNECTORS;")); + Assert.assertEquals("LIST TYPES;", anon.anonymize("LIST TYPES;")); + Assert.assertEquals("LIST VARIABLES;", anon.anonymize("LIST VARIABLES;")); + Assert.assertEquals("LIST QUERIES;", anon.anonymize("LIST QUERIES;")); + } + + @Test + public void describeStatementsShouldGetAnonymized() { + Assert.assertEquals("DESCRIBE stream1 EXTENDED;", + anon.anonymize("DESCRIBE my_stream EXTENDED;")); + Assert.assertEquals("DESCRIBE STREAMS EXTENDED;", + anon.anonymize("DESCRIBE STREAMS EXTENDED;")); + Assert.assertEquals("DESCRIBE FUNCTION function;", + anon.anonymize("DESCRIBE FUNCTION my_function;")); + Assert.assertEquals("DESCRIBE CONNECTOR connector;", + anon.anonymize("DESCRIBE CONNECTOR my_connector;")); + } + + @Test + public void printStatementsShouldGetAnonymized() { + Assert.assertEquals("PRINT topic FROM BEGINNING;", + anon.anonymize("PRINT my_topic FROM BEGINNING;")); + Assert.assertEquals("PRINT topic INTERVAL '0';", + anon.anonymize("PRINT my_topic INTERVAL 2;")); + Assert.assertEquals("PRINT topic LIMIT '0';", + anon.anonymize("PRINT my_topic LIMIT 3;")); + } + + @Test + public void terminateQueryShouldGetAnonymized() { + Assert.assertEquals("TERMINATE query;", + anon.anonymize("TERMINATE my_query;")); + Assert.assertEquals("TERMINATE ALL;", + anon.anonymize("TERMINATE ALL;")); + } + + @Test + public void shouldAnonymizeSetUnsetProperty() { + Assert.assertEquals("SET 'auto.offset.reset'='[string]';", + anon.anonymize("SET 'auto.offset.reset'='earliest';")); + Assert.assertEquals("UNSET 'auto.offset.reset';", + anon.anonymize("UNSET 'auto.offset.reset';")); + } + + @Test + public void shouldAnonymizeDefineUndefineProperty() { + Assert.assertEquals("DEFINE variable='[string]';", + anon.anonymize("DEFINE format = 'JSON';")); + Assert.assertEquals("UNDEFINE variable;", + anon.anonymize("UNDEFINE format;")); + } + + @Test + public void shouldAnonymizeExplainStatementCorrectly() { + Assert.assertEquals("EXPLAIN query", anon.anonymize("EXPLAIN my_query;")); + } + + @Test + public void shouldAnonymizeCreateStreamQueryCorrectly() { + final String output = anon.anonymize( + "CREATE STREAM my_stream (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n" + + "WITH (kafka_topic='locations', value_format='json', partitions=1);"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeCreateStreamAsQueryCorrectly() { + final String output = anon.anonymize( + "CREATE STREAM my_stream AS SELECT user_id, browser_cookie, ip_address\n" + + "FROM another_stream\n" + + "WHERE user_id = 4214\n" + + "AND browser_cookie = 'aefde34ec'\n" + + "AND ip_address = '10.10.0.2';"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeCreateTableCorrectly() { + final String output = anon.anonymize( + "CREATE TABLE my_table (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n" + + "WITH (kafka_topic='locations', value_format='json', partitions=1);"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeCreateTableAsQueryCorrectly() { + final String output = anon.anonymize( + "CREATE TABLE my_table AS SELECT user_id, browser_cookie, ip_address\n" + + "FROM another_table\n" + + "WHERE user_id = 4214\n" + + "AND browser_cookie = 'aefde34ec'\n" + + "AND ip_address = '10.10.0.2';"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeCreateConnectorCorrectly() { + final String output = anon.anonymize( + "CREATE SOURCE CONNECTOR `jdbc-connector` WITH(\n" + + "\"connector.class\"='io.confluent.connect.jdbc.JdbcSourceConnector',\n" + + "\"connection.url\"='jdbc:postgresql://localhost:5432/my.db',\n" + + "\"mode\"='bulk',\n" + + "\"topic.prefix\"='jdbc-',\n" + + "\"table.whitelist\"='users',\n" + + "\"key\"='username');"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeCreateTypeCorrectly() { + // simple statement + Assert.assertEquals("CREATE TYPE type AS INTEGER;", + anon.anonymize("CREATE TYPE ADDRESS AS INTEGER;")); + + // more elaborate statement + final String output = anon.anonymize( + "CREATE TYPE ADDRESS AS STRUCT;"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeAlterOptionCorrectly() { + final String output = anon.anonymize( + "ALTER STREAM my_stream ADD COLUMN c3 INT, ADD COLUMN c4 INT;"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeInsertIntoCorrectly() { + final String output = anon.anonymize( + "INSERT INTO my_stream SELECT user_id, browser_cookie, ip_address\n" + + "FROM another_stream\n" + + "WHERE user_id = 4214\n" + + "AND browser_cookie = 'aefde34ec'\n" + + "AND ip_address = '10.10.0.2';"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeInsertValuesCorrectly() { + final String output = anon.anonymize( + "INSERT INTO foo (ROWTIME, KEY_COL, COL_A) VALUES (1510923225000, 'key', 'A');"); + + Approvals.verify(output); + } + + @Test + public void shouldAnonymizeDropStatementsCorrectly() { + Assert.assertEquals("DROP STREAM IF EXISTS stream1 DELETE TOPIC;", + anon.anonymize("DROP STREAM IF EXISTS my_stream DELETE TOPIC;")); + Assert.assertEquals("DROP TABLE IF EXISTS table1 DELETE TOPIC;", + anon.anonymize("DROP TABLE IF EXISTS my_table DELETE TOPIC;")); + Assert.assertEquals("DROP CONNECTOR IF EXISTS connector;", + anon.anonymize("DROP CONNECTOR IF EXISTS my_connector;")); + Assert.assertEquals("DROP TYPE IF EXISTS type;", + anon.anonymize("DROP TYPE IF EXISTS my_type;")); + } + + @Test + public void shouldAnonymizeUDFQueriesCorrectly() { + final String output = anon.anonymize("CREATE STREAM OUTPUT AS SELECT ID, " + + "REDUCE(numbers, 2, (s, x) => s + x) AS reduce FROM test;"); + + Approvals.verify(output); + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt new file mode 100644 index 000000000000..49df51d07f89 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt @@ -0,0 +1 @@ +ALTER STREAM stream1 (ADD COLUMN column1 INT, ADD COLUMN column2 INT); \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt new file mode 100644 index 000000000000..370177613124 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE SOURCE CONNECTOR connector WITH (connector.class='[string]', connection.url='[string]', mode='[string]', topic.prefix='[string]', table.whitelist='[string]', key='[string]'); \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt new file mode 100644 index 000000000000..42009bb2f915 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt new file mode 100644 index 000000000000..5e4f7068b5c3 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE STREAM stream1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]', PARTITIONS='0'); \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt new file mode 100644 index 000000000000..47876fdb3443 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE TABLE table1 AS SELECT column1, column2, column3 FROM table2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt new file mode 100644 index 000000000000..fb8ce14262a5 --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE TABLE table1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]', PARTITIONS='0'); \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt new file mode 100644 index 000000000000..2c38837255bb --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE TYPE type AS STRUCT; \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt new file mode 100644 index 000000000000..36256533307e --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt @@ -0,0 +1 @@ +INSERT INTO stream1 SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]'; \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt new file mode 100644 index 000000000000..292dc47124dd --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt @@ -0,0 +1 @@ +INSERT INTO stream1 (column1, column2, column3) VALUES ('0' ,'[string]' ,'[string]'); \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt new file mode 100644 index 000000000000..cab14be4ceeb --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt @@ -0,0 +1 @@ +CREATE STREAM stream1 AS SELECT column1, udf1 FROM stream2; \ No newline at end of file