Skip to content

Commit

Permalink
fix: convert AbstractColumnTimestampExtractor to KsqlTimestampExtractor
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Feb 25, 2020
1 parent dc5c900 commit 2eece42
Show file tree
Hide file tree
Showing 25 changed files with 233 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Processor: KSTREAM-TRANSFORM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000012
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Processor: KSTREAM-TRANSFORM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000012
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ Topologies:
--> KTABLE-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000009
--> ApplyTimestampTransform-S1_JOIN_T1
<-- Join
Processor: KSTREAM-TRANSFORM-0000000009 (stores: [])
--> KSTREAM-SINK-0000000010
Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: [])
--> KSTREAM-SINK-0000000009
<-- Project
Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: [])
--> PrependAliasRight
<-- KTABLE-SOURCE-0000000001
Sink: KSTREAM-SINK-0000000010 (topic: S1_JOIN_T1)
<-- KSTREAM-TRANSFORM-0000000009
Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1)
<-- ApplyTimestampTransform-S1_JOIN_T1
Processor: PrependAliasRight (stores: [])
--> none
<-- KTABLE-TRANSFORMVALUES-0000000002
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-TRANSFORM-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Processor: KSTREAM-TRANSFORM-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000013
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-TRANSFORM-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Processor: KSTREAM-TRANSFORM-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000013
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Processor: KSTREAM-TRANSFORM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000012
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ Topologies:
--> Project
<-- Join-this-join, Join-other-join
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000012
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Join-merge
Processor: KSTREAM-TRANSFORM-0000000012 (stores: [])
--> KSTREAM-SINK-0000000013
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000012
<-- Project
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000012
Sink: KSTREAM-SINK-0000000012 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ Topologies:
--> KTABLE-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-TRANSFORM-0000000009
--> ApplyTimestampTransform-S1_JOIN_T1
<-- Join
Processor: KSTREAM-TRANSFORM-0000000009 (stores: [])
--> KSTREAM-SINK-0000000010
Processor: ApplyTimestampTransform-S1_JOIN_T1 (stores: [])
--> KSTREAM-SINK-0000000009
<-- Project
Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: [])
--> PrependAliasRight
<-- KTABLE-SOURCE-0000000001
Sink: KSTREAM-SINK-0000000010 (topic: S1_JOIN_T1)
<-- KSTREAM-TRANSFORM-0000000009
Sink: KSTREAM-SINK-0000000009 (topic: S1_JOIN_T1)
<-- ApplyTimestampTransform-S1_JOIN_T1
Processor: PrependAliasRight (stores: [])
--> none
<-- KTABLE-TRANSFORMVALUES-0000000002
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-TRANSFORM-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Processor: KSTREAM-TRANSFORM-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000013
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ Topologies:
--> KTABLE-TOSTREAM-0000000012
<-- KTABLE-MERGE-0000000008
Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
--> KSTREAM-TRANSFORM-0000000013
--> ApplyTimestampTransform-S1_JOIN_S2
<-- Project
Processor: KSTREAM-TRANSFORM-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
Processor: ApplyTimestampTransform-S1_JOIN_S2 (stores: [])
--> KSTREAM-SINK-0000000013
<-- KTABLE-TOSTREAM-0000000012
Sink: KSTREAM-SINK-0000000014 (topic: S1_JOIN_S2)
<-- KSTREAM-TRANSFORM-0000000013
Sink: KSTREAM-SINK-0000000013 (topic: S1_JOIN_S2)
<-- ApplyTimestampTransform-S1_JOIN_S2

4 changes: 2 additions & 2 deletions ksql-rest-app/src/test/resources/ksql-plan-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@
}
},
"title" : "streamSinkV1",
"required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ]
"required" : [ "@type", "properties", "source", "formats", "topicName" ]
},
"StreamSource" : {
"type" : "object",
Expand Down Expand Up @@ -812,7 +812,7 @@
}
},
"title" : "tableSinkV1",
"required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ]
"required" : [ "@type", "properties", "source", "formats", "topicName" ]
},
"TableTableJoin" : {
"type" : "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.KeySerdeFactory;
import io.confluent.ksql.execution.streams.timestamp.AbstractColumnTimestampExtractor;
import io.confluent.ksql.execution.streams.timestamp.KsqlTimestampExtractor;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
Expand All @@ -43,6 +43,8 @@
import org.apache.kafka.streams.processor.To;

public final class SinkBuilder {
private static final String TIMESTAMP_TRANSFORM_NAME = "ApplyTimestampTransform-";

private SinkBuilder() {
}

Expand Down Expand Up @@ -78,7 +80,8 @@ public static <K> void build(
);

final KStream<K, GenericRow> transformed = tsTransformer
.map(t -> stream.transform(t, Named.as(StreamsUtil.buildOpName(queryContext))))
.map(t -> stream.transform(t, Named.as(TIMESTAMP_TRANSFORM_NAME
+ StreamsUtil.buildOpName(queryContext))))
.orElse(stream);

transformed.to(topicName, Produced.with(keySerde, valueSerde));
Expand All @@ -105,9 +108,8 @@ private static <K> Optional<TransformTimestamp<K>> timestampTransformer(
.map(c -> sourceSchema.findValueColumn(c).orElseThrow(IllegalStateException::new))
.map(Column::index)
.map(timestampPolicy::create)
.filter(te -> te instanceof AbstractColumnTimestampExtractor)
.map(te -> new TransformTimestamp<>(
(AbstractColumnTimestampExtractor)te,
te,
queryBuilder
.getProcessingLogContext()
.getLoggerFactory()
Expand All @@ -123,11 +125,11 @@ private static <K> Optional<TransformTimestamp<K>> timestampTransformer(

static class TransformTimestamp<K>
implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
private final AbstractColumnTimestampExtractor timestampExtractor;
private final KsqlTimestampExtractor timestampExtractor;
private final ProcessingLogger processingLogger;

TransformTimestamp(
final AbstractColumnTimestampExtractor timestampExtractor,
final KsqlTimestampExtractor timestampExtractor,
final ProcessingLogger processingLogger
) {
this.timestampExtractor = requireNonNull(timestampExtractor, "timestampExtractor");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams.timestamp;

import io.confluent.ksql.GenericRow;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public interface KsqlTimestampExtractor extends TimestampExtractor {
default long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
return extract((GenericRow) record.value());
}

long extract(GenericRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.name.ColumnName;
import java.util.Objects;
import org.apache.kafka.streams.processor.TimestampExtractor;

@Immutable
public class LongColumnTimestampExtractionPolicy implements TimestampExtractionPolicy {
Expand All @@ -31,7 +30,7 @@ public LongColumnTimestampExtractionPolicy(final ColumnName timestampField) {
}

@Override
public TimestampExtractor create(final int columnIndex) {
public KsqlTimestampExtractor create(final int columnIndex) {
return new LongTimestampExtractor(columnIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import io.confluent.ksql.GenericRow;

public class LongTimestampExtractor extends AbstractColumnTimestampExtractor {
public class LongTimestampExtractor implements KsqlTimestampExtractor {
private final int timestampColumnindex;

LongTimestampExtractor(final int timestampColumnindex) {
super(timestampColumnindex);
this.timestampColumnindex = timestampColumnindex;
}

@Override
public long extract(final GenericRow row) {
return (long)row.get(timetampColumnIndex);
return (long)row.get(timestampColumnindex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtra
}

@Override
public TimestampExtractor create(final int columnIndex) {
return timestampExtractor;
public KsqlTimestampExtractor create(final int columnIndex) {
return new MetadataTimestampExtractor(timestampExtractor);
}

@Override
Expand Down
Loading

0 comments on commit 2eece42

Please sign in to comment.