refactor: clean up timestamp extraction policy from physical plan (#3936)

* refactor: clean up timestamp extraction policy from physical plan

This patch removes the timestamp extraction policy from the physical
plan. Instead, we include a POJO called TimestampColumn that carries
the information necessary to construct the timestamp extractor:
the column name and, optionally, a format string.
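
For context, the value object is plain data. Below is a minimal sketch of its likely shape, with field types taken from the diff (a ColumnRef plus an Optional format string); the accessor names and equals/hashCode details are assumptions, and the real class in io.confluent.ksql.execution.timestamp may differ:

import io.confluent.ksql.schema.ksql.ColumnRef;
import java.util.Objects;
import java.util.Optional;

// Sketch of the TimestampColumn POJO described above: pure data that can be
// carried through (and serialized with) the physical plan, while extractor
// construction is deferred to the streams layer.
public final class TimestampColumn {

  private final ColumnRef column;          // reference to the timestamp column
  private final Optional<String> format;   // parse format for string-typed columns

  public TimestampColumn(final ColumnRef column, final Optional<String> format) {
    this.column = Objects.requireNonNull(column, "column");
    this.format = Objects.requireNonNull(format, "format");
  }

  public ColumnRef getColumn() {
    return column;
  }

  public Optional<String> getFormat() {
    return format;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TimestampColumn)) {
      return false;
    }
    final TimestampColumn that = (TimestampColumn) o;
    return column.equals(that.column) && format.equals(that.format);
  }

  @Override
  public int hashCode() {
    return Objects.hash(column, format);
  }
}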
rodesai authored Nov 27, 2019
1 parent 8fecb39 commit cb817aa
Showing 68 changed files with 658 additions and 413 deletions.
@@ -20,6 +20,8 @@
 
 import io.confluent.ksql.util.KsqlConstants;
 import io.confluent.ksql.util.KsqlException;
+import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
+import io.confluent.ksql.util.timestamp.StringToTimestampParser;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -3,6 +3,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
+import io.confluent.ksql.util.timestamp.StringToTimestampParser;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import org.hamcrest.Description;
@@ -20,6 +20,8 @@
 import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
 import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
 import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
+import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.name.SourceName;
@@ -44,8 +46,6 @@
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.SchemaUtil;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -79,7 +79,7 @@ public CreateStreamCommand createStreamCommand(
     final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
     final LogicalSchema schema = buildSchema(statement.getElements());
     final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
-    final TimestampExtractionPolicy timestampExtractionPolicy = buildTimestampExtractor(
+    final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
         ksqlConfig,
         statement.getProperties(),
         schema
@@ -101,7 +101,7 @@ public CreateStreamCommand createStreamCommand(
         sourceName,
         schema,
         keyFieldName,
-        timestampExtractionPolicy,
+        timestampColumn,
         serdeOptions,
         topic
     );
@@ -115,7 +115,7 @@ public CreateTableCommand createTableCommand(
     final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
     final LogicalSchema schema = buildSchema(statement.getElements());
     final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
-    final TimestampExtractionPolicy timestampExtractionPolicy = buildTimestampExtractor(
+    final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
         ksqlConfig,
         statement.getProperties(),
         schema
@@ -137,7 +137,7 @@ public CreateTableCommand createTableCommand(
         sourceName,
         schema,
         keyFieldName,
-        timestampExtractionPolicy,
+        timestampColumn,
         serdeOptions,
         topic
     );
@@ -203,15 +203,18 @@ private static KsqlTopic buildTopic(
     return TopicFactory.create(properties);
   }
 
-  private static TimestampExtractionPolicy buildTimestampExtractor(
+  private static Optional<TimestampColumn> buildTimestampColumn(
       final KsqlConfig ksqlConfig,
       final CreateSourceProperties properties,
       final LogicalSchema schema
   ) {
     final Optional<ColumnRef> timestampName = properties.getTimestampColumnName();
-    final Optional<String> timestampFormat = properties.getTimestampFormat();
-    return TimestampExtractionPolicyFactory
-        .create(ksqlConfig, schema, timestampName, timestampFormat);
+    final Optional<TimestampColumn> timestampColumn = timestampName.map(
+        n -> new TimestampColumn(n, properties.getTimestampFormat())
+    );
+    // create the final extraction policy to validate that the ref/format are OK
+    TimestampExtractionPolicyFactory.validateTimestampColumn(ksqlConfig, schema, timestampColumn);
+    return timestampColumn;
   }
 
   private static void validateSerdeCanHandleSchemas(
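The hunk above is the heart of the change on the DDL side: the command now stores only the TimestampColumn data, while TimestampExtractionPolicyFactory.validateTimestampColumn is invoked eagerly so that a bad column reference or format string still fails at statement time rather than at query run time. As a rough, self-contained illustration of that fail-fast idea (not the factory's actual implementation — KSQL has its own timestamp parsers and schema checks), an invalid format pattern can be rejected up front like this:

import java.time.format.DateTimeFormatter;
import java.util.Optional;

// Illustration only: surface a malformed format pattern when the statement is
// processed, instead of deferring the failure until records start flowing.
final class TimestampFormatCheck {

  static void checkFormat(final Optional<String> format) {
    format.ifPresent(f -> {
      try {
        // ofPattern throws IllegalArgumentException for a malformed pattern,
        // e.g. one containing an unterminated quote.
        DateTimeFormatter.ofPattern(f);
      } catch (final IllegalArgumentException e) {
        throw new IllegalArgumentException("Invalid TIMESTAMP_FORMAT: '" + f + "'", e);
      }
    });
  }
}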
@@ -72,7 +72,7 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
         createStream.getSchema(),
         createStream.getSerdeOptions(),
         getKeyField(createStream.getKeyField()),
-        createStream.getTimestampExtractionPolicy(),
+        createStream.getTimestampColumn(),
         withQuery,
         createStream.getTopic()
     );
@@ -88,7 +88,7 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
         createTable.getSchema(),
         createTable.getSerdeOptions(),
         getKeyField(createTable.getKeyField()),
-        createTable.getTimestampExtractionPolicy(),
+        createTable.getTimestampColumn(),
         withQuery,
         createTable.getTopic()
     );
@@ -232,7 +232,7 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
         outputNode.getIntoSourceName(),
         outputNode.getSchema(),
         keyField.ref().map(ColumnRef::name),
-        outputNode.getTimestampExtractionPolicy(),
+        outputNode.getTimestampColumn(),
         outputNode.getSerdeOptions(),
         outputNode.getKsqlTopic()
     );
@@ -241,7 +241,7 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
         outputNode.getIntoSourceName(),
         outputNode.getSchema(),
         keyField.ref().map(ColumnRef::name),
-        outputNode.getTimestampExtractionPolicy(),
+        outputNode.getTimestampColumn(),
         outputNode.getSerdeOptions(),
         outputNode.getKsqlTopic()
     );
@@ -23,6 +23,8 @@
 import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
 import io.confluent.ksql.execution.expression.tree.Expression;
 import io.confluent.ksql.execution.plan.SelectExpression;
+import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
+import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.execution.util.ExpressionTypeManager;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.metastore.model.KeyField;
@@ -47,8 +49,6 @@
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.SchemaUtil;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiFunction;
@@ -96,16 +96,15 @@ public OutputNode buildPlan() {
 
   private OutputNode buildOutputNode(final PlanNode sourcePlanNode) {
     final LogicalSchema inputSchema = sourcePlanNode.getSchema();
-    final TimestampExtractionPolicy extractionPolicy =
-        getTimestampExtractionPolicy(inputSchema, analysis);
+    final Optional<TimestampColumn> timestampColumn = getTimestampColumn(inputSchema, analysis);
 
     if (!analysis.getInto().isPresent()) {
       return new KsqlBareOutputNode(
           new PlanNodeId("KSQL_STDOUT_NAME"),
           sourcePlanNode,
           inputSchema,
           analysis.getLimitClause(),
-          extractionPolicy
+          timestampColumn
       );
     }
 
@@ -126,7 +125,7 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) {
         new PlanNodeId(intoDataSource.getName().name()),
         sourcePlanNode,
         inputSchema,
-        extractionPolicy,
+        timestampColumn,
         keyField,
         intoDataSource.getKsqlTopic(),
         partitionByField,
@@ -161,15 +160,21 @@ private KeyField buildOutputKeyField(
     return KeyField.of(partitionBy);
   }
 
-  private TimestampExtractionPolicy getTimestampExtractionPolicy(
+  private Optional<TimestampColumn> getTimestampColumn(
       final LogicalSchema inputSchema,
       final Analysis analysis
   ) {
-    return TimestampExtractionPolicyFactory.create(
+    final Optional<ColumnRef> timestampColumnName =
+        analysis.getProperties().getTimestampColumnName();
+    final Optional<TimestampColumn> timestampColumn = timestampColumnName.map(
+        n -> new TimestampColumn(n, analysis.getProperties().getTimestampFormat())
+    );
+    TimestampExtractionPolicyFactory.validateTimestampColumn(
         ksqlConfig,
         inputSchema,
-        analysis.getProperties().getTimestampColumnName(),
-        analysis.getProperties().getTimestampFormat());
+        timestampColumn
+    );
+    return timestampColumn;
   }
 
   private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) {
@@ -28,7 +28,6 @@
 import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.name.SourceName;
-import io.confluent.ksql.schema.ksql.ColumnRef;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.services.KafkaTopicClient;
 import io.confluent.ksql.structured.SchemaKStream;
@@ -142,7 +141,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         dataSource,
         schema,
         contextStacker.push(SOURCE_OP_NAME),
-        timestampIndex(),
         getAutoOffsetReset(builder.getKsqlConfig().getKsqlStreamConfigProps()),
         keyField,
         alias
@@ -164,24 +162,12 @@ SchemaKStream<?> create(
         DataSource<?> dataSource,
         LogicalSchemaWithMetaAndKeyFields schemaWithMetaAndKeyFields,
         QueryContext.Stacker contextStacker,
-        int timestampIndex,
         Optional<AutoOffsetReset> offsetReset,
         KeyField keyField,
         SourceName alias
     );
   }
 
-  private int timestampIndex() {
-    final LogicalSchema originalSchema = dataSource.getSchema();
-    final ColumnRef timestampField = dataSource.getTimestampExtractionPolicy().getTimestampField();
-    if (timestampField == null) {
-      return -1;
-    }
-
-    return originalSchema.valueColumnIndex(timestampField)
-        .orElseThrow(IllegalStateException::new);
-  }
-
   private static Optional<Topology.AutoOffsetReset> getAutoOffsetReset(
       final Map<String, Object> props) {
     final Object offestReset = props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -16,12 +16,13 @@
 package io.confluent.ksql.planner.plan;
 
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.query.id.QueryIdGenerator;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.structured.SchemaKStream;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -35,9 +36,9 @@ public KsqlBareOutputNode(
       final PlanNode source,
       final LogicalSchema schema,
       final OptionalInt limit,
-      final TimestampExtractionPolicy extractionPolicy
+      final Optional<TimestampColumn> timestampColumn
   ) {
-    super(id, source, schema, limit, extractionPolicy);
+    super(id, source, schema, limit, timestampColumn);
     this.keyField = KeyField.of(source.getKeyField().ref())
         .validateKeyExistsIn(schema);
   }
@@ -22,6 +22,7 @@
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
+import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.name.SourceName;
 import io.confluent.ksql.query.QueryId;
@@ -31,7 +32,6 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.structured.SchemaKStream;
 import io.confluent.ksql.structured.SchemaKTable;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -51,7 +51,7 @@ public KsqlStructuredDataOutputNode(
       final PlanNodeId id,
       final PlanNode source,
       final LogicalSchema schema,
-      final TimestampExtractionPolicy timestampExtractionPolicy,
+      final Optional<TimestampColumn> timestampColumn,
       final KeyField keyField,
       final KsqlTopic ksqlTopic,
       final Optional<ColumnRef> partitionByField,
@@ -69,7 +69,7 @@ public KsqlStructuredDataOutputNode(
         // This leads to strange behaviour, but changing it is a breaking change.
         schema.withoutMetaAndKeyColsInValue(),
         limit,
-        timestampExtractionPolicy
+        timestampColumn
     );
 
     this.serdeOptions = ImmutableSet.copyOf(requireNonNull(serdeOptions, "serdeOptions"));
@@ -19,13 +19,14 @@
 
 import com.google.common.collect.ImmutableList;
 import io.confluent.ksql.execution.plan.SelectExpression;
+import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.query.id.QueryIdGenerator;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.services.KafkaTopicClient;
-import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalInt;
 import javax.annotation.concurrent.Immutable;
 
@@ -37,22 +38,22 @@ public abstract class OutputNode
   private final PlanNode source;
   private final LogicalSchema schema;
   private final OptionalInt limit;
-  private final TimestampExtractionPolicy timestampExtractionPolicy;
+  private final Optional<TimestampColumn> timestampColumn;
 
   protected OutputNode(
       final PlanNodeId id,
       final PlanNode source,
       final LogicalSchema schema,
       final OptionalInt limit,
-      final TimestampExtractionPolicy timestampExtractionPolicy
+      final Optional<TimestampColumn> timestampColumn
   ) {
     super(id, source.getNodeOutputType());
 
     this.source = requireNonNull(source, "source");
     this.schema = requireNonNull(schema, "schema");
     this.limit = requireNonNull(limit, "limit");
-    this.timestampExtractionPolicy =
-        requireNonNull(timestampExtractionPolicy, "timestampExtractionPolicy");
+    this.timestampColumn =
+        requireNonNull(timestampColumn, "timestampColumn");
   }
 
   @Override
@@ -88,8 +89,8 @@ public <C, R> R accept(final PlanVisitor<C, R> visitor, final C context) {
     return visitor.visitOutput(this, context);
   }
 
-  public TimestampExtractionPolicy getTimestampExtractionPolicy() {
-    return timestampExtractionPolicy;
+  public Optional<TimestampColumn> getTimestampColumn() {
+    return timestampColumn;
   }
 
   public abstract QueryId getQueryId(QueryIdGenerator queryIdGenerator);
@@ -119,7 +119,6 @@ public static SchemaKStream<?> forSource(
       final DataSource<?> dataSource,
       final LogicalSchemaWithMetaAndKeyFields schemaWithMetaAndKeyFields,
       final QueryContext.Stacker contextStacker,
-      final int timestampIndex,
       final Optional<AutoOffsetReset> offsetReset,
       final KeyField keyField,
       final SourceName alias
@@ -131,8 +130,7 @@ public static SchemaKStream<?> forSource(
         schemaWithMetaAndKeyFields,
         topic.getKafkaTopicName(),
         Formats.of(topic.getKeyFormat(), topic.getValueFormat(), dataSource.getSerdeOptions()),
-        dataSource.getTimestampExtractionPolicy(),
-        timestampIndex,
+        dataSource.getTimestampColumn(),
         offsetReset,
         alias
     );
@@ -147,8 +145,7 @@ public static SchemaKStream<?> forSource(
         schemaWithMetaAndKeyFields,
         topic.getKafkaTopicName(),
         Formats.of(topic.getKeyFormat(), topic.getValueFormat(), dataSource.getSerdeOptions()),
-        dataSource.getTimestampExtractionPolicy(),
-        timestampIndex,
+        dataSource.getTimestampColumn(),
         offsetReset,
         alias
     );
