Skip to content

Commit

Permalink
feat: remove WindowStart() and WindowEnd() UDAFs (#4459)
Browse files Browse the repository at this point in the history
* feat: remove WindowStart() and WindowEnd() UDAFs

These two UDAFs were introduced to allow access to the start and end times of the window in a windowed source. `WINDOWSTART` and `WINDOWEND` are now accessible as columns to be used in the SELECT of a query, (outside of UDAFs).  This makes the two UDAFs redundant.

BREAKING CHANGE: The `WindowStart()` and `WindowEnd()` UDAFs have been removed from KSQL. Use the `WindowStart` and `WindowEnd` system columns to access the window bounds within the SELECT expression instead.
  • Loading branch information
big-andy-coates authored Feb 7, 2020
1 parent f8bb986 commit eda2e34
Show file tree
Hide file tree
Showing 40 changed files with 189 additions and 765 deletions.
4 changes: 2 additions & 2 deletions docs-md/concepts/time-and-windows-in-ksqldb-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ boundaries are named *windows*.
![Diagram showing the relationship between records and time in a ksqlDB stream](../img/ksql-window.png)

A window has a start time and an end time, which you access in your
queries by using the WINDOWSTART() and WINDOWEND() functions.
queries by using the WINDOWSTART and WINDOWEND system columns.

!!! important
ksqlDB is based on the Unix epoch time in the UTC timezone, and this can
Expand Down Expand Up @@ -339,7 +339,7 @@ session window to have zero records.

If a session window contains exactly one record, the record's ROWTIME
timestamp is identical to the window's own start and end times. Access
these by using the WINDOWSTART() and WINDOWEND() functions.
these by using the WINDOWSTART and WINDOWEND system columns.

If a session window contains two or more records, then the
earliest/oldest record's ROWTIME timestamp is identical to the
Expand Down
18 changes: 0 additions & 18 deletions docs-md/developer-guide/ksqldb-reference/aggregate-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,5 @@ Stream
Return the distinct Top *K* values for the given column and window
Note: rows where `col1` is null will be ignored.

WindowStart
-----------

`WindowStart()`

Stream, Table

Extract the start time of the current window, in milliseconds.
If the query is not windowed the function will return null.

WindowEnd
---------

`WindowEnd()`

Extract the end time of the current window, in milliseconds.
If the query is not windowed the function will return null.


Page last revised on: {{ git_revision_date }}
4 changes: 2 additions & 2 deletions docs-md/tutorials/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ current session window into fields within output rows.
```sql
CREATE TABLE pageviews_per_region_per_session AS
SELECT regionid,
windowStart(),
windowEnd(),
windowStart AS WSTART,
windowEnd AS WEND,
count(*)
FROM pageviews_enriched
WINDOW SESSION (60 SECONDS)
Expand Down
4 changes: 2 additions & 2 deletions docs/concepts/time-and-windows-in-ksql-queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ are named *windows*.
:alt: Diagram showing the relationship between records and time in a KSQL stream

A window has a start time and an end time, which you access in your queries by
using the WINDOWSTART() and WINDOWEND() functions.
using the WINDOWSTART and WINDOWEND system columns.

.. important::

Expand Down Expand Up @@ -339,7 +339,7 @@ window to have zero records.

If a session window contains exactly one record, the record's ROWTIME timestamp
is identical to the window's own start and end times. Access these by using the
WINDOWSTART() and WINDOWEND() functions.
WINDOWSTART and WINDOWEND system columns.

If a session window contains two or more records, then the earliest/oldest
record's ROWTIME timestamp is identical to the window's start time, and the
Expand Down
8 changes: 1 addition & 7 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ Pulls the current value from the materialized table and terminate.
The result of this statement will not be persisted in a Kafka topic and will only be printed out in
the console.

The WHERE clause must contain a single value of ``ROWKEY`` to retieve and may optionally include
The WHERE clause must contain a single value of ``ROWKEY`` to retrieve and may optionally include
bounds on WINDOWSTART if the materialized table is windowed.

Example:
Expand Down Expand Up @@ -2157,12 +2157,6 @@ Aggregate functions
| TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+
| WindowStart | ``WindowStart()`` | Stream | Extract the start time of the current window, in milliseconds. |
| | | Table | If the query is not windowed the function will return null. |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+
| WindowEnd | ``WindowEnd()`` | Stream | Extract the end time of the current window, in milliseconds. |
| | | Table | If the query is not windowed the function will return null. |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+

For more information, see :ref:`aggregate-streaming-data-with-ksql`.

Expand Down
4 changes: 2 additions & 2 deletions docs/tutorials/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ end time of the current session window into fields within output rows.
CREATE TABLE pageviews_per_region_per_session AS
SELECT regionid,
windowStart(),
windowEnd(),
windowStart AS WSTART,
windowEnd AS WEND,
count(*)
FROM pageviews_enriched
WINDOW SESSION (60 SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private void throwOnWindowBoundColumnIfWindowedAggregate(final ColumnReferenceEx
if (SchemaUtil.isWindowBound(node.getReference())) {
throw new KsqlException(
"Window bounds column " + node + " can only be used in the SELECT clause of "
+ "windowed aggregations."
+ "windowed aggregations and can not be passed to aggregate functions."
+ System.lineSeparator()
+ "See https://github.com/confluentinc/ksql/issues/4397"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
Expand Down Expand Up @@ -213,16 +214,13 @@ private static void enforceAggregateRules(
"GROUP BY requires columns using aggregate functions in SELECT clause.");
}

final Set<Expression> groupByExprs = ImmutableSet.copyOf(analysis.getGroupByExpressions());
final Set<Expression> groupByExprs = getGroupByExpressions(analysis);

final List<String> unmatchedSelects = aggregateAnalysis.getNonAggregateSelectExpressions()
.entrySet()
.stream()
// Remove any that exactly match a group by expression:
.filter(e -> !groupByExprs.contains(e.getKey()))
// Remove any window bounds expressions, which are implicit:
.filter(e -> !(e.getKey() instanceof ColumnReferenceExp
&& SchemaUtil.isWindowBound(((ColumnReferenceExp) e.getKey()).getReference())))
// Remove any that are constants,
// or expressions where all params exactly match a group by expression:
.filter(e -> !Sets.difference(e.getValue(), groupByExprs).isEmpty())
Expand Down Expand Up @@ -253,4 +251,23 @@ private static void enforceAggregateRules(
"Non-aggregate HAVING expression not part of GROUP BY: " + havingOnly);
}
}

private static Set<Expression> getGroupByExpressions(final Analysis analysis) {
if (!analysis.getWindowExpression().isPresent()) {
return ImmutableSet.copyOf(analysis.getGroupByExpressions());
}

// Add in window bounds columns as implicit group by columns:
final AliasedDataSource source = Iterables.getOnlyElement(analysis.getFromDataSources());

final Set<QualifiedColumnReferenceExp> windowBoundColumnRefs =
SchemaUtil.windowBoundsColumnNames().stream()
.map(cn -> new QualifiedColumnReferenceExp(source.getAlias(), cn))
.collect(Collectors.toSet());

return ImmutableSet.<Expression>builder()
.addAll(analysis.getGroupByExpressions())
.addAll(windowBoundColumnRefs)
.build();
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,6 @@ List<SelectExpression> getAggArgExpansionList() {
}

private Expression resolveToInternal(final Expression exp) {
if (isWindowBound(exp)) {
return exp;
}

final ColumnName name = expressionToInternalColumnName.get(exp.toString());
if (name != null) {
return new UnqualifiedColumnReferenceExp(
Expand All @@ -374,15 +370,6 @@ private Expression resolveToInternal(final Expression exp) {
return ExpressionTreeRewriter.rewriteWith(new ResolveToInternalRewriter()::process, exp);
}

private static boolean isWindowBound(final Expression exp) {
if (!(exp instanceof ColumnReferenceExp)) {
return false;
}

final ColumnReferenceExp column = (ColumnReferenceExp)exp;
return SchemaUtil.isWindowBound(column.getReference());
}

private final class ResolveToInternalRewriter
extends VisitParentExpressionVisitor<Optional<Expression>, Context<Void>> {
private ResolveToInternalRewriter() {
Expand All @@ -404,9 +391,15 @@ public Optional<Expression> visitColumnReference(
}

final boolean isAggregate = node.getReference().isAggregate();
final boolean windowBounds = SchemaUtil.isWindowBound(node.getReference());

if (isAggregate && windowBounds) {
throw new KsqlException("Window bound " + node + " is not available as a parameter "
+ "to aggregate functions");
}

if (!isAggregate) {
throw new KsqlException("Unknown source column: " + node.toString());
if (!isAggregate && !windowBounds) {
throw new KsqlException("Unknown source column: " + node);
}

return Optional.of(node);
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit eda2e34

Please sign in to comment.