diff --git a/ksqldb-parser/pom.xml b/ksqldb-parser/pom.xml
index 7d66f222a4f4..234a2454ada4 100644
--- a/ksqldb-parser/pom.xml
+++ b/ksqldb-parser/pom.xml
@@ -85,6 +85,12 @@
test
+    <dependency>
+      <groupId>com.approvaltests</groupId>
+      <artifactId>approvaltests</artifactId>
+      <version>9.5.0</version>
+    </dependency>
+
diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java
index 9cc3b5755cd1..0d62d7df9c3d 100644
--- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java
+++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultKsqlParser.java
@@ -82,7 +82,7 @@ public PreparedStatement> prepare(
}
}
- private static SqlBaseParser.StatementsContext getParseTree(final String sql) {
+ public static SqlBaseParser.StatementsContext getParseTree(final String sql) {
final SqlBaseLexer sqlBaseLexer = new SqlBaseLexer(
new CaseInsensitiveStream(CharStreams.fromString(sql)));
diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java b/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java
new file mode 100644
index 000000000000..633a14d6482d
--- /dev/null
+++ b/ksqldb-parser/src/main/java/io/confluent/ksql/util/QueryAnonymizer.java
@@ -0,0 +1,802 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.util;
+
+import io.confluent.ksql.execution.expression.tree.FunctionCall;
+import io.confluent.ksql.metastore.TypeRegistry;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.parser.AstBuilder;
+import io.confluent.ksql.parser.DefaultKsqlParser;
+import io.confluent.ksql.parser.SqlBaseBaseVisitor;
+import io.confluent.ksql.parser.SqlBaseParser.AlterOptionContext;
+import io.confluent.ksql.parser.SqlBaseParser.AlterSourceContext;
+import io.confluent.ksql.parser.SqlBaseParser.BooleanDefaultContext;
+import io.confluent.ksql.parser.SqlBaseParser.BooleanLiteralContext;
+import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext;
+import io.confluent.ksql.parser.SqlBaseParser.CreateStreamAsContext;
+import io.confluent.ksql.parser.SqlBaseParser.CreateStreamContext;
+import io.confluent.ksql.parser.SqlBaseParser.CreateTableAsContext;
+import io.confluent.ksql.parser.SqlBaseParser.CreateTableContext;
+import io.confluent.ksql.parser.SqlBaseParser.DefineVariableContext;
+import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext;
+import io.confluent.ksql.parser.SqlBaseParser.DescribeFunctionContext;
+import io.confluent.ksql.parser.SqlBaseParser.DescribeStreamsContext;
+import io.confluent.ksql.parser.SqlBaseParser.DropConnectorContext;
+import io.confluent.ksql.parser.SqlBaseParser.DropStreamContext;
+import io.confluent.ksql.parser.SqlBaseParser.DropTableContext;
+import io.confluent.ksql.parser.SqlBaseParser.DropTypeContext;
+import io.confluent.ksql.parser.SqlBaseParser.ExplainContext;
+import io.confluent.ksql.parser.SqlBaseParser.ExpressionContext;
+import io.confluent.ksql.parser.SqlBaseParser.GroupByContext;
+import io.confluent.ksql.parser.SqlBaseParser.InsertIntoContext;
+import io.confluent.ksql.parser.SqlBaseParser.InsertValuesContext;
+import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListFunctionsContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListPropertiesContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListQueriesContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListStreamsContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListTopicsContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListTypesContext;
+import io.confluent.ksql.parser.SqlBaseParser.ListVariablesContext;
+import io.confluent.ksql.parser.SqlBaseParser.LogicalBinaryContext;
+import io.confluent.ksql.parser.SqlBaseParser.NumericLiteralContext;
+import io.confluent.ksql.parser.SqlBaseParser.PartitionByContext;
+import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext;
+import io.confluent.ksql.parser.SqlBaseParser.QueryContext;
+import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext;
+import io.confluent.ksql.parser.SqlBaseParser.SelectItemContext;
+import io.confluent.ksql.parser.SqlBaseParser.SelectSingleContext;
+import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext;
+import io.confluent.ksql.parser.SqlBaseParser.ShowColumnsContext;
+import io.confluent.ksql.parser.SqlBaseParser.SingleExpressionContext;
+import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
+import io.confluent.ksql.parser.SqlBaseParser.StatementsContext;
+import io.confluent.ksql.parser.SqlBaseParser.StringLiteralContext;
+import io.confluent.ksql.parser.SqlBaseParser.TableElementContext;
+import io.confluent.ksql.parser.SqlBaseParser.TableElementsContext;
+import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext;
+import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext;
+import io.confluent.ksql.parser.SqlBaseParser.TerminateQueryContext;
+import io.confluent.ksql.parser.SqlBaseParser.TypeContext;
+import io.confluent.ksql.parser.SqlBaseParser.UndefineVariableContext;
+import io.confluent.ksql.parser.SqlBaseParser.UnquotedIdentifierContext;
+import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext;
+import io.confluent.ksql.parser.SqlBaseParser.ValueExpressionContext;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.tree.TerminalNode;
+import org.apache.commons.lang3.StringUtils;
+
+public class QueryAnonymizer {
+
+  /**
+   * Anonymizes an already-parsed statement tree.
+   *
+   * @param tree the parse tree to render
+   * @return the text produced by visiting {@code tree}
+   */
+  public String anonymize(final ParserRuleContext tree) {
+    final String anonymized = build(tree);
+    return anonymized;
+  }
+
+  /**
+   * Parses the given statement text and anonymizes the resulting parse tree.
+   *
+   * @param query the statement text to parse
+   * @return the text produced by visiting the parse tree of {@code query}
+   */
+  public String anonymize(final String query) {
+    return build(DefaultKsqlParser.getParseTree(query));
+  }
+
+  /**
+   * Visits the tree with a new {@code Visitor}, so counters and the name table start fresh
+   * for every call.
+   */
+  private String build(final ParserRuleContext parseTree) {
+    final Visitor visitor = new Visitor();
+    return visitor.visit(parseTree);
+  }
+
+ private static final class Visitor extends SqlBaseBaseVisitor {
+    // Next numeric suffix to hand out for each kind of generated name.
+    private int streamCount = 1;
+    private int columnCount = 1;
+    private int tableCount = 1;
+    private int udfCount = 1;
+    // Original name -> generated name; typed so lookups yield String without casts.
+    private final Hashtable<String, String> anonTable = new Hashtable<>();
+
+    /**
+     * Anonymizes each statement in turn and concatenates the results with no separator.
+     */
+    @Override
+    public String visitStatements(final StatementsContext context) {
+      final List<String> anonymized = context.singleStatement().stream()
+          .map(this::visitSingleStatement)
+          .collect(Collectors.toList());
+      return StringUtils.join(anonymized, "");
+    }
+
+    /**
+     * Delegates to the visitor of the wrapped statement.
+     */
+    @Override
+    public String visitSingleStatement(final SingleStatementContext context) {
+      final ParserRuleContext statement = context.statement();
+      return visit(statement);
+    }
+
+    /**
+     * Renders a type. A type without nested types is returned as written; otherwise the nested
+     * types are rendered recursively inside the matching container keyword.
+     *
+     * <p>Previously every container was rendered as {@code STRUCT<...>}, which turned ARRAY and
+     * MAP types into structs.
+     */
+    @Override
+    public String visitType(final TypeContext context) {
+      if (context.type().isEmpty()) {
+        return context.getText();
+      }
+
+      // STRUCT stays the fallback so struct types render exactly as before.
+      String containerName = "STRUCT";
+      if (context.MAP() != null) {
+        containerName = "MAP";
+      } else if (context.ARRAY() != null) {
+        containerName = "ARRAY";
+      }
+
+      final List<String> typeList = new ArrayList<>();
+      for (final TypeContext typeContext : context.type()) {
+        typeList.add(visit(typeContext));
+      }
+
+      return String.format("%s<%s>", containerName, StringUtils.join(typeList, ", "));
+    }
+
+    /**
+     * Renders ALTER STREAM/TABLE with the source name anonymized and each alter option
+     * rendered by its own visitor.
+     */
+    @Override
+    public String visitAlterSource(final AlterSourceContext context) {
+      final String sourceName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      final String target = context.STREAM() != null
+          ? String.format(" STREAM %s", getAnonStreamName(sourceName))
+          : String.format(" TABLE %s", getAnonTableName(sourceName));
+
+      final List<String> options = context.alterOption().stream()
+          .map(this::visit)
+          .collect(Collectors.toList());
+
+      final StringBuilder statement = new StringBuilder("ALTER");
+      statement.append(target);
+      statement.append(String.format(" (%s)", StringUtils.join(options, ", ")));
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders one ADD COLUMN option with the column name anonymized and the type text kept.
+     *
+     * <p>The column name is read through {@code ParserUtil.getIdentifierText}, as the other
+     * visitors in this class do, so the same column maps to the same alias whichever
+     * statement mentions it.
+     */
+    @Override
+    public String visitAlterOption(final AlterOptionContext context) {
+      final String columnName = ParserUtil.getIdentifierText(context.identifier());
+      final String anonColumnName = getAnonColumnName(columnName);
+      final String typeName = context.type().getText();
+
+      return String.format("ADD COLUMN %1$s %2$s", anonColumnName, typeName);
+    }
+
+    /**
+     * Renders CREATE TYPE with the type name replaced by the fixed placeholder {@code type}.
+     */
+    @Override
+    public String visitRegisterType(final RegisterTypeContext context) {
+      final StringBuilder statement = new StringBuilder("CREATE TYPE");
+
+      final boolean ifNotExists = context.EXISTS() != null;
+      if (ifNotExists) {
+        statement.append(" IF NOT EXISTS");
+      }
+
+      final String anonType = visit(context.type());
+      statement.append(String.format(" type AS %s", anonType));
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders CREATE [SOURCE|SINK] CONNECTOR with the connector name replaced by the fixed
+     * placeholder {@code connector} and the properties rendered by their visitor.
+     */
+    @Override
+    public String visitCreateConnector(final CreateConnectorContext context) {
+      final StringBuilder statement = new StringBuilder("CREATE");
+
+      // SOURCE is checked first; neither keyword is emitted when both are absent.
+      final boolean isSource = context.SOURCE() != null;
+      if (isSource) {
+        statement.append(" SOURCE");
+      } else if (context.SINK() != null) {
+        statement.append(" SINK");
+      }
+      statement.append(" CONNECTOR");
+
+      if (context.EXISTS() != null) {
+        statement.append(" IF NOT EXISTS");
+      }
+      statement.append(" connector ");
+
+      final TablePropertiesContext properties = context.tableProperties();
+      if (properties != null) {
+        statement.append(visit(properties));
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders INSERT INTO ... SELECT with the target stream, the properties and the query
+     * anonymized.
+     */
+    @Override
+    public String visitInsertInto(final InsertIntoContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("INSERT INTO ");
+
+      // anonymize stream name
+      final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      stringBuilder.append(getAnonStreamName(streamName));
+
+      // anonymize properties
+      if (context.tableProperties() != null) {
+        // The rendered properties start with "WITH", so a separator is needed after the name;
+        // without it the output read "stream1WITH (...)".
+        stringBuilder.append(' ').append(visit(context.tableProperties()));
+      }
+
+      // anonymize with query
+      if (context.query() != null) {
+        stringBuilder.append(String.format(" SELECT %s", getQuery(context.query(), true)));
+      }
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Renders INSERT INTO ... VALUES with the target stream and optional column list
+     * anonymized, and every value rendered by its own visitor.
+     */
+    @Override
+    public String visitInsertValues(final InsertValuesContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("INSERT INTO ");
+
+      // anonymize stream name
+      final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      stringBuilder.append(getAnonStreamName(streamName));
+
+      // visit columns
+      if (context.columns() != null) {
+        final List<String> columns = context.columns().identifier()
+            .stream()
+            .map(ParserUtil::getIdentifierText)
+            .map(ColumnName::of)
+            .map(ColumnName::toString)
+            .map(this::getAnonColumnName)
+            .collect(Collectors.toList());
+        stringBuilder.append(String.format(" (%s)", StringUtils.join(columns, ", ")));
+      }
+
+      // visit values
+      final List<String> values = new ArrayList<>();
+      for (final ValueExpressionContext value : context.values().valueExpression()) {
+        values.add(visit(value));
+      }
+      // Use the same ", " separator as every other list in this class (was " ,").
+      stringBuilder.append(String.format(" VALUES (%s)", StringUtils.join(values, ", ")));
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Re-emits LIST/SHOW [SOURCE|SINK] CONNECTORS using the keyword found in the input.
+     */
+    @Override
+    public String visitListConnectors(final ListConnectorsContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final StringBuilder statement = new StringBuilder(keyword.toString());
+
+      final boolean sourceOnly = context.SOURCE() != null;
+      if (sourceOnly) {
+        statement.append(" SOURCE");
+      } else if (context.SINK() != null) {
+        statement.append(" SINK");
+      }
+      statement.append(" CONNECTORS");
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Re-emits LIST/SHOW STREAMS [EXTENDED] using the keyword found in the input.
+     */
+    @Override
+    public String visitListStreams(final ListStreamsContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final StringBuilder statement = new StringBuilder(keyword.toString() + " STREAMS");
+
+      final boolean extended = context.EXTENDED() != null;
+      if (extended) {
+        statement.append(" EXTENDED");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Re-emits LIST/SHOW FUNCTIONS using the keyword found in the input.
+     */
+    @Override
+    public String visitListFunctions(final ListFunctionsContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final String keywordText = keyword.toString();
+      return String.format("%s FUNCTIONS;", keywordText);
+    }
+
+    /**
+     * Re-emits LIST/SHOW PROPERTIES using the keyword found in the input.
+     */
+    @Override
+    public String visitListProperties(final ListPropertiesContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final String keywordText = keyword.toString();
+      return String.format("%s PROPERTIES;", keywordText);
+    }
+
+    /**
+     * Re-emits LIST/SHOW TYPES using the keyword found in the input.
+     */
+    @Override
+    public String visitListTypes(final ListTypesContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final String keywordText = keyword.toString();
+      return String.format("%s TYPES;", keywordText);
+    }
+
+    /**
+     * Re-emits LIST/SHOW VARIABLES using the keyword found in the input.
+     */
+    @Override
+    public String visitListVariables(final ListVariablesContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final String keywordText = keyword.toString();
+      return String.format("%s VARIABLES;", keywordText);
+    }
+
+    /**
+     * Re-emits LIST/SHOW QUERIES [EXTENDED] using the keyword found in the input.
+     */
+    @Override
+    public String visitListQueries(final ListQueriesContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final StringBuilder statement = new StringBuilder(keyword.toString() + " QUERIES");
+
+      final boolean extended = context.EXTENDED() != null;
+      if (extended) {
+        statement.append(" EXTENDED");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Re-emits LIST/SHOW [ALL] TOPICS [EXTENDED] using the keyword found in the input.
+     */
+    @Override
+    public String visitListTopics(final ListTopicsContext context) {
+      final TerminalNode keyword = context.LIST() == null ? context.SHOW() : context.LIST();
+      final boolean all = context.ALL() != null;
+      final boolean extended = context.EXTENDED() != null;
+
+      final StringBuilder statement = new StringBuilder(keyword.toString());
+      if (all) {
+        statement.append(" ALL");
+      }
+      statement.append(" TOPICS");
+      if (extended) {
+        statement.append(" EXTENDED");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+ /**
+  * Returns a fixed statement; the function name from the input is not read.
+  */
+ @Override
+ public String visitDescribeFunction(final DescribeFunctionContext context) {
+ return "DESCRIBE FUNCTION function;";
+ }
+
+ /**
+  * Returns a fixed statement; the connector name from the input is not read.
+  */
+ @Override
+ public String visitDescribeConnector(final DescribeConnectorContext context) {
+ return "DESCRIBE CONNECTOR connector;";
+ }
+
+    /**
+     * Renders PRINT with the topic replaced by the fixed placeholder {@code topic} and the
+     * interval and limit values replaced by {@code '0'}.
+     */
+    @Override
+    public String visitPrintTopic(final PrintTopicContext context) {
+      final boolean fromBeginning = context.printClause().FROM() != null;
+      final boolean hasInterval = context.printClause().intervalClause() != null;
+      final boolean hasLimit = context.printClause().limitClause() != null;
+
+      final StringBuilder statement = new StringBuilder("PRINT topic");
+      if (fromBeginning) {
+        statement.append(" FROM BEGINNING");
+      }
+      if (hasInterval) {
+        statement.append(" INTERVAL '0'");
+      }
+      if (hasLimit) {
+        statement.append(" LIMIT '0'");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders TERMINATE ALL as is; otherwise the query id is replaced by the fixed
+     * placeholder {@code query}.
+     */
+    @Override
+    public String visitTerminateQuery(final TerminateQueryContext context) {
+      final boolean terminateAll = context.ALL() != null;
+      return terminateAll ? "TERMINATE ALL;" : "TERMINATE query;";
+    }
+
+
+    /**
+     * Renders DESCRIBE STREAMS [EXTENDED].
+     *
+     * <p>The separating blank belongs to the optional keyword, so the plain form is
+     * {@code DESCRIBE STREAMS;} rather than {@code DESCRIBE STREAMS ;}.
+     */
+    @Override
+    public String visitDescribeStreams(final DescribeStreamsContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("DESCRIBE STREAMS");
+
+      if (context.EXTENDED() != null) {
+        stringBuilder.append(" EXTENDED");
+      }
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Renders DESCRIBE for a single source, or the DESCRIBE TABLES form.
+     *
+     * <p>An unquoted {@code TABLES} is emitted verbatim instead of being replaced by a
+     * generated table alias and registered in the name table. NOTE(review): presumably
+     * DESCRIBE TABLES is parsed through this rule as a keyword form - confirm against the
+     * grammar.
+     */
+    @Override
+    public String visitShowColumns(final ShowColumnsContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("DESCRIBE ");
+
+      final boolean describesAllTables =
+          context.sourceName().identifier() instanceof UnquotedIdentifierContext
+              && context.sourceName().getText().equalsIgnoreCase("TABLES");
+      if (describesAllTables) {
+        stringBuilder.append("TABLES");
+      } else {
+        final String sourceName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+        stringBuilder.append(getAnonStreamName(sourceName));
+      }
+
+      if (context.EXTENDED() != null) {
+        stringBuilder.append(" EXTENDED");
+      }
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Renders SET with the property name kept as written and the value replaced by
+     * {@code '[string]'}.
+     */
+    @Override
+    public String visitSetProperty(final SetPropertyContext context) {
+      final TerminalNode nameToken = context.STRING(0);
+      return String.format("SET %s='[string]';", nameToken.getText());
+    }
+
+    /**
+     * Renders UNSET with the property name kept as written.
+     */
+    @Override
+    public String visitUnsetProperty(final UnsetPropertyContext context) {
+      final TerminalNode nameToken = context.STRING();
+      return String.format("UNSET %s;", nameToken.getText());
+    }
+
+ /**
+  * Returns a fixed statement; neither the variable name nor its value is read.
+  */
+ @Override
+ public String visitDefineVariable(final DefineVariableContext context) {
+ return "DEFINE variable='[string]';";
+ }
+
+ /**
+  * Returns a fixed statement; the variable name from the input is not read.
+  */
+ @Override
+ public String visitUndefineVariable(final UndefineVariableContext context) {
+ return "UNDEFINE variable;";
+ }
+
+    /**
+     * Returns a fixed statement; the explained query or query id is not read.
+     *
+     * <p>The result ends with ';' like every other statement rendered by this visitor.
+     * Statements are concatenated without a separator, so a missing terminator would fuse
+     * this statement with the next one.
+     */
+    @Override
+    public String visitExplain(final ExplainContext context) {
+      return "EXPLAIN query;";
+    }
+
+    /**
+     * Replaces the whole expression text with a generated name: a udf name when the
+     * expression builds to a {@code FunctionCall}, a column name otherwise.
+     */
+    @Override
+    public String visitExpression(final ExpressionContext context) {
+      final String expressionText = context.getText();
+
+      final AstBuilder astBuilder = new AstBuilder(TypeRegistry.EMPTY);
+      final boolean isFunctionCall = astBuilder.buildExpression(context) instanceof FunctionCall;
+
+      return isFunctionCall
+          ? getAnonUdfName(expressionText)
+          : getAnonColumnName(expressionText);
+    }
+
+    /**
+     * Renders a select item by visiting its expression.
+     */
+    @Override
+    public String visitSelectSingle(final SelectSingleContext context) {
+      final ExpressionContext expression = context.expression();
+      return visit(expression);
+    }
+
+    /**
+     * Renders a stand-alone expression by visiting the wrapped expression.
+     */
+    @Override
+    public String visitSingleExpression(final SingleExpressionContext context) {
+      final ExpressionContext expression = context.expression();
+      return visit(expression);
+    }
+
+ /**
+  * Renders a predicate as an anonymized column name, "=", and an anonymized value.
+  *
+  * <p>The column text is the first grandchild of this context; the value comes from
+  * visiting the second grandchild.
+  */
+ @Override
+ public String visitBooleanDefault(final BooleanDefaultContext context) {
+ // NOTE(review): presumably child 0 is a predicated expression whose children are the
+ // left-hand value and the predicate - confirm against the grammar. Other shapes are
+ // not handled here.
+ final String columnName = context.getChild(0).getChild(0).getText();
+ final String anonColumnName = getAnonColumnName(columnName);
+ final String anonValue = visit(context.getChild(0).getChild(1));
+ // The format string hard-codes "=" whatever operator the input used.
+ return String.format("%1$s=%2$s", anonColumnName, anonValue);
+ }
+
+    /**
+     * Renders both operands by visiting them and keeps the operator text as written.
+     */
+    @Override
+    public String visitLogicalBinary(final LogicalBinaryContext context) {
+      final String lhs = visit(context.left);
+      final String operator = context.operator.getText();
+      final String rhs = visit(context.right);
+      return String.format("%1$s %2$s %3$s", lhs, operator, rhs);
+    }
+
+    /**
+     * Replaces the full text of the PARTITION BY context with one generated column name.
+     */
+    @Override
+    public String visitPartitionBy(final PartitionByContext context) {
+      return getAnonColumnName(context.getText());
+    }
+
+    /**
+     * Replaces the full text of the GROUP BY context with one generated column name.
+     */
+    @Override
+    public String visitGroupBy(final GroupByContext context) {
+      return getAnonColumnName(context.getText());
+    }
+
+ /**
+  * Replaces any string literal with the fixed placeholder {@code '[string]'}.
+  */
+ @Override
+ public String visitStringLiteral(final StringLiteralContext context) {
+ return "'[string]'";
+ }
+
+ /**
+  * Replaces any integer literal with the fixed, quoted placeholder {@code '0'}.
+  */
+ @Override
+ public String visitIntegerLiteral(final IntegerLiteralContext context) {
+ return "'0'";
+ }
+
+ /**
+  * Replaces any numeric literal with the fixed, quoted placeholder {@code '0'}.
+  */
+ @Override
+ public String visitNumericLiteral(final NumericLiteralContext context) {
+ return "'0'";
+ }
+
+ /**
+  * Replaces any boolean literal with the fixed, quoted placeholder {@code 'false'}.
+  */
+ @Override
+ public String visitBooleanLiteral(final BooleanLiteralContext context) {
+ return "'false'";
+ }
+
+    /**
+     * Renders CREATE [OR REPLACE] STREAM [IF NOT EXISTS] ... AS SELECT with the stream name,
+     * the properties and the query anonymized.
+     */
+    @Override
+    public String visitCreateStreamAs(final CreateStreamAsContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("CREATE ");
+
+      // optional replace
+      if (context.OR() != null && context.REPLACE() != null) {
+        stringBuilder.append("OR REPLACE ");
+      }
+
+      stringBuilder.append("STREAM ");
+
+      // optional if not exists
+      if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) {
+        stringBuilder.append("IF NOT EXISTS ");
+      }
+
+      // anonymize stream name
+      final String streamName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      stringBuilder.append(getAnonStreamName(streamName));
+
+      // anonymize properties
+      if (context.tableProperties() != null) {
+        // The rendered properties start with "WITH", so a separator is needed after the name;
+        // without it the output read "stream1WITH (...)".
+        stringBuilder.append(' ').append(visit(context.tableProperties()));
+      }
+
+      // rest of query
+      if (context.query() != null) {
+        stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), true)));
+      }
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Renders CREATE [OR REPLACE] STREAM [IF NOT EXISTS] with the stream name, the column
+     * definitions and the properties anonymized.
+     */
+    @Override
+    public String visitCreateStream(final CreateStreamContext context) {
+      final boolean orReplace = context.OR() != null && context.REPLACE() != null;
+      final boolean ifNotExists =
+          context.IF() != null && context.NOT() != null && context.EXISTS() != null;
+
+      final StringBuilder statement = new StringBuilder("CREATE ");
+      if (orReplace) {
+        statement.append("OR REPLACE ");
+      }
+      statement.append("STREAM ");
+      if (ifNotExists) {
+        statement.append("IF NOT EXISTS ");
+      }
+
+      final String originalName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      statement.append(String.format("%s ", getAnonStreamName(originalName)));
+
+      final TableElementsContext elements = context.tableElements();
+      if (elements != null) {
+        statement.append(visit(elements));
+      }
+
+      final TablePropertiesContext properties = context.tableProperties();
+      if (properties != null) {
+        statement.append(visit(properties));
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders CREATE [OR REPLACE] TABLE [IF NOT EXISTS] ... AS SELECT with the table name,
+     * the properties and the query anonymized.
+     */
+    @Override
+    public String visitCreateTableAs(final CreateTableAsContext context) {
+      final StringBuilder stringBuilder = new StringBuilder("CREATE ");
+
+      // optional replace
+      if (context.OR() != null && context.REPLACE() != null) {
+        stringBuilder.append("OR REPLACE ");
+      }
+
+      stringBuilder.append("TABLE ");
+
+      // optional if not exists
+      if (context.IF() != null && context.NOT() != null && context.EXISTS() != null) {
+        stringBuilder.append("IF NOT EXISTS ");
+      }
+
+      // anonymize table name
+      final String tableName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      stringBuilder.append(getAnonTableName(tableName));
+
+      // anonymize properties
+      if (context.tableProperties() != null) {
+        // The rendered properties start with "WITH", so a separator is needed after the name;
+        // without it the output read "table1WITH (...)".
+        stringBuilder.append(' ').append(visit(context.tableProperties()));
+      }
+
+      // rest of query
+      if (context.query() != null) {
+        stringBuilder.append(String.format(" AS SELECT %s", getQuery(context.query(), false)));
+      }
+
+      return String.format("%s;", stringBuilder.toString());
+    }
+
+    /**
+     * Renders CREATE [OR REPLACE] TABLE [IF NOT EXISTS] with the table name, the column
+     * definitions and the properties anonymized.
+     */
+    @Override
+    public String visitCreateTable(final CreateTableContext context) {
+      final boolean orReplace = context.OR() != null && context.REPLACE() != null;
+      final boolean ifNotExists =
+          context.IF() != null && context.NOT() != null && context.EXISTS() != null;
+
+      final StringBuilder statement = new StringBuilder("CREATE ");
+      if (orReplace) {
+        statement.append("OR REPLACE ");
+      }
+      statement.append("TABLE ");
+      if (ifNotExists) {
+        statement.append("IF NOT EXISTS ");
+      }
+
+      final String originalName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      statement.append(String.format("%s ", getAnonTableName(originalName)));
+
+      final TableElementsContext elements = context.tableElements();
+      if (elements != null) {
+        statement.append(visit(elements));
+      }
+
+      final TablePropertiesContext properties = context.tableProperties();
+      if (properties != null) {
+        statement.append(visit(properties));
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders the column definitions as a parenthesised, comma-separated list followed by a
+     * single blank.
+     */
+    @Override
+    public String visitTableElements(final TableElementsContext context) {
+      final List<String> renderedElements = context.tableElement().stream()
+          .map(this::visit)
+          .collect(Collectors.toList());
+
+      return String.format("(%s) ", StringUtils.join(renderedElements, ", "));
+    }
+
+    /**
+     * Renders one column definition: a generated column name followed by the type text as
+     * written.
+     */
+    @Override
+    public String visitTableElement(final TableElementContext context) {
+      final String anonColumn =
+          getAnonColumnName(ParserUtil.getIdentifierText(context.identifier()));
+      final String typeText = context.type().getText();
+
+      return String.format("%1$s %2$s", anonColumn, typeText);
+    }
+
+    /**
+     * Renders the WITH clause. Property keys are kept (identifier text or string token text)
+     * and each value is rendered by visiting its literal.
+     */
+    @Override
+    public String visitTableProperties(final TablePropertiesContext context) {
+      final List<String> rendered = new ArrayList<>();
+      for (final TablePropertyContext property : context.tableProperty()) {
+        final String key = property.identifier() != null
+            ? ParserUtil.getIdentifierText(property.identifier())
+            : property.STRING().getText();
+        final String value = visit(property.literal());
+        rendered.add(key + "=" + value);
+      }
+
+      return String.format("WITH (%s)", StringUtils.join(rendered, ", "));
+    }
+
+    /**
+     * Renders DROP TABLE [IF EXISTS] with the table name anonymized, plus DELETE TOPIC when
+     * present in the input.
+     */
+    @Override
+    public String visitDropTable(final DropTableContext context) {
+      final boolean ifExists = context.EXISTS() != null;
+      final StringBuilder statement = new StringBuilder("DROP TABLE ");
+      if (ifExists) {
+        statement.append("IF EXISTS ");
+      }
+
+      final String originalName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      statement.append(getAnonTableName(originalName));
+
+      final boolean deleteTopic = context.DELETE() != null;
+      if (deleteTopic) {
+        statement.append(" DELETE TOPIC");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders DROP STREAM [IF EXISTS] with the stream name anonymized, plus DELETE TOPIC
+     * when present in the input.
+     */
+    @Override
+    public String visitDropStream(final DropStreamContext context) {
+      final boolean ifExists = context.EXISTS() != null;
+      final StringBuilder statement = new StringBuilder("DROP STREAM ");
+      if (ifExists) {
+        statement.append("IF EXISTS ");
+      }
+
+      final String originalName = ParserUtil.getIdentifierText(context.sourceName().identifier());
+      statement.append(getAnonStreamName(originalName));
+
+      final boolean deleteTopic = context.DELETE() != null;
+      if (deleteTopic) {
+        statement.append(" DELETE TOPIC");
+      }
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders DROP CONNECTOR [IF EXISTS] with the connector name replaced by the fixed
+     * placeholder {@code connector}.
+     */
+    @Override
+    public String visitDropConnector(final DropConnectorContext context) {
+      final StringBuilder statement = new StringBuilder("DROP CONNECTOR ");
+
+      final boolean ifExists = context.EXISTS() != null;
+      if (ifExists) {
+        statement.append("IF EXISTS ");
+      }
+      statement.append("connector");
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders DROP TYPE [IF EXISTS] with the type name replaced by the fixed placeholder
+     * {@code type}.
+     */
+    @Override
+    public String visitDropType(final DropTypeContext context) {
+      final StringBuilder statement = new StringBuilder("DROP TYPE ");
+
+      final boolean ifExists = context.EXISTS() != null;
+      if (ifExists) {
+        statement.append("IF EXISTS ");
+      }
+      statement.append("type");
+
+      return String.format("%s;", statement);
+    }
+
+    /**
+     * Renders the part of a query that follows the SELECT keyword: select items, FROM, and
+     * the optional WHERE, PARTITION BY, GROUP BY and EMIT CHANGES clauses.
+     *
+     * @param context the query to render
+     * @param isStream whether the FROM text is given a stream name (true) or a table name
+     * @return the anonymized query text, without a leading SELECT or trailing terminator
+     */
+    private String getQuery(final QueryContext context, final boolean isStream) {
+      // select items
+      final List<String> projections = context.selectItem().stream()
+          .map(this::visit)
+          .collect(Collectors.toList());
+      final StringBuilder query = new StringBuilder(StringUtils.join(projections, ", "));
+
+      // from
+      final String sourceText = context.from.getText();
+      final String anonSource =
+          isStream ? getAnonStreamName(sourceText) : getAnonTableName(sourceText);
+      query.append(String.format(" FROM %s", anonSource));
+
+      // where
+      if (context.where != null) {
+        query.append(String.format(" WHERE %s", visit(context.where)));
+      }
+
+      // partition by
+      final PartitionByContext partitionBy = context.partitionBy();
+      if (partitionBy != null) {
+        query.append(String.format(" PARTITION BY %s", visit(partitionBy)));
+      }
+
+      // group by
+      final GroupByContext groupBy = context.groupBy();
+      if (groupBy != null) {
+        query.append(String.format(" GROUP BY %s", visit(groupBy)));
+      }
+
+      // emit changes
+      if (context.EMIT() != null) {
+        query.append(" EMIT CHANGES");
+      }
+
+      return query.toString();
+    }
+
+    /**
+     * Returns the generated name for {@code originName}, creating a {@code udfN} name on
+     * first sight.
+     *
+     * <p>The udf counter only advances when a new name is created, so repeated lookups of a
+     * known name no longer leave gaps in the numbering.
+     */
+    private String getAnonUdfName(final String originName) {
+      return anonTable.containsKey(originName)
+          ? anonTable.get(originName)
+          : getAnonName(originName, "udf", udfCount++);
+    }
+
+    /**
+     * Returns the generated name for {@code originName}, creating a {@code streamN} name on
+     * first sight.
+     *
+     * <p>The stream counter only advances when a new name is created, so repeated lookups of
+     * a known name no longer leave gaps in the numbering.
+     */
+    private String getAnonStreamName(final String originName) {
+      return anonTable.containsKey(originName)
+          ? anonTable.get(originName)
+          : getAnonName(originName, "stream", streamCount++);
+    }
+
+    /**
+     * Returns the generated name for {@code originName}, creating a {@code columnN} name on
+     * first sight.
+     *
+     * <p>The column counter only advances when a new name is created, so repeated lookups of
+     * a known name no longer leave gaps in the numbering.
+     */
+    private String getAnonColumnName(final String originName) {
+      return anonTable.containsKey(originName)
+          ? anonTable.get(originName)
+          : getAnonName(originName, "column", columnCount++);
+    }
+
+    /**
+     * Returns the generated name for {@code originName}, creating a {@code tableN} name on
+     * first sight.
+     *
+     * <p>The table counter only advances when a new name is created, so repeated lookups of
+     * a known name no longer leave gaps in the numbering.
+     */
+    private String getAnonTableName(final String originName) {
+      return anonTable.containsKey(originName)
+          ? anonTable.get(originName)
+          : getAnonName(originName, "table", tableCount++);
+    }
+
+    /**
+     * Looks up the generated name for {@code originName}; when there is none yet, registers
+     * and returns {@code genericName} followed by {@code count}.
+     *
+     * <p>All kinds of names share one table, so an original name keeps the first generated
+     * name it was given.
+     */
+    private String getAnonName(final String originName, final String genericName, final int count) {
+      String anonName = anonTable.get(originName);
+      if (anonName == null) {
+        anonName = String.format("%s%d", genericName, count);
+        anonTable.put(originName, anonName);
+      }
+      return anonName;
+    }
+ }
+}
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java
new file mode 100644
index 000000000000..fa7a566dd81a
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.util;
+
+import io.confluent.ksql.parser.DefaultKsqlParser;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.approvaltests.Approvals;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link QueryAnonymizer}.
+ *
+ * <p>Short statements are compared with inline expected strings. Longer statements are passed to
+ * {@code Approvals.verify}, which is always invoked directly from the test method; the matching
+ * expected output is stored in {@code QueryAnonymizerTest.<testMethod>.approved.txt}.
+ */
+public class QueryAnonymizerTest {
+
+  // Re-created before every test, so alias numbering starts over in each test method.
+  private QueryAnonymizer anon;
+
+  @Before
+  public void setUp() {
+    anon = new QueryAnonymizer();
+  }
+
+  @Test
+  public void shouldWorkAsExpectedWhenPassedAParseTreeInsteadOfString() {
+    // Given: an already-parsed statement rather than raw SQL text
+    final ParserRuleContext parsed =
+        DefaultKsqlParser.getParseTree("DESCRIBE my_stream EXTENDED;");
+
+    // When:
+    final String anonymized = anon.anonymize(parsed);
+
+    // Then:
+    Assert.assertEquals("DESCRIBE stream1 EXTENDED;", anonymized);
+  }
+
+  @Test
+  public void shouldLeaveListAndPrintStatementsAsTheyAre() {
+    // Each of these statements is expected to come back exactly as it went in.
+    final String[] unchanged = {
+        "LIST PROPERTIES;",
+        "SHOW PROPERTIES;",
+        "LIST TOPICS;",
+        "SHOW ALL TOPICS EXTENDED;",
+        "LIST STREAMS EXTENDED;",
+        "LIST FUNCTIONS;",
+        "LIST CONNECTORS;",
+        "LIST SOURCE CONNECTORS;",
+        "LIST TYPES;",
+        "LIST VARIABLES;",
+        "LIST QUERIES;"
+    };
+
+    for (final String statement : unchanged) {
+      assertAnonymized(statement, statement);
+    }
+  }
+
+  @Test
+  public void describeStatementsShouldGetAnonymized() {
+    assertAnonymized("DESCRIBE stream1 EXTENDED;", "DESCRIBE my_stream EXTENDED;");
+    assertAnonymized("DESCRIBE STREAMS EXTENDED;", "DESCRIBE STREAMS EXTENDED;");
+    assertAnonymized("DESCRIBE FUNCTION function;", "DESCRIBE FUNCTION my_function;");
+    assertAnonymized("DESCRIBE CONNECTOR connector;", "DESCRIBE CONNECTOR my_connector;");
+  }
+
+  @Test
+  public void printStatementsShouldGetAnonymized() {
+    assertAnonymized("PRINT topic FROM BEGINNING;", "PRINT my_topic FROM BEGINNING;");
+    assertAnonymized("PRINT topic INTERVAL '0';", "PRINT my_topic INTERVAL 2;");
+    assertAnonymized("PRINT topic LIMIT '0';", "PRINT my_topic LIMIT 3;");
+  }
+
+  @Test
+  public void terminateQueryShouldGetAnonymized() {
+    assertAnonymized("TERMINATE query;", "TERMINATE my_query;");
+    assertAnonymized("TERMINATE ALL;", "TERMINATE ALL;");
+  }
+
+  @Test
+  public void shouldAnonymizeSetUnsetProperty() {
+    assertAnonymized("SET 'auto.offset.reset'='[string]';",
+        "SET 'auto.offset.reset'='earliest';");
+    assertAnonymized("UNSET 'auto.offset.reset';", "UNSET 'auto.offset.reset';");
+  }
+
+  @Test
+  public void shouldAnonymizeDefineUndefineProperty() {
+    assertAnonymized("DEFINE variable='[string]';", "DEFINE format = 'JSON';");
+    assertAnonymized("UNDEFINE variable;", "UNDEFINE format;");
+  }
+
+  @Test
+  public void shouldAnonymizeExplainStatementCorrectly() {
+    assertAnonymized("EXPLAIN query", "EXPLAIN my_query;");
+  }
+
+  @Test
+  public void shouldAnonymizeCreateStreamQueryCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "CREATE STREAM my_stream (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n"
+            + "WITH (kafka_topic='locations', value_format='json', partitions=1);"));
+  }
+
+  @Test
+  public void shouldAnonymizeCreateStreamAsQueryCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "CREATE STREAM my_stream AS SELECT user_id, browser_cookie, ip_address\n"
+            + "FROM another_stream\n"
+            + "WHERE user_id = 4214\n"
+            + "AND browser_cookie = 'aefde34ec'\n"
+            + "AND ip_address = '10.10.0.2';"));
+  }
+
+  @Test
+  public void shouldAnonymizeCreateTableCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "CREATE TABLE my_table (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)\n"
+            + "WITH (kafka_topic='locations', value_format='json', partitions=1);"));
+  }
+
+  @Test
+  public void shouldAnonymizeCreateTableAsQueryCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "CREATE TABLE my_table AS SELECT user_id, browser_cookie, ip_address\n"
+            + "FROM another_table\n"
+            + "WHERE user_id = 4214\n"
+            + "AND browser_cookie = 'aefde34ec'\n"
+            + "AND ip_address = '10.10.0.2';"));
+  }
+
+  @Test
+  public void shouldAnonymizeCreateConnectorCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "CREATE SOURCE CONNECTOR `jdbc-connector` WITH(\n"
+            + "\"connector.class\"='io.confluent.connect.jdbc.JdbcSourceConnector',\n"
+            + "\"connection.url\"='jdbc:postgresql://localhost:5432/my.db',\n"
+            + "\"mode\"='bulk',\n"
+            + "\"topic.prefix\"='jdbc-',\n"
+            + "\"table.whitelist\"='users',\n"
+            + "\"key\"='username');"));
+  }
+
+  @Test
+  public void shouldAnonymizeCreateTypeCorrectly() {
+    // The simple statement runs first, on the same anonymizer instance as the one below.
+    assertAnonymized("CREATE TYPE type AS INTEGER;", "CREATE TYPE ADDRESS AS INTEGER;");
+
+    // The more elaborate statement is checked against the approved file.
+    Approvals.verify(anon.anonymize(
+        "CREATE TYPE ADDRESS AS STRUCT;"));
+  }
+
+  @Test
+  public void shouldAnonymizeAlterOptionCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "ALTER STREAM my_stream ADD COLUMN c3 INT, ADD COLUMN c4 INT;"));
+  }
+
+  @Test
+  public void shouldAnonymizeInsertIntoCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "INSERT INTO my_stream SELECT user_id, browser_cookie, ip_address\n"
+            + "FROM another_stream\n"
+            + "WHERE user_id = 4214\n"
+            + "AND browser_cookie = 'aefde34ec'\n"
+            + "AND ip_address = '10.10.0.2';"));
+  }
+
+  @Test
+  public void shouldAnonymizeInsertValuesCorrectly() {
+    Approvals.verify(anon.anonymize(
+        "INSERT INTO foo (ROWTIME, KEY_COL, COL_A) VALUES (1510923225000, 'key', 'A');"));
+  }
+
+  @Test
+  public void shouldAnonymizeDropStatementsCorrectly() {
+    assertAnonymized("DROP STREAM IF EXISTS stream1 DELETE TOPIC;",
+        "DROP STREAM IF EXISTS my_stream DELETE TOPIC;");
+    assertAnonymized("DROP TABLE IF EXISTS table1 DELETE TOPIC;",
+        "DROP TABLE IF EXISTS my_table DELETE TOPIC;");
+    assertAnonymized("DROP CONNECTOR IF EXISTS connector;",
+        "DROP CONNECTOR IF EXISTS my_connector;");
+    assertAnonymized("DROP TYPE IF EXISTS type;", "DROP TYPE IF EXISTS my_type;");
+  }
+
+  @Test
+  public void shouldAnonymizeUDFQueriesCorrectly() {
+    Approvals.verify(anon.anonymize("CREATE STREAM OUTPUT AS SELECT ID, "
+        + "REDUCE(numbers, 2, (s, x) => s + x) AS reduce FROM test;"));
+  }
+
+  /** Asserts that anonymizing {@code sql} with the current {@code anon} yields {@code expected}. */
+  private void assertAnonymized(final String expected, final String sql) {
+    Assert.assertEquals(expected, anon.anonymize(sql));
+  }
+}
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt
new file mode 100644
index 000000000000..49df51d07f89
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeAlterOptionCorrectly.approved.txt
@@ -0,0 +1 @@
+ALTER STREAM stream1 (ADD COLUMN column1 INT, ADD COLUMN column2 INT);
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt
new file mode 100644
index 000000000000..370177613124
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateConnectorCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE SOURCE CONNECTOR connector WITH (connector.class='[string]', connection.url='[string]', mode='[string]', topic.prefix='[string]', table.whitelist='[string]', key='[string]');
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt
new file mode 100644
index 000000000000..42009bb2f915
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamAsQueryCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE STREAM stream1 AS SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt
new file mode 100644
index 000000000000..5e4f7068b5c3
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateStreamQueryCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE STREAM stream1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]', PARTITIONS='0');
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt
new file mode 100644
index 000000000000..47876fdb3443
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableAsQueryCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE TABLE table1 AS SELECT column1, column2, column3 FROM table2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt
new file mode 100644
index 000000000000..fb8ce14262a5
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTableCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE TABLE table1 (column1 VARCHAR, column2 DOUBLE, column3 DOUBLE) WITH (KAFKA_TOPIC='[string]', VALUE_FORMAT='[string]', PARTITIONS='0');
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt
new file mode 100644
index 000000000000..2c38837255bb
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeCreateTypeCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE TYPE type AS STRUCT;
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt
new file mode 100644
index 000000000000..36256533307e
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertIntoCorrectly.approved.txt
@@ -0,0 +1 @@
+INSERT INTO stream1 SELECT column1, column2, column3 FROM stream2 WHERE column1='0' AND column2='[string]' AND column3='[string]';
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt
new file mode 100644
index 000000000000..292dc47124dd
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeInsertValuesCorrectly.approved.txt
@@ -0,0 +1 @@
+INSERT INTO stream1 (column1, column2, column3) VALUES ('0' ,'[string]' ,'[string]');
\ No newline at end of file
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt
new file mode 100644
index 000000000000..cab14be4ceeb
--- /dev/null
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/QueryAnonymizerTest.shouldAnonymizeUDFQueriesCorrectly.approved.txt
@@ -0,0 +1 @@
+CREATE STREAM stream1 AS SELECT column1, udf1 FROM stream2;
\ No newline at end of file