Avoid outputting null values when HAVING clauses are not met #3558
Comments
@big-andy-coates Any workaround for this? I keep seeing this ERROR log in KSQL:
Hi @vector1983, I'm not sure that's the same issue! There's no workaround at the moment that I'm aware of.
+1 to fix this
Hey @big-andy-coates, do you mind creating an AK Jira ticket to track this? Thanks!
Added https://issues.apache.org/jira/browse/KAFKA-10077 to track the KS work.
Thanks!
This will also affect users with
Adding needs-triage label to discuss if we should include in either 0.12.0/0.13.0
Upstream Kafka Streams work for this is now complete: apache/kafka#9156
fixes: confluentinc#3558
* feat: avoid spurious tombstones in table output

fixes: #3558

AK commit apache/kafka#9156 enhances Kafka Streams so that filters on tables now avoid emitting spurious tombstones. ksqlDB now benefits from this. Tombstones are no longer emitted to the sink topic when a HAVING clause excludes a row from the result _that has never been in the result table_.

BREAKING CHANGE: This change fixes a _bug_ where unnecessary tombstones were being emitted when a `HAVING` clause filtered out a row from the source that is not in the output table.

For example, given:

```sql
-- source stream:
CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...);

-- aggregate into a table:
CREATE TABLE BAR AS
  SELECT ID, SUM(VAL) AS SUM
  FROM FOO
  GROUP BY ID
  HAVING SUM(VAL) > 0;

-- insert some values into the stream:
INSERT INTO FOO VALUES(1, -5);
INSERT INTO FOO VALUES(1, 6);
INSERT INTO FOO VALUES(1, -2);
INSERT INTO FOO VALUES(1, -1);
```

Previously, the sink topic `BAR` would have contained the records:

| Key | Value   | Notes |
|-----|---------|-------|
| 1   | null    | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |
| 1   | {sum=1} | Row added as HAVING criteria now met. |
| 1   | null    | Row deleted as HAVING criteria now not met. |
| 1   | null    | Spurious tombstone: the table does not contain a row with key `1`, so no tombstone is required. |

Note: the first record in the tom

The topic will now contain:

| Key | Value   |
|-----|---------|
| 1   | {sum=1} |
| 1   | null    |

Co-authored-by: Andy Coates <[email protected]>
e.g. given this test case:
Notice the `null` value output for the first input row. This is because the sum of the aggregate after the first row isn't high enough to pass the HAVING criteria.

The issue here is that the HAVING criteria is implemented using a `KTable::filter` call, which is stateless: it does not know that there wasn't a previous value that passed the filter, so it must emit a tombstone to clear any previous output row.
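For illustration, here's a minimal Kafka Streams sketch of the shape of topology involved. The topic names, serdes and aggregation are assumptions chosen to mirror the example above, not ksqlDB's actual generated topology:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class HavingAsFilterSketch {

    // Roughly: SUM(VAL) per key, with the HAVING clause applied as a KTable filter.
    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<Integer, Long> sums = builder
            .stream("FOO", Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .aggregate(
                () -> 0L,
                (key, val, agg) -> agg + val,
                Materialized.with(Serdes.Integer(), Serdes.Long()));

        // The HAVING clause becomes a stateless KTable::filter. The filter does not
        // know whether this key has ever passed the predicate before, so any update
        // that fails the predicate is forwarded as a tombstone (key, null), even when
        // there is no downstream row to delete.
        final KTable<Integer, Long> filtered = sums.filter((key, sum) -> sum > 0);

        filtered.toStream().to("BAR", Produced.with(Serdes.Integer(), Serdes.Long()));
        return builder.build();
    }
}
```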
The fix is actually a fix in Kafka Streams: turn on `KTableImpl::enableSendingOldValues` for any filter that is downstream of a state store. The filter then receives a CDC-style event, (old, new), rather than just (new), so it can be more intelligent and only emit a tombstone when the change transitions from the old value passing the filter to the new value not passing it.
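To make the difference concrete, here's a conceptual sketch of the decision the filter can make once it sees (old, new) pairs. This is only an illustration of the idea, not the actual `KTableFilter` implementation:

```java
import java.util.function.Predicate;

public class OldNewFilterSketch {

    // Returns true if the filter needs to forward anything downstream for this update.
    static <V> boolean needsOutput(V oldValue, V newValue, Predicate<V> havingPredicate) {
        final boolean oldPassed = oldValue != null && havingPredicate.test(oldValue);
        final boolean newPasses = newValue != null && havingPredicate.test(newValue);

        if (newPasses) {
            return true;   // forward the new row
        }
        // The new value fails the predicate: a tombstone is only needed if the old
        // value previously passed, i.e. there really is a downstream row to delete.
        return oldPassed;
    }

    public static void main(String[] args) {
        final Predicate<Long> having = sum -> sum > 0;        // HAVING SUM(VAL) > 0
        System.out.println(needsOutput(null, -5L, having));   // false: spurious tombstone avoided
        System.out.println(needsOutput(-5L, 1L, having));     // true: row now passes, emit {sum=1}
        System.out.println(needsOutput(1L, -1L, having));     // true: genuine tombstone
        System.out.println(needsOutput(-1L, -2L, having));    // false: spurious tombstone avoided
    }
}
```

With the running sums from the example above (-5, 1, -1, -2), only the second and third updates produce output, matching the two records now expected in the sink topic.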