feat: avoid spurious tombstones in table output #6405
Conversation
AK commit apache/kafka#9156 avoids filters emitting spurious tombstones. This means the sink topic now only receives the records for the two rows that pass the filter, not the other three rows, so the `waitForUniqueUserRows` call now only waits for those two records to be produced before running the test. Additionally, the test's name was misleading: the logic in `KsqlMaterialization` that filters out records failing the HAVING clause is installed as part of running the SQL in the test case, so those records are filtered from any pull query anyway.
LGTM. Thanks for the fix!
More needed. Doing it now...
public void shouldHandleHavingClause() {
  // Note: HAVING clauses are handled centrally by KsqlMaterialization. This logic will have been
  // installed as part of building the below statement:
Note: the test name and comments were misleading, as the extra steps KsqlMaterialization adds to handle the HAVING clause are installed as part of this test.
final int matches = (int) USER_DATA_PROVIDER.data().values().stream()
    .filter(row -> ((Long) row.get(0)) > 2)
    .count();

final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema);
The number of expected rows is now reduced as we no longer produce spurious tombstones.
USER_DATA_PROVIDER.data().entries().stream()
    .filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
    .forEach(e -> {
      // Rows filtered by the HAVING clause:
      final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
      assertThat(row, is(Optional.empty()));
    });
Gets against the table for filtered-out rows should return nothing.
@@ -52,9 +52,11 @@ public KudafAggregator(

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
  final GenericRow result = GenericRow.fromList(aggRowValue.values());
Kafka Streams does not expect the aggregator to mutate its parameters. The Streams code was passing in the "old value", which ksqlDB was then mutating and returning as the "new value". This meant that, when the function returned, the old and new values matched, which is obviously bad!
The code now takes a copy and mutates that. There is a perf hit, obviously, but it's unavoidable.
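For illustration, here is a minimal, hedged sketch of the copy-then-mutate pattern the fix uses. Plain Java lists stand in for ksqlDB's GenericRow, and the single COUNT-style column in slot 0 is an assumption made up for this example; it is not the actual KudafAggregator code.

import java.util.ArrayList;
import java.util.List;

// Sketch only: List<Object> stands in for GenericRow; column 0 holding a COUNT is illustrative.
final class CopyThenMutateAggregator {

  // Streams-style aggregator: (key, new row, current aggregate) -> new aggregate.
  List<Object> apply(final String key, final List<Object> rowValue, final List<Object> aggRowValue) {
    // Copy first, so the caller's aggRowValue (the "old value" Streams may forward) is never mutated:
    final List<Object> result = new ArrayList<>(aggRowValue);

    // Update the copy only, e.g. increment a COUNT held in column 0:
    result.set(0, (Long) result.get(0) + 1L);
    return result;
  }
}

Mutating aggRowValue in place instead would also update the reference that Streams later forwards as the old value.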
Not sure I understand -- why did the old code work, in that case? Or did something change on the Streams side recently?
The old code worked because we were never enabling the sending of old values. We now do, to avoid the spurious tombstones.
Sorry, still not understanding. What was being sent before, if not the old values? Was this method even being called, previously?
The processing nodes in the Streams topology can optionally forward the old/previous value, as well as the new/current value, to child nodes. This is not on by default. An upstream change to how table filters are handled means this is now turned on.
The Streams code for aggregation looks something like:
void process(K key, Change<V> value) {
  // Get the old aggregate from the store:
  final V oldAgg = store.get(key);

  // Undo any previous value:
  final V intermediateAgg = value.oldValue != null && oldAgg != null
      ? remove.apply(key, value.oldValue, oldAgg)
      : oldAgg;

  // Then add the new value:
  final V newAgg;
  if (value.newValue != null) {
    final V initializedAgg = intermediateAgg == null
        ? initializer.apply()
        : intermediateAgg;
    newAgg = add.apply(key, value.newValue, initializedAgg);
  } else {
    newAgg = intermediateAgg;
  }

  // Update the store with the new value & forward:
  store.put(key, newAgg);
  tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}
The two calls, `remove.apply(key, value.oldValue, oldAgg)` and `add.apply(key, value.newValue, initializedAgg)`, call out to ksqlDB code. If these calls directly mutate the `oldAgg` or `initializedAgg` parameters, rather than creating copies, then the old and new values forwarded to child nodes will match. i.e. in `tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null)`, the parameters `newAgg` and `oldAgg` will hold the same updated value, rather than `oldAgg` holding the previous value. This breaks downstream processors, which expect both the old and the new value.
Previously the nodes weren't configured to send old values, so they were just sending `null` for the old value, and downstream could handle this correctly.
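To make the consequence concrete, here is a small self-contained demo of the reference aliasing described above (a plain List again stands in for GenericRow, and the values are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: shows why returning the mutated parameter makes old and new values identical.
public final class MutationAliasingDemo {
  public static void main(final String[] args) {
    // Value read from the state store, later forwarded as the "old value":
    final List<Object> oldAgg = new ArrayList<>(Arrays.asList(2L));

    // Buggy style: mutate and return the parameter itself.
    final List<Object> newAgg = oldAgg;
    newAgg.set(0, (Long) newAgg.get(0) + 1L);
    // Both references now point at the same, updated row, so a forwarded
    // change carrying (newAgg, oldAgg) has identical old and new values:
    System.out.println(oldAgg + " / " + newAgg);     // prints: [3] / [3]

    // Fixed style: copy first, then mutate the copy.
    final List<Object> storeValue = new ArrayList<>(Arrays.asList(2L));
    final List<Object> copy = new ArrayList<>(storeValue);
    copy.set(0, (Long) copy.get(0) + 1L);
    System.out.println(storeValue + " / " + copy);   // prints: [2] / [3]
  }
}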
@@ -51,17 +51,19 @@ public KudafUndoAggregator(

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) {
  final GenericRow result = GenericRow.fromList(aggRowValue.values());
As above.
LGTM besides the question inline, and the test failures in the build. Thanks!
Description
fixes: #3558
AK commit apache/kafka#9156 enhances Kafka Streams so that filters on tables avoid emitting spurious tombstones. ksqlDB now benefits from this: tombstones are no longer emitted to the sink topic when a HAVING clause excludes a row that has never been in the result table.
BREAKING CHANGE: This change fixes a bug where unnecessary tombstones were being emitted when a HAVING clause filtered out a row from the source that was never in the output table.

For example, where previously the contents of the sink topic BAR would have contained tombstone records for source rows that never passed the HAVING clause, the topic will now contain only the records for rows that pass the clause.
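As an illustrative sketch of the behaviour change (this is not the SQL example from the original description; the source topic name FOO, the Long values, and the v > 2 predicate are assumptions, while BAR is the sink topic mentioned above), the same effect can be pictured with a plain Kafka Streams table filter:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Sketch only: a table filter, analogous to a HAVING clause over a table.
public final class FilterTombstoneSketch {
  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    final KTable<String, Long> source = builder.table("FOO");
    final KTable<String, Long> filtered = source.filter((k, v) -> v > 2);

    // Before apache/kafka#9156: an update that failed the filter forwarded a tombstone
    // (null value) for its key to "BAR", even if that key had never been in the result.
    // After the fix: keys that were never in the filtered result produce no output at all.
    filtered.toStream().to("BAR");

    builder.build();
  }
}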
Note: two historical tests are currently disabled. These need an upstream fix. See #6406 for details.
Testing done
usual
Reviewer checklist