Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: csas/ctas with timestamp column is used for output rowtime #4489

Merged
merged 8 commits on
Feb 26, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
getKsqlTopic().getKafkaTopicName(),
getKsqlTopic().getValueFormat(),
serdeOptions,
contextStacker
contextStacker,
getTimestampColumn()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.confluent.ksql.execution.plan.StreamTableJoin;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.streams.StepSchemaResolver;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
Expand Down Expand Up @@ -97,13 +98,15 @@ public SchemaKStream<K> into(
final String kafkaTopicName,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final Optional<TimestampColumn> timestampColumn
) {
final StreamSink<K> step = ExecutionStepFactory.streamSink(
contextStacker,
Formats.of(keyFormat, valueFormat, options),
sourceStep,
kafkaTopicName
kafkaTopicName,
timestampColumn
);
return new SchemaKStream<>(
step,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.plan.TableTableJoin;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
Expand Down Expand Up @@ -73,13 +74,15 @@ public SchemaKTable<K> into(
final String kafkaTopicName,
final ValueFormat valueFormat,
final Set<SerdeOption> options,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final Optional<TimestampColumn> timestampColumn
) {
final TableSink<K> step = ExecutionStepFactory.tableSink(
contextStacker,
sourceTableStep,
Formats.of(keyFormat, valueFormat, options),
kafkaTopicName
kafkaTopicName,
timestampColumn
);
return new SchemaKTable<>(
step,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ public void shouldUseTimestampExtractedFromDDLStatement() throws Exception {
final List<ConsumerRecord<byte[], byte[]>> records =
TEST_HARNESS.verifyAvailableRecords(resultStream.toUpperCase(), 1);

final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
final long timestamp = records.get(0).timestamp();
assertThat(timestamp, equalTo(dateFormat.parse("2018-01-04").getTime()));
assertThat(timestamp, is(4L));
}

private void testTimestampColumnSelection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void before() {
when(sourceNode.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(sourceNode.buildStream(ksqlStreamBuilder)).thenReturn((SchemaKStream) sourceStream);

when(sourceStream.into(any(), any(), any(), any()))
when(sourceStream.into(any(), any(), any(), any(), any()))
.thenReturn((SchemaKStream) sinkStream);

when(ksqlStreamBuilder.buildNodeContext(any())).thenAnswer(inv ->
Expand Down Expand Up @@ -189,7 +189,7 @@ public void shouldBuildOutputNodeForInsertIntoAvroFromNonAvro() {
outputNode.buildStream(ksqlStreamBuilder);

// Then:
verify(sourceStream).into(any(), eq(valueFormat), any(), any());
verify(sourceStream).into(any(), eq(valueFormat), any(), any(), any());
}

@Test
Expand All @@ -202,7 +202,8 @@ public void shouldCallInto() {
eq(SINK_KAFKA_TOPIC_NAME),
eq(JSON_FORMAT),
eq(SerdeOption.none()),
stackerCaptor.capture()
stackerCaptor.capture(),
eq(outputNode.getTimestampColumn())
);
assertThat(
stackerCaptor.getValue().getQueryContext().getContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,33 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.timestamp.TimestampColumn;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class StreamSink<K> implements ExecutionStep<KStreamHolder<K>> {
private final ExecutionStepPropertiesV1 properties;
private final ExecutionStep<KStreamHolder<K>> source;
private final Formats formats;
private final String topicName;
private final Optional<TimestampColumn> timestampColumn;

public StreamSink(
@JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props,
@JsonProperty(value = "source", required = true) final ExecutionStep<KStreamHolder<K>> source,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "topicName", required = true) final String topicName) {
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "timestampColumn") final Optional<TimestampColumn> timestampColumn
) {
this.properties = Objects.requireNonNull(props, "props");
this.formats = Objects.requireNonNull(formats, "formats");
this.source = Objects.requireNonNull(source, "source");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
}

public String getTopicName() {
Expand All @@ -60,6 +67,10 @@ public ExecutionStep<KStreamHolder<K>> getSource() {
return source;
}

/**
 * Returns the timestamp column associated with this stream sink step.
 *
 * @return the {@code Optional<TimestampColumn>} held in the {@code timestampColumn} field,
 *     exactly as it was passed to the constructor.
 */
public Optional<TimestampColumn> getTimestampColumn() {
return timestampColumn;
}

@Override
public KStreamHolder<K> build(final PlanBuilder builder) {
return builder.visitStreamSink(this);
Expand All @@ -77,12 +88,13 @@ public boolean equals(final Object o) {
return Objects.equals(properties, that.properties)
&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
&& Objects.equals(topicName, that.topicName);
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(timestampColumn, that.timestampColumn);
}

@Override
public int hashCode() {

return Objects.hash(properties, source, formats, topicName);
return Objects.hash(properties, source, formats, topicName, timestampColumn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,33 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.timestamp.TimestampColumn;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class TableSink<K> implements ExecutionStep<KTableHolder<K>> {
private final ExecutionStepPropertiesV1 properties;
private final ExecutionStep<KTableHolder<K>> source;
private final Formats formats;
private final String topicName;
private final Optional<TimestampColumn> timestampColumn;

public TableSink(
@JsonProperty(value = "properties", required = true) final ExecutionStepPropertiesV1 props,
@JsonProperty(value = "source", required = true) final ExecutionStep<KTableHolder<K>> source,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "topicName", required = true) final String topicName
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "timestampColumn") final Optional<TimestampColumn> timestampColumn
) {
this.properties = Objects.requireNonNull(props, "props");
this.source = Objects.requireNonNull(source, "source");
this.formats = Objects.requireNonNull(formats, "formats");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
}

@Override
Expand All @@ -63,6 +69,10 @@ public ExecutionStep<KTableHolder<K>> getSource() {
return source;
}

/**
 * Returns the timestamp column associated with this table sink step.
 *
 * @return the {@code Optional<TimestampColumn>} held in the {@code timestampColumn} field,
 *     exactly as it was passed to the constructor.
 */
public Optional<TimestampColumn> getTimestampColumn() {
return timestampColumn;
}

@Override
public KTableHolder<K> build(final PlanBuilder builder) {
return builder.visitTableSink(this);
Expand All @@ -80,12 +90,13 @@ public boolean equals(final Object o) {
return Objects.equals(properties, tableSink.properties)
&& Objects.equals(source, tableSink.source)
&& Objects.equals(formats, tableSink.formats)
&& Objects.equals(topicName, tableSink.topicName);
&& Objects.equals(topicName, tableSink.topicName)
&& Objects.equals(timestampColumn, tableSink.timestampColumn);
}

@Override
public int hashCode() {

return Objects.hash(properties, source, formats, topicName);
return Objects.hash(properties, source, formats, topicName, timestampColumn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package io.confluent.ksql.execution.plan;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.name.ColumnName;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;

@RunWith(MockitoJUnitRunner.class)
public class StreamSinkTest {
@Mock
Expand All @@ -40,11 +44,13 @@ public class StreamSinkTest {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new StreamSink<>(properties1, source1, formats1, "topic1"),
new StreamSink<>(properties1, source1, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties2, source1, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source2, formats1, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats2, "topic1"))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic2"));
new StreamSink<>(properties1, source1, formats1, "topic1", Optional.empty()),
new StreamSink<>(properties1, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties2, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source2, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats2, "topic1", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic2", Optional.empty()))
.addEqualityGroup(new StreamSink<>(properties1, source1, formats1, "topic1",
Optional.of(new TimestampColumn(ColumnName.of("c1"), Optional.of("BIGINT")))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package io.confluent.ksql.execution.plan;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.name.ColumnName;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Optional;

@RunWith(MockitoJUnitRunner.class)
public class TableSinkTest {
@Mock
Expand All @@ -40,11 +44,13 @@ public class TableSinkTest {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new TableSink<>(properties1, source1, formats1, "topic1"),
new TableSink<>(properties1, source1, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties2, source1, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source2, formats1, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source1, formats2, "topic1"))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic2"));
new TableSink<>(properties1, source1, formats1, "topic1", Optional.empty()),
new TableSink<>(properties1, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties2, source1, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source2, formats1, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats2, "topic1", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic2", Optional.empty()))
.addEqualityGroup(new TableSink<>(properties1, source1, formats1, "topic1",
Optional.of(new TimestampColumn(ColumnName.of("c1"), Optional.of("BIGINT")))));;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,28 @@ Topologies:
Processor: KSTREAM-TRANSFORMVALUES-0000000005 (stores: [])
--> PrependAliasLeft
<-- KSTREAM-SOURCE-0000000004
Source: KSTREAM-SOURCE-0000000000 (topics: [t1])
--> KTABLE-SOURCE-0000000001
Processor: PrependAliasLeft (stores: [])
--> Join
<-- KSTREAM-TRANSFORMVALUES-0000000005
Processor: Join (stores: [KafkaTopic_Right-Reduce])
--> Project
<-- PrependAliasLeft
Source: KSTREAM-SOURCE-0000000000 (topics: [t1])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [KafkaTopic_Right-Reduce])
--> KTABLE-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> ApplyTimestampTransform-S1_JOIN_T1
<-- Join
Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: [])
--> KSTREAM-SINK-0000000009
<-- Project
Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: [])
--> PrependAliasRight
<-- KTABLE-SOURCE-0000000001
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000009
<-- Join
Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1)
<-- Project
<-- ApplyTimestampTransform-S1_JOIN_T1
Processor: PrependAliasRight (stores: [])
--> none
<-- KTABLE-TRANSFORMVALUES-0000000002
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Loading