diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java index 1bd5a5d9fe9e..df5b288849f2 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java @@ -114,7 +114,7 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) { @Override public DdlCommandResult executeRegisterType(final RegisterTypeCommand registerType) { - final String name = registerType.getName(); + final String name = registerType.getTypeName(); final SqlType type = registerType.getType(); metaStore.registerType(name, type); return new DdlCommandResult( @@ -141,7 +141,7 @@ private static KeyField getKeyField(final Optional keyFieldName) { private static KsqlTopic getKsqlTopic(final CreateSourceCommand createSource) { return new KsqlTopic( - createSource.getKafkaTopicName(), + createSource.getTopicName(), KeyFormat.of(createSource.getFormats().getKeyFormat(), createSource.getWindowInfo()), ValueFormat.of(createSource.getFormats().getValueFormat()) ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index ffb3d8897bbc..fd8ccae90f6a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -33,7 +33,7 @@ import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamStreamJoin; @@ -147,7 +147,7 @@ public SchemaKStream select( final KsqlQueryBuilder ksqlQueryBuilder ) { final KeyField keyField = findKeyField(selectExpressions); - final StreamMapValues step = ExecutionStepFactory.streamMapValues( + final StreamSelect step = ExecutionStepFactory.streamSelect( contextStacker, sourceStep, selectExpressions diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 13e260a417d1..68268f839070 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -26,7 +26,7 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.plan.TableTableJoin; import io.confluent.ksql.execution.streams.ExecutionStepFactory; @@ -118,7 +118,7 @@ public SchemaKTable select( final KsqlQueryBuilder ksqlQueryBuilder ) { final KeyField keyField = findKeyField(selectExpressions); - final TableMapValues step = ExecutionStepFactory.tableMapValues( + final TableSelect step = ExecutionStepFactory.tableMapValues( contextStacker, sourceTableStep, selectExpressions diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java index e636d15f516d..69977d4f4e71 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java @@ -53,7 +53,7 @@ public static void throwOnInvalidSchemaEvolution( ksqlConfig ); - final String topicName = ddl.getKafkaTopicName(); + final String topicName = ddl.getTopicName(); if (!isValidAvroSchemaForTopic(topicName, avroSchema, schemaRegistryClient)) { throw new KsqlStatementException(String.format( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java index f9c210b18281..71297e0cbf7a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java @@ -24,7 +24,7 @@ import io.confluent.ksql.execution.plan.StreamFilter; import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; @@ -34,7 +34,7 @@ import io.confluent.ksql.execution.plan.TableAggregate; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.execution.plan.TableTableJoin; @@ -66,7 +66,7 @@ public class PlanSummary { .put(StreamFilter.class, "FILTER") .put(StreamFlatMap.class, "FLAT_MAP") .put(StreamGroupBy.class, "GROUP_BY") - .put(StreamMapValues.class, "PROJECT") + .put(StreamSelect.class, "PROJECT") .put(StreamSelectKey.class, "REKEY") .put(StreamSink.class, "SINK") .put(StreamSource.class, "SOURCE") @@ -76,7 +76,7 @@ public class PlanSummary { .put(TableAggregate.class, "AGGREGATE") .put(TableFilter.class, "FILTER") .put(TableGroupBy.class, "GROUP_BY") - .put(TableMapValues.class, "PROJECT") + .put(TableSelect.class, "PROJECT") .put(TableSink.class, "SINK") .put(TableTableJoin.class, "JOIN") .put(TableSource.class, "SOURCE") diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java index d4a0142c23b4..39c88938101e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/RegisterTypeFactoryTest.java @@ -44,6 +44,6 @@ public void shouldCreateCommandForRegisterType() { // Then: assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType())); - assertThat(result.getName(), equalTo("alias")); + assertThat(result.getTypeName(), equalTo("alias")); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index d3682df93790..75447d9c6d7d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -169,7 +169,7 @@ public void shouldBuildStepForSelect() { assertThat( projectedSchemaKStream.getSourceStep(), equalTo( - ExecutionStepFactory.streamMapValues( + ExecutionStepFactory.streamSelect( childContextStacker, initialSchemaKStream.getSourceStep(), selectExpressions diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java index 0767ad088c62..6eaa30181cae 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java @@ -122,7 +122,7 @@ public class AvroUtilTest { public void setUp() { when(ddlCommand.getFormats()).thenReturn(FORMATS); when(ddlCommand.getSchema()).thenReturn(MUTLI_FIELD_SCHEMA); - when(ddlCommand.getKafkaTopicName()).thenReturn(RESULT_TOPIC_NAME); + when(ddlCommand.getTopicName()).thenReturn(RESULT_TOPIC_NAME); } @Test @@ -258,7 +258,7 @@ public void shouldThrowOnSrAuthorizationErrors() throws Exception { expectedException.expectMessage("Could not connect to Schema Registry service"); expectedException.expectMessage(containsString(String.format( "Not authorized to access Schema Registry subject: [%s]", - ddlCommand.getKafkaTopicName() + ddlCommand.getTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX ))); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/PlanSummaryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/PlanSummaryTest.java index d50c797b94eb..5ae82fbeb8c8 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/PlanSummaryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/PlanSummaryTest.java @@ -25,7 +25,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamStreamJoin; import io.confluent.ksql.execution.streams.StepSchemaResolver; @@ -76,7 +76,7 @@ public void shouldSummarizeWithSource() { final LogicalSchema schema = LogicalSchema.builder() .valueColumn(ColumnName.of("L1"), SqlTypes.STRING) .build(); - final ExecutionStep step = givenStep(StreamMapValues.class, "child", schema, sourceStep); + final ExecutionStep step = givenStep(StreamSelect.class, "child", schema, sourceStep); // When: final String summary = planSummaryBuilder.summarize(step); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java index d4beaa204d5b..c4aae635222f 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java @@ -34,7 +34,7 @@ public abstract class CreateSourceCommand implements DdlCommand { private final LogicalSchema schema; private final Optional keyField; private final Optional timestampColumn; - private final String kafkaTopicName; + private final String topicName; private final Formats formats; private final Optional windowInfo; @@ -43,7 +43,7 @@ public abstract class CreateSourceCommand implements DdlCommand { final LogicalSchema schema, final Optional keyField, final Optional timestampColumn, - final String kafkaTopicName, + final String topicName, final Formats formats, final Optional windowInfo ) { @@ -52,7 +52,7 @@ public abstract class CreateSourceCommand implements DdlCommand { this.keyField = Objects.requireNonNull(keyField, "keyField"); this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn"); - this.kafkaTopicName = Objects.requireNonNull(kafkaTopicName, "kafkaTopicName"); + this.topicName = Objects.requireNonNull(topicName, "topicName"); this.formats = Objects.requireNonNull(formats, "formats"); this.windowInfo = Objects.requireNonNull(windowInfo, "windowInfo"); @@ -78,8 +78,8 @@ public Optional getKeyField() { return keyField; } - public String getKafkaTopicName() { - return kafkaTopicName; + public String getTopicName() { + return topicName; } public Formats getFormats() { diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateStreamCommand.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateStreamCommand.java index f14e336823e8..9b08c8970370 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateStreamCommand.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateStreamCommand.java @@ -33,7 +33,7 @@ public CreateStreamCommand( @JsonProperty(value = "keyField") Optional keyField, @JsonProperty(value = "timestampColumn") Optional timestampColumn, - @JsonProperty(value = "kafkaTopicName", required = true) String kafkaTopicName, + @JsonProperty(value = "topicName", required = true) String topicName, @JsonProperty(value = "formats", required = true) final Formats formats, @JsonProperty(value = "windowInfo") final Optional windowInfo ) { @@ -42,7 +42,7 @@ public CreateStreamCommand( schema, keyField, timestampColumn, - kafkaTopicName, + topicName, formats, windowInfo ); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java index 6e47b27ef636..46f1b4fd87fa 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java @@ -32,7 +32,7 @@ public CreateTableCommand( @JsonProperty(value = "schema", required = true) LogicalSchema schema, @JsonProperty("keyField") Optional keyField, @JsonProperty("timestampColumn") Optional timestampColumn, - @JsonProperty(value = "kafkaTopicName", required = true) String kafkaTopicName, + @JsonProperty(value = "topicName", required = true) String topicName, @JsonProperty(value = "formats", required = true) final Formats formats, @JsonProperty(value = "windowInfo") final Optional windowInfo ) { @@ -41,7 +41,7 @@ public CreateTableCommand( schema, keyField, timestampColumn, - kafkaTopicName, + topicName, formats, windowInfo ); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/RegisterTypeCommand.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/RegisterTypeCommand.java index dfef139cae13..73e59c5a9934 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/RegisterTypeCommand.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/RegisterTypeCommand.java @@ -23,13 +23,13 @@ @Immutable public class RegisterTypeCommand implements DdlCommand { private final SqlType type; - private final String name; + private final String typeName; public RegisterTypeCommand( @JsonProperty(value = "type", required = true) final SqlType type, - @JsonProperty(value = "name", required = true) final String name) { + @JsonProperty(value = "typeName", required = true) final String typeName) { this.type = Objects.requireNonNull(type, "type"); - this.name = Objects.requireNonNull(name, "name"); + this.typeName = Objects.requireNonNull(typeName, "typeName"); } @Override @@ -41,7 +41,7 @@ public SqlType getType() { return type; } - public String getName() { - return name; + public String getTypeName() { + return typeName; } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java index cc86d1969585..44bcab20f2c6 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java @@ -32,7 +32,7 @@ @Type(value = StreamFlatMap.class, name = "streamFlatMapV1"), @Type(value = StreamGroupBy.class, name = "streamGroupByV1"), @Type(value = StreamGroupByKey.class, name = "streamGroupByKeyV1"), - @Type(value = StreamMapValues.class, name = "streamMapValuesV1"), + @Type(value = StreamSelect.class, name = "streamSelectV1"), @Type(value = StreamSelectKey.class, name = "streamSelectKeyV1"), @Type(value = StreamSink.class, name = "streamSinkV1"), @Type(value = StreamSource.class, name = "streamSourceV1"), @@ -45,7 +45,7 @@ @Type(value = TableAggregate.class, name = "tableAggregateV1"), @Type(value = TableFilter.class, name = "tableFilterV1"), @Type(value = TableGroupBy.class, name = "tableGroupByV1"), - @Type(value = TableMapValues.class, name = "tableMapValuesV1"), + @Type(value = TableSelect.class, name = "tableSelectV1"), @Type(value = TableSink.class, name = "tableSinkV1"), @Type(value = TableTableJoin.class, name = "tableTableJoinV1") }) diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java index 40856a1331b6..81c1ca94cb0e 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java @@ -33,7 +33,7 @@ public interface PlanBuilder { KTableHolder visitStreamAggregate(StreamAggregate streamAggregate); - KStreamHolder visitStreamMapValues(StreamMapValues streamMapValues); + KStreamHolder visitStreamSelect(StreamSelect streamSelect); KStreamHolder visitFlatMap(StreamFlatMap streamFlatMap); @@ -64,7 +64,7 @@ KTableHolder> visitStreamWindowedAggregate( KGroupedTableHolder visitTableGroupBy(TableGroupBy tableGroupBy); - KTableHolder visitTableMapValues(TableMapValues tableMapValues); + KTableHolder visitTableSelect(TableSelect tableSelect); KTableHolder visitTableSink(TableSink tableSink); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java index 826488384943..a4a662d62336 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamAggregate.java @@ -30,24 +30,25 @@ public class StreamAggregate implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep source; - private final Formats formats; + private final Formats internalFormats; private final int nonFuncColumnCount; - private final ImmutableList aggregations; + private final ImmutableList aggregationFunctions; public StreamAggregate( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep source, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "nonFuncColumnCount", required = true) int nonFuncColumnCount, - @JsonProperty(value = "aggregations", required = true) - List aggregations) { + @JsonProperty(value = "aggregationFunctions", required = true) + List aggregationFunctions) { this.properties = requireNonNull(properties, "properties"); this.source = requireNonNull(source, "source"); - this.formats = requireNonNull(formats, "formats"); + this.internalFormats = requireNonNull(internalFormats, "internalFormats"); this.nonFuncColumnCount = nonFuncColumnCount; - this.aggregations = ImmutableList.copyOf(requireNonNull(aggregations, "aggregations")); + this.aggregationFunctions = ImmutableList.copyOf( + requireNonNull(aggregationFunctions, "aggregationFunctions")); } @Override @@ -65,12 +66,12 @@ public int getNonFuncColumnCount() { return nonFuncColumnCount; } - public List getAggregations() { - return aggregations; + public List getAggregationFunctions() { + return aggregationFunctions; } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } public ExecutionStep getSource() { @@ -93,8 +94,8 @@ public boolean equals(Object o) { StreamAggregate that = (StreamAggregate) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats) - && Objects.equals(aggregations, that.aggregations) + && Objects.equals(internalFormats, that.internalFormats) + && Objects.equals(aggregationFunctions, that.aggregationFunctions) && nonFuncColumnCount == that.nonFuncColumnCount; } @@ -104,8 +105,8 @@ public int hashCode() { return Objects.hash( properties, source, - formats, - aggregations, + internalFormats, + aggregationFunctions, nonFuncColumnCount ); } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java index 7081750ac0d9..332eeb942e65 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupBy.java @@ -30,17 +30,17 @@ public class StreamGroupBy implements ExecutionStep { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep> source; - private final Formats formats; + private final Formats internalFormats; private final ImmutableList groupByExpressions; public StreamGroupBy( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep> source, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "groupByExpressions", required = true) List groupByExpressions) { this.properties = requireNonNull(properties, "properties"); - this.formats = requireNonNull(formats, "formats"); + this.internalFormats = requireNonNull(internalFormats, "internalFormats"); this.source = requireNonNull(source, "source"); this.groupByExpressions = ImmutableList .copyOf(requireNonNull(groupByExpressions, "groupByExpressions")); @@ -61,8 +61,8 @@ public List> getSources() { return Collections.singletonList(source); } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } public ExecutionStep> getSource() { @@ -85,12 +85,12 @@ public boolean equals(Object o) { StreamGroupBy that = (StreamGroupBy) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats) + && Objects.equals(internalFormats, that.internalFormats) && Objects.equals(groupByExpressions, that.groupByExpressions); } @Override public int hashCode() { - return Objects.hash(properties, source, formats, groupByExpressions); + return Objects.hash(properties, source, internalFormats, groupByExpressions); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupByKey.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupByKey.java index 910512e44056..cef844bc45c4 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupByKey.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamGroupByKey.java @@ -26,15 +26,15 @@ public class StreamGroupByKey implements ExecutionStep { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep> source; - private final Formats formats; + private final Formats internalFormats; public StreamGroupByKey( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep> source, - @JsonProperty(value = "formats", required = true) Formats formats) { + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats) { this.properties = Objects.requireNonNull(properties, "properties"); - this.formats = Objects.requireNonNull(formats, "formats"); + this.internalFormats = Objects.requireNonNull(internalFormats, "internalFormats"); this.source = Objects.requireNonNull(source, "source"); } @@ -53,8 +53,8 @@ public ExecutionStep> getSource() { return source; } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } @Override @@ -73,12 +73,12 @@ public boolean equals(Object o) { StreamGroupByKey that = (StreamGroupByKey) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats); + && Objects.equals(internalFormats, that.internalFormats); } @Override public int hashCode() { - return Objects.hash(properties, source, formats); + return Objects.hash(properties, source, internalFormats); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelect.java similarity index 92% rename from ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java rename to ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelect.java index 7f15591740bb..2fad08a1b3c7 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamMapValues.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelect.java @@ -25,13 +25,13 @@ import java.util.Objects; @Immutable -public class StreamMapValues implements ExecutionStep> { +public class StreamSelect implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep> source; private final ImmutableList selectExpressions; - public StreamMapValues( + public StreamSelect( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep> source, @JsonProperty(value = "selectExpressions", required = true) @@ -63,7 +63,7 @@ public ExecutionStep> getSource() { @Override public KStreamHolder build(PlanBuilder builder) { - return builder.visitStreamMapValues(this); + return builder.visitStreamSelect(this); } @Override @@ -74,7 +74,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - StreamMapValues that = (StreamMapValues) o; + StreamSelect that = (StreamSelect) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) && Objects.equals(selectExpressions, that.selectExpressions); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java index 5dec37a59410..51a326162a47 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java @@ -27,30 +27,32 @@ public class StreamStreamJoin implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final JoinType joinType; - private final Formats leftFormats; - private final Formats rightFormats; + private final Formats leftInternalFormats; + private final Formats rightInternalFormats; private final ExecutionStep> left; private final ExecutionStep> right; - private final Duration before; - private final Duration after; + private final Duration beforeMillis; + private final Duration afterMillis; public StreamStreamJoin( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "joinType", required = true) JoinType joinType, - @JsonProperty(value = "leftFormats", required = true) Formats leftFormats, - @JsonProperty(value = "rightFormats", required = true) Formats rightFormats, + @JsonProperty(value = "leftInternalFormats", required = true) Formats leftInternalFormats, + @JsonProperty(value = "rightInternalFormats", required = true) Formats rightInternalFormats, @JsonProperty(value = "left", required = true) ExecutionStep> left, @JsonProperty(value = "right", required = true) ExecutionStep> right, - @JsonProperty(value = "before", required = true) Duration before, - @JsonProperty(value = "after", required = true) Duration after) { + @JsonProperty(value = "beforeMillis", required = true) Duration beforeMillis, + @JsonProperty(value = "afterMillis", required = true) Duration afterMillis) { this.properties = Objects.requireNonNull(properties, "properties"); - this.leftFormats = Objects.requireNonNull(leftFormats, "formats"); - this.rightFormats = Objects.requireNonNull(rightFormats, "rightFormats"); + this.leftInternalFormats = + Objects.requireNonNull(leftInternalFormats, "leftInternalFormats"); + this.rightInternalFormats = + Objects.requireNonNull(rightInternalFormats, "rightInternalFormats"); this.joinType = Objects.requireNonNull(joinType, "joinType"); this.left = Objects.requireNonNull(left, "left"); this.right = Objects.requireNonNull(right, "right"); - this.before = Objects.requireNonNull(before, "before"); - this.after = Objects.requireNonNull(after, "after"); + this.beforeMillis = Objects.requireNonNull(beforeMillis, "beforeMillis"); + this.afterMillis = Objects.requireNonNull(afterMillis, "afterMillis"); } @Override @@ -64,12 +66,12 @@ public List> getSources() { return ImmutableList.of(left, right); } - public Formats getLeftFormats() { - return leftFormats; + public Formats getLeftInternalFormats() { + return leftInternalFormats; } - public Formats getRightFormats() { - return rightFormats; + public Formats getRightInternalFormats() { + return rightInternalFormats; } public ExecutionStep> getLeft() { @@ -84,12 +86,12 @@ public JoinType getJoinType() { return joinType; } - public Duration getAfter() { - return after; + public Duration getAfterMillis() { + return afterMillis; } - public Duration getBefore() { - return before; + public Duration getBeforeMillis() { + return beforeMillis; } @Override @@ -109,12 +111,12 @@ public boolean equals(Object o) { StreamStreamJoin that = (StreamStreamJoin) o; return Objects.equals(properties, that.properties) && joinType == that.joinType - && Objects.equals(leftFormats, that.leftFormats) - && Objects.equals(rightFormats, that.rightFormats) + && Objects.equals(leftInternalFormats, that.leftInternalFormats) + && Objects.equals(rightInternalFormats, that.rightInternalFormats) && Objects.equals(left, that.left) && Objects.equals(right, that.right) - && Objects.equals(before, that.before) - && Objects.equals(after, that.after); + && Objects.equals(beforeMillis, that.beforeMillis) + && Objects.equals(afterMillis, that.afterMillis); } // CHECKSTYLE_RULES.ON: CyclomaticComplexity @@ -123,12 +125,12 @@ public int hashCode() { return Objects.hash( properties, joinType, - leftFormats, - rightFormats, + leftInternalFormats, + rightInternalFormats, left, right, - before, - after + beforeMillis, + afterMillis ); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java index c0a02344bb47..3ff1b86e5f37 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java @@ -26,18 +26,18 @@ public class StreamTableJoin implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final JoinType joinType; - private final Formats formats; + private final Formats internalFormats; private final ExecutionStep> left; private final ExecutionStep> right; public StreamTableJoin( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "joinType", required = true) JoinType joinType, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "left", required = true) ExecutionStep> left, @JsonProperty(value = "right", required = true) ExecutionStep> right) { this.properties = Objects.requireNonNull(properties, "properties"); - this.formats = Objects.requireNonNull(formats, "formats"); + this.internalFormats = Objects.requireNonNull(internalFormats, "internalFormats"); this.joinType = Objects.requireNonNull(joinType, "joinType"); this.left = Objects.requireNonNull(left, "left"); this.right = Objects.requireNonNull(right, "right"); @@ -54,8 +54,8 @@ public List> getSources() { return ImmutableList.of(left, right); } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } public ExecutionStep> getLeft() { @@ -86,7 +86,7 @@ public boolean equals(Object o) { StreamTableJoin that = (StreamTableJoin) o; return Objects.equals(properties, that.properties) && joinType == that.joinType - && Objects.equals(formats, that.formats) + && Objects.equals(internalFormats, that.internalFormats) && Objects.equals(left, that.left) && Objects.equals(right, that.right); } @@ -94,6 +94,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(properties, joinType, formats, left, right); + return Objects.hash(properties, joinType, internalFormats, left, right); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamWindowedAggregate.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamWindowedAggregate.java index 643a0e238be9..01b294fdea95 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamWindowedAggregate.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamWindowedAggregate.java @@ -34,25 +34,27 @@ public class StreamWindowedAggregate implements ExecutionStep>> { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep source; - private final Formats formats; + private final Formats internalFormats; private final int nonFuncColumnCount; - private final ImmutableList aggregations; + private final ImmutableList aggregationFunctions; private final KsqlWindowExpression windowExpression; public StreamWindowedAggregate( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep source, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "nonFuncColumnCount", required = true) int nonFuncColumnCount, - @JsonProperty(value = "aggregations", required = true) List aggregations, + @JsonProperty(value = "aggregationFunctions", required = true) + List aggregationFunctions, @JsonProperty(value = "windowExpression", required = true) KsqlWindowExpression windowExpression) { this.properties = requireNonNull(properties, "properties"); this.source = requireNonNull(source, "source"); - this.formats = requireNonNull(formats, "formats"); + this.internalFormats = requireNonNull(internalFormats, "internalFormats"); this.nonFuncColumnCount = nonFuncColumnCount; - this.aggregations = ImmutableList.copyOf(requireNonNull(aggregations, "aggregations")); + this.aggregationFunctions = ImmutableList.copyOf( + requireNonNull(aggregationFunctions, "aggregationFunctions")); this.windowExpression = requireNonNull(windowExpression, "windowExpression"); } @@ -71,12 +73,12 @@ public int getNonFuncColumnCount() { return nonFuncColumnCount; } - public List getAggregations() { - return aggregations; + public List getAggregationFunctions() { + return aggregationFunctions; } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } public KsqlWindowExpression getWindowExpression() { @@ -103,8 +105,8 @@ public boolean equals(Object o) { StreamWindowedAggregate that = (StreamWindowedAggregate) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats) - && Objects.equals(aggregations, that.aggregations) + && Objects.equals(internalFormats, that.internalFormats) + && Objects.equals(aggregationFunctions, that.aggregationFunctions) && nonFuncColumnCount == that.nonFuncColumnCount && Objects.equals(windowExpression, that.windowExpression); } @@ -115,8 +117,8 @@ public int hashCode() { return Objects.hash( properties, source, - formats, - aggregations, + internalFormats, + aggregationFunctions, nonFuncColumnCount, windowExpression ); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java index 5e9de2e213ea..8db5734c380d 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableAggregate.java @@ -30,24 +30,24 @@ public class TableAggregate implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep source; - private final Formats formats; + private final Formats internalFormats; private final int nonFuncColumnCount; - private final ImmutableList aggregations; + private final ImmutableList aggregationFunctions; public TableAggregate( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep source, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "nonFuncColumnCount", required = true) int nonFuncColumnCount, - @JsonProperty(value = "aggregations", required = true) - List aggregations) { + @JsonProperty(value = "aggregationFunctions", required = true) + List aggregationFunctions) { this.properties = requireNonNull(properties, "properties"); this.source = requireNonNull(source, "source"); - this.formats = requireNonNull(formats, "formats"); + this.internalFormats = requireNonNull(internalFormats, "internalFormats"); this.nonFuncColumnCount = nonFuncColumnCount; - this.aggregations = ImmutableList - .copyOf(requireNonNull(aggregations, "aggValToFunctionMap")); + this.aggregationFunctions = ImmutableList + .copyOf(requireNonNull(aggregationFunctions, "aggValToFunctionMap")); } @Override @@ -61,12 +61,12 @@ public List> getSources() { return Collections.singletonList(source); } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } - public List getAggregations() { - return aggregations; + public List getAggregationFunctions() { + return aggregationFunctions; } public int getNonFuncColumnCount() { @@ -93,14 +93,15 @@ public boolean equals(Object o) { TableAggregate that = (TableAggregate) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats) + && Objects.equals(internalFormats, that.internalFormats) && nonFuncColumnCount == that.nonFuncColumnCount - && Objects.equals(aggregations, that.aggregations); + && Objects.equals(aggregationFunctions, that.aggregationFunctions); } @Override public int hashCode() { - return Objects.hash(properties, source, formats, nonFuncColumnCount, aggregations); + return Objects.hash(properties, source, internalFormats, nonFuncColumnCount, + aggregationFunctions); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java index 042a4982de79..0d8c94f4e3f4 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java @@ -29,19 +29,19 @@ public class TableGroupBy implements ExecutionStep { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep> source; - private final Formats formats; + private final Formats internalFormats; private final ImmutableList groupByExpressions; public TableGroupBy( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep> source, - @JsonProperty(value = "formats", required = true) Formats formats, + @JsonProperty(value = "internalFormats", required = true) Formats internalFormats, @JsonProperty(value = "groupByExpressions", required = true) List groupByExpressions ) { this.properties = requireNonNull(properties, "properties"); this.source = requireNonNull(source, "source"); - this.formats = requireNonNull(formats, "formats"); + this.internalFormats = requireNonNull(internalFormats, "internalFormats"); this.groupByExpressions = ImmutableList .copyOf(requireNonNull(groupByExpressions, "groupByExpressions")); } @@ -57,8 +57,8 @@ public List> getSources() { return Collections.singletonList(source); } - public Formats getFormats() { - return formats; + public Formats getInternalFormats() { + return internalFormats; } public List getGroupByExpressions() { @@ -85,13 +85,13 @@ public boolean equals(Object o) { TableGroupBy that = (TableGroupBy) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) - && Objects.equals(formats, that.formats) + && Objects.equals(internalFormats, that.internalFormats) && Objects.equals(groupByExpressions, that.groupByExpressions); } @Override public int hashCode() { - return Objects.hash(properties, source, formats, groupByExpressions); + return Objects.hash(properties, source, internalFormats, groupByExpressions); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSelect.java similarity index 92% rename from ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java rename to ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSelect.java index 02d5c93ac801..e551d41ff5c2 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableMapValues.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSelect.java @@ -25,13 +25,13 @@ import java.util.Objects; @Immutable -public class TableMapValues implements ExecutionStep> { +public class TableSelect implements ExecutionStep> { private final ExecutionStepPropertiesV1 properties; private final ExecutionStep> source; private final ImmutableList selectExpressions; - public TableMapValues( + public TableSelect( @JsonProperty(value = "properties", required = true) ExecutionStepPropertiesV1 properties, @JsonProperty(value = "source", required = true) ExecutionStep> source, @JsonProperty(value = "selectExpressions", required = true) @@ -63,7 +63,7 @@ public ExecutionStep> getSource() { @Override public KTableHolder build(PlanBuilder builder) { - return builder.visitTableMapValues(this); + return builder.visitTableSelect(this); } @Override @@ -74,7 +74,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - TableMapValues that = (TableMapValues) o; + TableSelect that = (TableSelect) o; return Objects.equals(properties, that.properties) && Objects.equals(source, that.source) && Objects.equals(selectExpressions, that.selectExpressions); diff --git a/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json index 36add88927be..4bc1f2815d81 100644 --- a/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -58,7 +58,7 @@ "timestampColumn" : { "$ref" : "#/definitions/TimestampColumn" }, - "kafkaTopicName" : { + "topicName" : { "type" : "string" }, "formats" : { @@ -69,7 +69,7 @@ } }, "title" : "createStreamV1", - "required" : [ "@type", "sourceName", "schema", "kafkaTopicName", "formats" ] + "required" : [ "@type", "sourceName", "schema", "topicName", "formats" ] }, "TimestampColumn" : { "type" : "object", @@ -156,7 +156,7 @@ "timestampColumn" : { "$ref" : "#/definitions/TimestampColumn" }, - "kafkaTopicName" : { + "topicName" : { "type" : "string" }, "formats" : { @@ -167,7 +167,7 @@ } }, "title" : "createTableV1", - "required" : [ "@type", "sourceName", "schema", "kafkaTopicName", "formats" ] + "required" : [ "@type", "sourceName", "schema", "topicName", "formats" ] }, "RegisterTypeCommand" : { "type" : "object", @@ -181,12 +181,12 @@ "type" : { "type" : "string" }, - "name" : { + "typeName" : { "type" : "string" } }, "title" : "registerTypeV1", - "required" : [ "@type", "type", "name" ] + "required" : [ "@type", "type", "typeName" ] }, "DropSourceCommand" : { "type" : "object", @@ -257,13 +257,13 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "nonFuncColumnCount" : { "type" : "integer" }, - "aggregations" : { + "aggregationFunctions" : { "type" : "array", "items" : { "type" : "string" @@ -271,7 +271,7 @@ } }, "title" : "streamAggregateV1", - "required" : [ "@type", "properties", "source", "formats", "nonFuncColumnCount", "aggregations" ] + "required" : [ "@type", "properties", "source", "internalFormats", "nonFuncColumnCount", "aggregationFunctions" ] }, "ExecutionStepPropertiesV1" : { "type" : "object", @@ -345,7 +345,7 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "groupByExpressions" : { @@ -356,7 +356,7 @@ } }, "title" : "streamGroupByV1", - "required" : [ "@type", "properties", "source", "formats", "groupByExpressions" ] + "required" : [ "@type", "properties", "source", "internalFormats", "groupByExpressions" ] }, "StreamGroupByKey" : { "type" : "object", @@ -373,21 +373,21 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" } }, "title" : "streamGroupByKeyV1", - "required" : [ "@type", "properties", "source", "formats" ] + "required" : [ "@type", "properties", "source", "internalFormats" ] }, - "StreamMapValues" : { + "StreamSelect" : { "type" : "object", "additionalProperties" : false, "properties" : { "@type" : { "type" : "string", - "enum" : [ "streamMapValuesV1" ], - "default" : "streamMapValuesV1" + "enum" : [ "streamSelectV1" ], + "default" : "streamSelectV1" }, "properties" : { "$ref" : "#/definitions/ExecutionStepPropertiesV1" @@ -402,7 +402,7 @@ } } }, - "title" : "streamMapValuesV1", + "title" : "streamSelectV1", "required" : [ "@type", "properties", "source", "selectExpressions" ] }, "StreamSelectKey" : { @@ -541,10 +541,10 @@ "type" : "string", "enum" : [ "INNER", "LEFT", "OUTER" ] }, - "leftFormats" : { + "leftInternalFormats" : { "$ref" : "#/definitions/Formats" }, - "rightFormats" : { + "rightInternalFormats" : { "$ref" : "#/definitions/Formats" }, "left" : { @@ -553,15 +553,15 @@ "right" : { "$ref" : "#/definitions/ExecutionStep" }, - "before" : { + "beforeMillis" : { "type" : "integer" }, - "after" : { + "afterMillis" : { "type" : "integer" } }, "title" : "streamStreamJoinV1", - "required" : [ "@type", "properties", "joinType", "leftFormats", "rightFormats", "left", "right", "before", "after" ] + "required" : [ "@type", "properties", "joinType", "leftInternalFormats", "rightInternalFormats", "left", "right", "beforeMillis", "afterMillis" ] }, "StreamTableJoin" : { "type" : "object", @@ -579,7 +579,7 @@ "type" : "string", "enum" : [ "INNER", "LEFT", "OUTER" ] }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "left" : { @@ -590,7 +590,7 @@ } }, "title" : "streamTableJoinV1", - "required" : [ "@type", "properties", "joinType", "formats", "left", "right" ] + "required" : [ "@type", "properties", "joinType", "internalFormats", "left", "right" ] }, "StreamWindowedAggregate" : { "type" : "object", @@ -607,13 +607,13 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "nonFuncColumnCount" : { "type" : "integer" }, - "aggregations" : { + "aggregationFunctions" : { "type" : "array", "items" : { "type" : "string" @@ -624,7 +624,7 @@ } }, "title" : "streamWindowedAggregateV1", - "required" : [ "@type", "properties", "source", "formats", "nonFuncColumnCount", "aggregations", "windowExpression" ] + "required" : [ "@type", "properties", "source", "internalFormats", "nonFuncColumnCount", "aggregationFunctions", "windowExpression" ] }, "TableSource" : { "type" : "object", @@ -714,13 +714,13 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "nonFuncColumnCount" : { "type" : "integer" }, - "aggregations" : { + "aggregationFunctions" : { "type" : "array", "items" : { "type" : "string" @@ -728,7 +728,7 @@ } }, "title" : "tableAggregateV1", - "required" : [ "@type", "properties", "source", "formats", "nonFuncColumnCount", "aggregations" ] + "required" : [ "@type", "properties", "source", "internalFormats", "nonFuncColumnCount", "aggregationFunctions" ] }, "TableFilter" : { "type" : "object", @@ -767,7 +767,7 @@ "source" : { "$ref" : "#/definitions/ExecutionStep" }, - "formats" : { + "internalFormats" : { "$ref" : "#/definitions/Formats" }, "groupByExpressions" : { @@ -778,16 +778,16 @@ } }, "title" : "tableGroupByV1", - "required" : [ "@type", "properties", "source", "formats", "groupByExpressions" ] + "required" : [ "@type", "properties", "source", "internalFormats", "groupByExpressions" ] }, - "TableMapValues" : { + "TableSelect" : { "type" : "object", "additionalProperties" : false, "properties" : { "@type" : { "type" : "string", - "enum" : [ "tableMapValuesV1" ], - "default" : "tableMapValuesV1" + "enum" : [ "tableSelectV1" ], + "default" : "tableSelectV1" }, "properties" : { "$ref" : "#/definitions/ExecutionStepPropertiesV1" @@ -802,7 +802,7 @@ } } }, - "title" : "tableMapValuesV1", + "title" : "tableSelectV1", "required" : [ "@type", "properties", "source", "selectExpressions" ] }, "TableSink" : { @@ -868,7 +868,7 @@ }, { "$ref" : "#/definitions/StreamGroupByKey" }, { - "$ref" : "#/definitions/StreamMapValues" + "$ref" : "#/definitions/StreamSelect" }, { "$ref" : "#/definitions/StreamSelectKey" }, { @@ -894,7 +894,7 @@ }, { "$ref" : "#/definitions/TableGroupBy" }, { - "$ref" : "#/definitions/TableMapValues" + "$ref" : "#/definitions/TableSelect" }, { "$ref" : "#/definitions/TableSink" }, { diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index c7ea805c519f..b6db598573f5 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -32,7 +32,7 @@ import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; @@ -42,7 +42,7 @@ import io.confluent.ksql.execution.plan.TableAggregate; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.execution.plan.TableTableJoin; @@ -194,7 +194,7 @@ public static StreamFilter streamFilter( ); } - public static StreamMapValues streamMapValues( + public static StreamSelect streamSelect( final QueryContext.Stacker stacker, final ExecutionStep> source, final List selectExpressions @@ -202,7 +202,7 @@ public static StreamMapValues streamMapValues( final ExecutionStepPropertiesV1 properties = new ExecutionStepPropertiesV1( stacker.getQueryContext() ); - return new StreamMapValues<>( + return new StreamSelect<>( properties, source, selectExpressions @@ -285,7 +285,7 @@ public static TableFilter tableFilter( ); } - public static TableMapValues tableMapValues( + public static TableSelect tableMapValues( final QueryContext.Stacker stacker, final ExecutionStep> source, final List selectExpressions @@ -293,7 +293,7 @@ public static TableMapValues tableMapValues( final ExecutionStepPropertiesV1 properties = new ExecutionStepPropertiesV1( stacker.getQueryContext() ); - return new TableMapValues<>( + return new TableSelect<>( properties, source, selectExpressions diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index 9d283462f81b..f73d151127c1 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -26,7 +26,7 @@ import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; @@ -36,7 +36,7 @@ import io.confluent.ksql.execution.plan.TableAggregate; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.execution.plan.TableTableJoin; @@ -121,10 +121,10 @@ public KTableHolder visitStreamAggregate( } @Override - public KStreamHolder visitStreamMapValues( - final StreamMapValues streamMapValues) { - final KStreamHolder source = streamMapValues.getSource().build(this); - return StreamMapValuesBuilder.build(source, streamMapValues, queryBuilder); + public KStreamHolder visitStreamSelect( + final StreamSelect streamSelect) { + final KStreamHolder source = streamSelect.getSource().build(this); + return StreamSelectBuilder.build(source, streamSelect, queryBuilder); } @Override @@ -258,10 +258,10 @@ public KGroupedTableHolder visitTableGroupBy( } @Override - public KTableHolder visitTableMapValues( - final TableMapValues tableMapValues) { - final KTableHolder source = tableMapValues.getSource().build(this); - return TableMapValuesBuilder.build(source, tableMapValues, queryBuilder); + public KTableHolder visitTableSelect( + final TableSelect tableSelect) { + final KTableHolder source = tableSelect.getSource().build(this); + return TableSelectBuilder.build(source, tableSelect, queryBuilder); } @Override diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index fb161c261dc8..baadf7c76c0e 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -22,7 +22,7 @@ import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; @@ -32,7 +32,7 @@ import io.confluent.ksql.execution.plan.TableAggregate; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.execution.plan.TableTableJoin; @@ -62,7 +62,7 @@ public final class StepSchemaResolver { .put(StreamFlatMap.class, StepSchemaResolver::handleStreamFlatMap) .put(StreamGroupBy.class, StepSchemaResolver::sameSchema) .put(StreamGroupByKey.class, StepSchemaResolver::sameSchema) - .put(StreamMapValues.class, StepSchemaResolver::handleStreamMapValues) + .put(StreamSelect.class, StepSchemaResolver::handleStreamSelect) .put(StreamSelectKey.class, StepSchemaResolver::sameSchema) .put(StreamSink.class, StepSchemaResolver::handleSink) .put(StreamSource.class, StepSchemaResolver::handleSource) @@ -70,7 +70,7 @@ public final class StepSchemaResolver { .put(TableAggregate.class, StepSchemaResolver::handleTableAggregate) .put(TableFilter.class, StepSchemaResolver::sameSchema) .put(TableGroupBy.class, StepSchemaResolver::sameSchema) - .put(TableMapValues.class, StepSchemaResolver::handleTableMapValues) + .put(TableSelect.class, StepSchemaResolver::handleTableSelect) .put(TableSink.class, StepSchemaResolver::handleSink) .put(TableSource.class, StepSchemaResolver::handleSource) .put(WindowedTableSource.class, StepSchemaResolver::handleSource) @@ -131,7 +131,7 @@ private LogicalSchema handleStreamAggregate( schema, step.getNonFuncColumnCount(), functionRegistry, - step.getAggregations() + step.getAggregationFunctions() ).getSchema(); } @@ -143,7 +143,7 @@ private LogicalSchema handleStreamWindowedAggregate( schema, step.getNonFuncColumnCount(), functionRegistry, - step.getAggregations() + step.getAggregationFunctions() ).getSchema(); } @@ -158,13 +158,13 @@ private LogicalSchema handleStreamFlatMap( ); } - private LogicalSchema handleStreamMapValues( + private LogicalSchema handleStreamSelect( final LogicalSchema schema, - final StreamMapValues streamMapValues + final StreamSelect streamSelect ) { return Selection.of( schema, - streamMapValues.getSelectExpressions(), + streamSelect.getSelectExpressions(), ksqlConfig, functionRegistry ).getSchema(); @@ -188,13 +188,13 @@ private LogicalSchema handleTableAggregate( schema, step.getNonFuncColumnCount(), functionRegistry, - step.getAggregations() + step.getAggregationFunctions() ).getSchema(); } - private LogicalSchema handleTableMapValues( + private LogicalSchema handleTableSelect( final LogicalSchema schema, - final TableMapValues step + final TableSelect step ) { return Selection.of( schema, diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index 629077f55897..a0c996ca9bfa 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -79,7 +79,7 @@ static KTableHolder build( sourceSchema, nonFuncColumns, queryBuilder.getFunctionRegistry(), - aggregate.getAggregations() + aggregate.getAggregationFunctions() ); final LogicalSchema aggregateSchema = aggregateParams.getAggregateSchema(); final LogicalSchema resultSchema = aggregateParams.getSchema(); @@ -87,7 +87,7 @@ static KTableHolder build( AggregateBuilderUtils.buildMaterialized( aggregate, aggregateSchema, - aggregate.getFormats(), + aggregate.getInternalFormats(), queryBuilder, materializedFactory ); @@ -151,7 +151,7 @@ static KTableHolder> build( sourceSchema, nonFuncColumns, queryBuilder.getFunctionRegistry(), - aggregate.getAggregations() + aggregate.getAggregationFunctions() ); final LogicalSchema aggregateSchema = aggregateParams.getAggregateSchema(); final LogicalSchema resultSchema = aggregateParams.getSchema(); @@ -223,7 +223,7 @@ private static class WindowedAggregator this.materializedFactory = Objects.requireNonNull(materializedFactory, "materializedFactory"); this.aggregateParams = Objects.requireNonNull(aggregateParams, "aggregateParams"); this.queryContext = AggregateBuilderUtils.materializeContext(aggregate); - this.formats = aggregate.getFormats(); + this.formats = aggregate.getInternalFormats(); final PhysicalSchema physicalSchema = PhysicalSchema.from( aggregateSchema, formats.getOptions() diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java index c1732f433eb8..60f284e431e1 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java @@ -44,7 +44,7 @@ public static KGroupedStreamHolder build( ) { final LogicalSchema sourceSchema = stream.getSchema(); final QueryContext queryContext = step.getProperties().getQueryContext(); - final Formats formats = step.getFormats(); + final Formats formats = step.getInternalFormats(); final Grouped grouped = buildGrouped( formats, sourceSchema, @@ -63,7 +63,7 @@ public static KGroupedStreamHolder build( ) { final LogicalSchema sourceSchema = stream.getSchema(); final QueryContext queryContext = step.getProperties().getQueryContext(); - final Formats formats = step.getFormats(); + final Formats formats = step.getInternalFormats(); final Grouped grouped = buildGrouped( formats, sourceSchema, diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectBuilder.java similarity index 92% rename from ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilder.java rename to ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectBuilder.java index c73882166105..46bc03f8475d 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectBuilder.java @@ -19,7 +19,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.KStreamHolder; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.streams.transform.KsTransformer; import io.confluent.ksql.execution.transform.select.SelectValueMapper; import io.confluent.ksql.execution.transform.select.Selection; @@ -27,13 +27,13 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import org.apache.kafka.streams.kstream.Named; -public final class StreamMapValuesBuilder { - private StreamMapValuesBuilder() { +public final class StreamSelectBuilder { + private StreamSelectBuilder() { } public static KStreamHolder build( final KStreamHolder stream, - final StreamMapValues step, + final StreamSelect step, final KsqlQueryBuilder queryBuilder ) { final QueryContext queryContext = step.getProperties().getQueryContext(); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java index 71b5be4aa7bc..37301d6b7a34 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java @@ -41,7 +41,7 @@ public static KStreamHolder build( final StreamStreamJoin join, final KsqlQueryBuilder queryBuilder, final StreamJoinedFactory streamJoinedFactory) { - final Formats leftFormats = join.getLeftFormats(); + final Formats leftFormats = join.getLeftInternalFormats(); final QueryContext queryContext = join.getProperties().getQueryContext(); final QueryContext.Stacker stacker = QueryContext.Stacker.of(queryContext); final LogicalSchema leftSchema = left.getSchema(); @@ -54,7 +54,7 @@ public static KStreamHolder build( leftPhysicalSchema, stacker.push(LEFT_SERDE_CTX).getQueryContext() ); - final Formats rightFormats = join.getRightFormats(); + final Formats rightFormats = join.getRightInternalFormats(); final LogicalSchema rightSchema = right.getSchema(); final PhysicalSchema rightPhysicalSchema = PhysicalSchema.from( rightSchema.withoutAlias(), @@ -78,7 +78,8 @@ public static KStreamHolder build( StreamsUtil.buildOpName(queryContext) ); final JoinParams joinParams = JoinParamsFactory.create(leftSchema, rightSchema); - final JoinWindows joinWindows = JoinWindows.of(join.getBefore()).after(join.getAfter()); + final JoinWindows joinWindows = + JoinWindows.of(join.getBeforeMillis()).after(join.getAfterMillis()); final KStream result; switch (join.getJoinType()) { case LEFT: diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java index 131a3506afae..91faa7e6d40d 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java @@ -40,7 +40,7 @@ public static KStreamHolder build( final StreamTableJoin join, final KsqlQueryBuilder queryBuilder, final JoinedFactory joinedFactory) { - final Formats leftFormats = join.getFormats(); + final Formats leftFormats = join.getInternalFormats(); final QueryContext queryContext = join.getProperties().getQueryContext(); final QueryContext.Stacker stacker = QueryContext.Stacker.of(queryContext); final LogicalSchema leftSchema = left.getSchema(); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableAggregateBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableAggregateBuilder.java index 357ed6e50936..dfd02235d948 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableAggregateBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableAggregateBuilder.java @@ -62,7 +62,7 @@ public static KTableHolder build( sourceSchema, nonFuncColumns, queryBuilder.getFunctionRegistry(), - aggregate.getAggregations() + aggregate.getAggregationFunctions() ); final LogicalSchema aggregateSchema = aggregateParams.getAggregateSchema(); final LogicalSchema resultSchema = aggregateParams.getSchema(); @@ -70,7 +70,7 @@ public static KTableHolder build( AggregateBuilderUtils.buildMaterialized( aggregate, aggregateSchema, - aggregate.getFormats(), + aggregate.getInternalFormats(), queryBuilder, materializedFactory ); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java index 3b32fb12bfcd..f81d6fd5f5fc 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java @@ -46,7 +46,7 @@ public static KGroupedTableHolder build( ) { final LogicalSchema sourceSchema = table.getSchema(); final QueryContext queryContext = step.getProperties().getQueryContext(); - final Formats formats = step.getFormats(); + final Formats formats = step.getInternalFormats(); final PhysicalSchema physicalSchema = PhysicalSchema.from( sourceSchema, formats.getOptions() diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableMapValuesBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSelectBuilder.java similarity index 94% rename from ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableMapValuesBuilder.java rename to ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSelectBuilder.java index 839e88b90178..c29fd47a6e83 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableMapValuesBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSelectBuilder.java @@ -20,7 +20,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.KTableHolder; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.streams.transform.KsTransformer; import io.confluent.ksql.execution.transform.KsqlTransformer; import io.confluent.ksql.execution.transform.select.SelectValueMapper; @@ -29,15 +29,15 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import org.apache.kafka.streams.kstream.Named; -public final class TableMapValuesBuilder { +public final class TableSelectBuilder { - private TableMapValuesBuilder() { + private TableSelectBuilder() { } @SuppressWarnings("unchecked") public static KTableHolder build( final KTableHolder table, - final TableMapValues step, + final TableSelect step, final KsqlQueryBuilder queryBuilder ) { final LogicalSchema sourceSchema = table.getSchema(); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java index f71d60f25031..9add12a0d38e 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java @@ -41,14 +41,14 @@ import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamWindowedAggregate; import io.confluent.ksql.execution.plan.TableAggregate; import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; -import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSelect; import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.plan.WindowedTableSource; @@ -163,7 +163,7 @@ public void shouldResolveSchemaForStreamWindowedAggregate() { @Test public void shouldResolveSchemaForStreamSelect() { // Given: - final StreamMapValues step = new StreamMapValues<>( + final StreamSelect step = new StreamSelect<>( PROPERTIES, streamSource, ImmutableList.of( @@ -357,7 +357,7 @@ public void shouldResolveSchemaForTableGroupBy() { @Test public void shouldResolveSchemaForTableSelect() { // Given: - final TableMapValues step = new TableMapValues<>( + final TableSelect step = new TableSelect<>( PROPERTIES, tableSource, ImmutableList.of( diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectBuilderTest.java similarity index 97% rename from ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilderTest.java rename to ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectBuilderTest.java index e9b85f5b5ad3..a1fa1165f97d 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamMapValuesBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectBuilderTest.java @@ -35,7 +35,7 @@ import io.confluent.ksql.execution.plan.KeySerdeFactory; import io.confluent.ksql.execution.plan.PlanBuilder; import io.confluent.ksql.execution.plan.SelectExpression; -import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelect; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; @@ -62,7 +62,7 @@ import org.mockito.junit.MockitoRule; @SuppressWarnings("unchecked") -public class StreamMapValuesBuilderTest { +public class StreamSelectBuilderTest { private static final LogicalSchema SCHEMA = new LogicalSchema.Builder() .valueColumn(ColumnName.of("foo"), SqlTypes.STRING) @@ -113,7 +113,7 @@ public class StreamMapValuesBuilderTest { new QueryContext.Stacker().push("foo").push("bar").getQueryContext(); private PlanBuilder planBuilder; - private StreamMapValues step; + private StreamSelect step; @Before public void setup() { @@ -131,7 +131,7 @@ public void setup() { final KStreamHolder sourceStream = new KStreamHolder<>(sourceKStream, SCHEMA, keySerdeFactory); when(sourceStep.build(any())).thenReturn(sourceStream); - step = new StreamMapValues<>( + step = new StreamSelect<>( properties, sourceStep, SELECT_EXPRESSIONS