Skip to content

Commit

Permalink
fix: support WindowStart() and WindowEnd() in pull queries (#4435)
Browse files Browse the repository at this point in the history
fixes: #4015

At present, the `WindowStart()` and `WindowEnd()` UDAFs are special-cased. This special casing was not being applied to pull queries; this change corrects that.

Consider:

```sql
-- setup:
CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY LANG;

-- pull query:
SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';
```

Before this change the pull query would return `null` values for `WSTART` and `WEND`, even though the data was correctly populated in the `TWEET_LANG` topic. With this change the correct values for `WSTART` and `WEND` are returned.

The main fix is in `StreamAggregateBuilder`, which now adds a `map` step to the materialization builder so that the same special-case processing is applied to any pull query.
  • Loading branch information
big-andy-coates authored Feb 4, 2020
1 parent c03c9b7 commit 8da2b63
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

Expand Down Expand Up @@ -60,15 +61,15 @@ public boolean hasSelects() {
return !windowSelects.isEmpty();
}

public <K> KsqlTransformer<Windowed<K>, GenericRow> getTransformer() {
return new Transformer<>();
public KsqlTransformer<Windowed<Struct>, GenericRow> getTransformer() {
return new Transformer();
}

private final class Transformer<K> implements KsqlTransformer<Windowed<K>, GenericRow> {
private final class Transformer implements KsqlTransformer<Windowed<Struct>, GenericRow> {

@Override
public GenericRow transform(
final Windowed<K> readOnlyKey,
final Windowed<Struct> readOnlyKey,
final GenericRow value,
final KsqlProcessingContext ctx
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@
*/
public final class StructKeyUtil {

private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder
.struct()
.field(SchemaUtil.ROWKEY_NAME.name(), Schema.OPTIONAL_STRING_SCHEMA)
.build();

private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD =
ROWKEY_STRUCT_SCHEMA.fields().get(0);

private StructKeyUtil() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
Expand All @@ -39,6 +42,9 @@
@RunWith(MockitoJUnitRunner.class)
public class WindowSelectMapperTest {

private static final Struct A_KEY = StructKeyUtil.keyBuilder(SqlTypes.STRING)
.build("key");

@Mock
private KsqlAggregateFunction<?, ?, ?> windowStartFunc;
@Mock
Expand Down Expand Up @@ -76,7 +82,7 @@ public void shouldDetectWindowEndSelects() {
@Test
public void shouldUpdateRowWithWindowBounds() {
// Given:
final KsqlTransformer<Windowed<Object>, GenericRow> mapper = new WindowSelectMapper(
final KsqlTransformer<Windowed<Struct>, GenericRow> mapper = new WindowSelectMapper(
1,
ImmutableList.of(otherFunc, windowStartFunc, windowEndFunc, windowStartFunc)
).getTransformer();
Expand All @@ -85,7 +91,7 @@ public void shouldUpdateRowWithWindowBounds() {
final GenericRow row = genericRow(0, 1, 2, 3, 4, 5);

// When:
final GenericRow result = mapper.transform(new Windowed<>("k", window), row, ctx);
final GenericRow result = mapper.transform(new Windowed<>(A_KEY, window), row, ctx);

// Then:
assertThat(result, is(sameInstance(row)));
Expand All @@ -95,7 +101,7 @@ public void shouldUpdateRowWithWindowBounds() {
@Test(expected = IndexOutOfBoundsException.class)
public void shouldThrowIfRowNotBigEnough() {
// Given:
final KsqlTransformer<Windowed<Object>, GenericRow> mapper = new WindowSelectMapper(
final KsqlTransformer<Windowed<Struct>, GenericRow> mapper = new WindowSelectMapper(
0,
ImmutableList.of(windowStartFunc)
).getTransformer();
Expand All @@ -104,6 +110,6 @@ public void shouldThrowIfRowNotBigEnough() {
final GenericRow row = genericRow();

// When:
mapper.transform(new Windowed<>("k", window), row, ctx);
mapper.transform(new Windowed<>(A_KEY, window), row, ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,28 @@
{"row":{"columns":["10", "2020-01-29 17:19:45 +0000", "2020-01-29 17:19:46 +0000", 1]}}
]}
]
},
{
"name": "windowStart and windowEnd UDAFs",
"statements": [
"CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY LANG;",
"SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';"
],
"inputs": [
{"topic": "test_topic", "timestamp": 1580223282123, "value": {"lang": "en"}},
{"topic": "test_topic", "timestamp": 1580223282323, "value": {"lang": "en"}},
{"topic": "test_topic", "timestamp": 1580223283123, "value": {"lang": "en"}}
],
"responses": [
{"admin": {"@type": "currentStatus"}},
{"admin": {"@type": "currentStatus"}},
{"query": [
{"header":{"schema":"`WSTART` STRING, `WEND` STRING, `LANG` STRING, `TWEET_COUNT` BIGINT"}},
{"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", "en", 2]}},
{"row":{"columns":["2020-01-28 14:54:43 +0000", "2020-01-28 14:54:44 +0000", "en", 1]}}
]}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
Expand All @@ -36,6 +35,8 @@
import io.confluent.ksql.util.SchemaUtil;
import java.time.Instant;
import java.util.List;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test;

public class TableRowsEntityFactoryTest {
Expand Down Expand Up @@ -87,21 +88,19 @@ public void shouldAddNonWindowedRowToValues() {
public void shouldAddWindowedRowToValues() {
// Given:
final Instant now = Instant.now();
final Window window0 = Window.of(now, now.plusMillis(2));
final Window window1 = Window.of(now, now.plusMillis(1));
final TimeWindow window0 = new TimeWindow(now.toEpochMilli(), now.plusMillis(2).toEpochMilli());
final TimeWindow window1 = new TimeWindow(now.toEpochMilli(), now.plusMillis(1).toEpochMilli());

final List<? extends TableRow> input = ImmutableList.of(
WindowedRow.of(
SIMPLE_SCHEMA,
STRING_KEY_BUILDER.build("x"),
window0,
new Windowed<>(STRING_KEY_BUILDER.build("x"), window0),
genericRow(true),
ROWTIME
),
WindowedRow.of(
SIMPLE_SCHEMA,
STRING_KEY_BUILDER.build("y"),
window1,
new Windowed<>(STRING_KEY_BUILDER.build("y"), window1),
genericRow(false),
ROWTIME
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.HostStatusEntity;
import io.confluent.ksql.rest.entity.HostStoreLags;
import io.confluent.ksql.rest.entity.KsqlHostEntity;
import io.confluent.ksql.rest.entity.LagInfoEntity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.execution.plan.StreamAggregate;
import io.confluent.ksql.execution.plan.StreamWindowedAggregate;
import io.confluent.ksql.execution.streams.transform.KsTransformer;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.window.WindowSelectMapper;
import io.confluent.ksql.execution.windows.HoppingWindowExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
Expand Down Expand Up @@ -140,6 +141,7 @@ public static KTableHolder<Windowed<Struct>> build(
);
}

@SuppressWarnings({"rawtypes", "unchecked"})
static KTableHolder<Windowed<Struct>> build(
final KGroupedStreamHolder groupedStream,
final StreamWindowedAggregate aggregate,
Expand Down Expand Up @@ -191,6 +193,12 @@ static KTableHolder<Windowed<Struct>> build(
() -> new KsTransformer<>(windowSelectMapper.getTransformer()),
Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.windowSelectContext(aggregate)))
);

materializationBuilder.map(
pl -> (KsqlTransformer) windowSelectMapper.getTransformer(),
aggregateSchema,
AggregateBuilderUtils.windowSelectContext(aggregate)
);
}

return KTableHolder.materialized(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MaterializedWindowedTable windowed() {
}

private Optional<GenericRow> filterAndTransform(
final Struct key,
final Object key,
final GenericRow value,
final long rowTime
) {
Expand Down Expand Up @@ -157,7 +157,7 @@ public List<WindowedRow> get(final Struct key, final Range<Instant> windowStart)
final Builder<WindowedRow> builder = ImmutableList.builder();

for (final WindowedRow row : result) {
filterAndTransform(key, row.value(), row.rowTime())
filterAndTransform(row.windowedKey(), row.value(), row.rowTime())
.ifPresent(v -> builder.add(row.withValue(v, schema())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,40 @@
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Windowed;

public final class WindowedRow implements TableRow {

private final LogicalSchema schema;
private final Window window;
private final Struct key;
private final Windowed<Struct> key;
private final GenericRow value;
private final long rowTime;
private final Validator validator;

public static WindowedRow of(
final LogicalSchema schema,
final Struct key,
final Window window,
final Windowed<Struct> key,
final GenericRow value,
final long rowTime
) {
return new WindowedRow(schema, key, window, value, rowTime, TableRowValidation::validate);
return new WindowedRow(schema, key, value, rowTime, TableRowValidation::validate);
}

@VisibleForTesting
WindowedRow(
final LogicalSchema schema,
final Struct key,
final Window window,
final Windowed<Struct> key,
final GenericRow value,
final long rowTime,
final Validator validator
) {
this.schema = requireNonNull(schema, "schema");
this.key = requireNonNull(key, "key");
this.window = requireNonNull(window, "window");
this.value = requireNonNull(value, "value");
this.rowTime = rowTime;
this.validator = requireNonNull(validator, "validator");

validator.validate(schema, key, value);
validator.validate(schema, key.key(), value);
}

@Override
Expand All @@ -75,12 +72,19 @@ public long rowTime() {

@Override
public Struct key() {
return key.key();
}

public Windowed<Struct> windowedKey() {
return key;
}

@Override
public Optional<Window> window() {
return Optional.of(window);
return Optional.of(Window.of(
key.window().startTime(),
key.window().endTime()
));
}

@Override
Expand All @@ -96,7 +100,6 @@ public WindowedRow withValue(
return new WindowedRow(
newSchema,
key,
window,
newValue,
rowTime,
validator
Expand All @@ -114,21 +117,19 @@ public boolean equals(final Object o) {
final WindowedRow that = (WindowedRow) o;
return Objects.equals(schema, that.schema)
&& Objects.equals(key, that.key)
&& Objects.equals(window, that.window)
&& Objects.equals(value, that.value)
&& Objects.equals(rowTime, that.rowTime);
}

@Override
public int hashCode() {
return Objects.hash(key, window, value, schema, rowTime);
return Objects.hash(key, value, schema, rowTime);
}

@Override
public String toString() {
return "WindowedRow{"
+ "key=" + key
+ ", window=" + window
+ ", value=" + value
+ ", rowTime=" + rowTime
+ ", schema=" + schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -73,16 +72,13 @@ private List<WindowedRow> findSession(

if (windowStart.contains(next.key.window().startTime())) {

final Window window = Window.of(
next.key.window().startTime(),
next.key.window().endTime()
);
final long rowTime = next.key.window().end();

final WindowedRow row = WindowedRow.of(
stateStore.schema(),
key, window,
next.key,
next.value,
next.key.window().end()
rowTime
);

builder.add(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
Expand Down Expand Up @@ -76,12 +77,12 @@ public List<WindowedRow> get(

final Instant windowEnd = windowStart.plus(windowSize);

final Window window = Window.of(windowStart, windowEnd);
final TimeWindow window =
new TimeWindow(windowStart.toEpochMilli(), windowEnd.toEpochMilli());

final WindowedRow row = WindowedRow.of(
stateStore.schema(),
key,
window,
new Windowed<>(key, window),
next.value.value(),
next.value.timestamp()
);
Expand Down
Loading

0 comments on commit 8da2b63

Please sign in to comment.