Skip to content

Commit

Permalink
chore: clean up plan names (#4065)
Browse files Browse the repository at this point in the history
aggregations -> aggregationFunctions
formats -> internalFormats
before/after -> beforeMillis/afterMillis
kafkaTopicName -> topicName
name -> typeName (for register/drop types)
(Stream|Table)MapValues -> (Stream|Table)Select
  • Loading branch information
rodesai authored Dec 6, 2019
1 parent 0f31f8e commit e81997c
Show file tree
Hide file tree
Showing 39 changed files with 245 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) {

@Override
public DdlCommandResult executeRegisterType(final RegisterTypeCommand registerType) {
final String name = registerType.getName();
final String name = registerType.getTypeName();
final SqlType type = registerType.getType();
metaStore.registerType(name, type);
return new DdlCommandResult(
Expand All @@ -141,7 +141,7 @@ private static KeyField getKeyField(final Optional<ColumnName> keyFieldName) {

private static KsqlTopic getKsqlTopic(final CreateSourceCommand createSource) {
return new KsqlTopic(
createSource.getKafkaTopicName(),
createSource.getTopicName(),
KeyFormat.of(createSource.getFormats().getKeyFormat(), createSource.getWindowInfo()),
ValueFormat.of(createSource.getFormats().getValueFormat())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.confluent.ksql.execution.plan.StreamFlatMap;
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
Expand Down Expand Up @@ -147,7 +147,7 @@ public SchemaKStream<K> select(
final KsqlQueryBuilder ksqlQueryBuilder
) {
final KeyField keyField = findKeyField(selectExpressions);
final StreamMapValues<K> step = ExecutionStepFactory.streamMapValues(
final StreamSelect<K> step = ExecutionStepFactory.streamSelect(
contextStacker,
sourceStep,
selectExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.TableFilter;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.execution.plan.TableMapValues;
import io.confluent.ksql.execution.plan.TableSelect;
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.plan.TableTableJoin;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
Expand Down Expand Up @@ -118,7 +118,7 @@ public SchemaKTable<K> select(
final KsqlQueryBuilder ksqlQueryBuilder
) {
final KeyField keyField = findKeyField(selectExpressions);
final TableMapValues<K> step = ExecutionStepFactory.tableMapValues(
final TableSelect<K> step = ExecutionStepFactory.tableMapValues(
contextStacker,
sourceTableStep,
selectExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static void throwOnInvalidSchemaEvolution(
ksqlConfig
);

final String topicName = ddl.getKafkaTopicName();
final String topicName = ddl.getTopicName();

if (!isValidAvroSchemaForTopic(topicName, avroSchema, schemaRegistryClient)) {
throw new KsqlStatementException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.confluent.ksql.execution.plan.StreamFilter;
import io.confluent.ksql.execution.plan.StreamFlatMap;
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSelectKey;
import io.confluent.ksql.execution.plan.StreamSink;
import io.confluent.ksql.execution.plan.StreamSource;
Expand All @@ -34,7 +34,7 @@
import io.confluent.ksql.execution.plan.TableAggregate;
import io.confluent.ksql.execution.plan.TableFilter;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.execution.plan.TableMapValues;
import io.confluent.ksql.execution.plan.TableSelect;
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.plan.TableSource;
import io.confluent.ksql.execution.plan.TableTableJoin;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class PlanSummary {
.put(StreamFilter.class, "FILTER")
.put(StreamFlatMap.class, "FLAT_MAP")
.put(StreamGroupBy.class, "GROUP_BY")
.put(StreamMapValues.class, "PROJECT")
.put(StreamSelect.class, "PROJECT")
.put(StreamSelectKey.class, "REKEY")
.put(StreamSink.class, "SINK")
.put(StreamSource.class, "SOURCE")
Expand All @@ -76,7 +76,7 @@ public class PlanSummary {
.put(TableAggregate.class, "AGGREGATE")
.put(TableFilter.class, "FILTER")
.put(TableGroupBy.class, "GROUP_BY")
.put(TableMapValues.class, "PROJECT")
.put(TableSelect.class, "PROJECT")
.put(TableSink.class, "SINK")
.put(TableTableJoin.class, "JOIN")
.put(TableSource.class, "SOURCE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public void shouldCreateCommandForRegisterType() {

// Then:
assertThat(result.getType(), equalTo(ddlStatement.getType().getSqlType()));
assertThat(result.getName(), equalTo("alias"));
assertThat(result.getTypeName(), equalTo("alias"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void shouldBuildStepForSelect() {
assertThat(
projectedSchemaKStream.getSourceStep(),
equalTo(
ExecutionStepFactory.streamMapValues(
ExecutionStepFactory.streamSelect(
childContextStacker,
initialSchemaKStream.getSourceStep(),
selectExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class AvroUtilTest {
public void setUp() {
when(ddlCommand.getFormats()).thenReturn(FORMATS);
when(ddlCommand.getSchema()).thenReturn(MUTLI_FIELD_SCHEMA);
when(ddlCommand.getKafkaTopicName()).thenReturn(RESULT_TOPIC_NAME);
when(ddlCommand.getTopicName()).thenReturn(RESULT_TOPIC_NAME);
}

@Test
Expand Down Expand Up @@ -258,7 +258,7 @@ public void shouldThrowOnSrAuthorizationErrors() throws Exception {
expectedException.expectMessage("Could not connect to Schema Registry service");
expectedException.expectMessage(containsString(String.format(
"Not authorized to access Schema Registry subject: [%s]",
ddlCommand.getKafkaTopicName()
ddlCommand.getTopicName()
+ KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
import io.confluent.ksql.execution.plan.StreamMapValues;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.StreamStreamJoin;
import io.confluent.ksql.execution.streams.StepSchemaResolver;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void shouldSummarizeWithSource() {
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("L1"), SqlTypes.STRING)
.build();
final ExecutionStep step = givenStep(StreamMapValues.class, "child", schema, sourceStep);
final ExecutionStep step = givenStep(StreamSelect.class, "child", schema, sourceStep);

// When:
final String summary = planSummaryBuilder.summarize(step);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract class CreateSourceCommand implements DdlCommand {
private final LogicalSchema schema;
private final Optional<ColumnName> keyField;
private final Optional<TimestampColumn> timestampColumn;
private final String kafkaTopicName;
private final String topicName;
private final Formats formats;
private final Optional<WindowInfo> windowInfo;

Expand All @@ -43,7 +43,7 @@ public abstract class CreateSourceCommand implements DdlCommand {
final LogicalSchema schema,
final Optional<ColumnName> keyField,
final Optional<TimestampColumn> timestampColumn,
final String kafkaTopicName,
final String topicName,
final Formats formats,
final Optional<WindowInfo> windowInfo
) {
Expand All @@ -52,7 +52,7 @@ public abstract class CreateSourceCommand implements DdlCommand {
this.keyField = Objects.requireNonNull(keyField, "keyField");
this.timestampColumn =
Objects.requireNonNull(timestampColumn, "timestampColumn");
this.kafkaTopicName = Objects.requireNonNull(kafkaTopicName, "kafkaTopicName");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.formats = Objects.requireNonNull(formats, "formats");
this.windowInfo = Objects.requireNonNull(windowInfo, "windowInfo");

Expand All @@ -78,8 +78,8 @@ public Optional<ColumnName> getKeyField() {
return keyField;
}

public String getKafkaTopicName() {
return kafkaTopicName;
public String getTopicName() {
return topicName;
}

public Formats getFormats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public CreateStreamCommand(
@JsonProperty(value = "keyField") Optional<ColumnName> keyField,
@JsonProperty(value = "timestampColumn")
Optional<TimestampColumn> timestampColumn,
@JsonProperty(value = "kafkaTopicName", required = true) String kafkaTopicName,
@JsonProperty(value = "topicName", required = true) String topicName,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "windowInfo") final Optional<WindowInfo> windowInfo
) {
Expand All @@ -42,7 +42,7 @@ public CreateStreamCommand(
schema,
keyField,
timestampColumn,
kafkaTopicName,
topicName,
formats,
windowInfo
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public CreateTableCommand(
@JsonProperty(value = "schema", required = true) LogicalSchema schema,
@JsonProperty("keyField") Optional<ColumnName> keyField,
@JsonProperty("timestampColumn") Optional<TimestampColumn> timestampColumn,
@JsonProperty(value = "kafkaTopicName", required = true) String kafkaTopicName,
@JsonProperty(value = "topicName", required = true) String topicName,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "windowInfo") final Optional<WindowInfo> windowInfo
) {
Expand All @@ -41,7 +41,7 @@ public CreateTableCommand(
schema,
keyField,
timestampColumn,
kafkaTopicName,
topicName,
formats,
windowInfo
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
@Immutable
public class RegisterTypeCommand implements DdlCommand {
private final SqlType type;
private final String name;
private final String typeName;

public RegisterTypeCommand(
@JsonProperty(value = "type", required = true) final SqlType type,
@JsonProperty(value = "name", required = true) final String name) {
@JsonProperty(value = "typeName", required = true) final String typeName) {
this.type = Objects.requireNonNull(type, "type");
this.name = Objects.requireNonNull(name, "name");
this.typeName = Objects.requireNonNull(typeName, "typeName");
}

@Override
Expand All @@ -41,7 +41,7 @@ public SqlType getType() {
return type;
}

public String getName() {
return name;
public String getTypeName() {
return typeName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Type(value = StreamFlatMap.class, name = "streamFlatMapV1"),
@Type(value = StreamGroupBy.class, name = "streamGroupByV1"),
@Type(value = StreamGroupByKey.class, name = "streamGroupByKeyV1"),
@Type(value = StreamMapValues.class, name = "streamMapValuesV1"),
@Type(value = StreamSelect.class, name = "streamSelectV1"),
@Type(value = StreamSelectKey.class, name = "streamSelectKeyV1"),
@Type(value = StreamSink.class, name = "streamSinkV1"),
@Type(value = StreamSource.class, name = "streamSourceV1"),
Expand All @@ -45,7 +45,7 @@
@Type(value = TableAggregate.class, name = "tableAggregateV1"),
@Type(value = TableFilter.class, name = "tableFilterV1"),
@Type(value = TableGroupBy.class, name = "tableGroupByV1"),
@Type(value = TableMapValues.class, name = "tableMapValuesV1"),
@Type(value = TableSelect.class, name = "tableSelectV1"),
@Type(value = TableSink.class, name = "tableSinkV1"),
@Type(value = TableTableJoin.class, name = "tableTableJoinV1")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface PlanBuilder {

KTableHolder<Struct> visitStreamAggregate(StreamAggregate streamAggregate);

<K> KStreamHolder<K> visitStreamMapValues(StreamMapValues<K> streamMapValues);
<K> KStreamHolder<K> visitStreamSelect(StreamSelect<K> streamSelect);

<K> KStreamHolder<K> visitFlatMap(StreamFlatMap<K> streamFlatMap);

Expand Down Expand Up @@ -64,7 +64,7 @@ KTableHolder<Windowed<Struct>> visitStreamWindowedAggregate(

<K> KGroupedTableHolder visitTableGroupBy(TableGroupBy<K> tableGroupBy);

<K> KTableHolder<K> visitTableMapValues(TableMapValues<K> tableMapValues);
<K> KTableHolder<K> visitTableSelect(TableSelect<K> tableSelect);

<K> KTableHolder<K> visitTableSink(TableSink<K> tableSink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,25 @@
public class StreamAggregate implements ExecutionStep<KTableHolder<Struct>> {
private final ExecutionStepPropertiesV1 properties;
private final ExecutionStep<KGroupedStreamHolder> source;
private final Formats formats;
private final Formats internalFormats;
private final int nonFuncColumnCount;
private final ImmutableList<FunctionCall> aggregations;
private final ImmutableList<FunctionCall> aggregationFunctions;

public StreamAggregate(
@JsonProperty(value = "properties", required = true)
ExecutionStepPropertiesV1 properties,
@JsonProperty(value = "source", required = true)
ExecutionStep<KGroupedStreamHolder> source,
@JsonProperty(value = "formats", required = true) Formats formats,
@JsonProperty(value = "internalFormats", required = true) Formats internalFormats,
@JsonProperty(value = "nonFuncColumnCount", required = true) int nonFuncColumnCount,
@JsonProperty(value = "aggregations", required = true)
List<FunctionCall> aggregations) {
@JsonProperty(value = "aggregationFunctions", required = true)
List<FunctionCall> aggregationFunctions) {
this.properties = requireNonNull(properties, "properties");
this.source = requireNonNull(source, "source");
this.formats = requireNonNull(formats, "formats");
this.internalFormats = requireNonNull(internalFormats, "internalFormats");
this.nonFuncColumnCount = nonFuncColumnCount;
this.aggregations = ImmutableList.copyOf(requireNonNull(aggregations, "aggregations"));
this.aggregationFunctions = ImmutableList.copyOf(
requireNonNull(aggregationFunctions, "aggregationFunctions"));
}

@Override
Expand All @@ -65,12 +66,12 @@ public int getNonFuncColumnCount() {
return nonFuncColumnCount;
}

public List<FunctionCall> getAggregations() {
return aggregations;
public List<FunctionCall> getAggregationFunctions() {
return aggregationFunctions;
}

public Formats getFormats() {
return formats;
public Formats getInternalFormats() {
return internalFormats;
}

public ExecutionStep<KGroupedStreamHolder> getSource() {
Expand All @@ -93,8 +94,8 @@ public boolean equals(Object o) {
StreamAggregate that = (StreamAggregate) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
&& Objects.equals(aggregations, that.aggregations)
&& Objects.equals(internalFormats, that.internalFormats)
&& Objects.equals(aggregationFunctions, that.aggregationFunctions)
&& nonFuncColumnCount == that.nonFuncColumnCount;
}

Expand All @@ -104,8 +105,8 @@ public int hashCode() {
return Objects.hash(
properties,
source,
formats,
aggregations,
internalFormats,
aggregationFunctions,
nonFuncColumnCount
);
}
Expand Down
Loading

0 comments on commit e81997c

Please sign in to comment.