
feat: avoid spurious tombstones in table output #6405

Merged
merged 5 commits into confluentinc:master from ifx_master on Oct 12, 2020

Conversation

big-andy-coates
Contributor

@big-andy-coates big-andy-coates commented Oct 12, 2020

Description

fixes: #3558

AK commit apache/kafka#9156 enhances Kafka Streams so that filters on tables now avoid emitting spurious tombstones. ksqlDB now benefits from this. Tombstones are no longer emitted to the sink topic when a HAVING clause excludes a row from the result that has never been in the result table.

BREAKING CHANGE: This change fixes a bug where unnecessary tombstones were being emitted when a HAVING clause filtered out a row from the source that was not in the output table.

For example, given:

-- source stream:
CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...);

-- aggregate into a table:
CREATE TABLE BAR AS
    SELECT ID, SUM(VAL) AS SUM
    FROM FOO
    GROUP BY ID
    HAVING SUM(VAL) > 0;


-- insert some values into the stream:
INSERT INTO FOO VALUES(1, -5); 
INSERT INTO FOO VALUES(1, 6); 
INSERT INTO FOO VALUES(1, -2); 
INSERT INTO FOO VALUES(1, -1); 

Previously, the sink topic BAR would have contained these records:

| Key | Value | Notes |
| --- | --- | --- |
| 1 | null | Spurious tombstone: the table does not contain a row with key 1, so no tombstone is required. |
| 1 | {sum=1} | Row added as HAVING criteria now met. |
| 1 | null | Row deleted as HAVING criteria no longer met. |
| 1 | null | Spurious tombstone: the table does not contain a row with key 1, so no tombstone is required. |

Note: the first record in the topic above is a spurious tombstone.

The topic will now contain:

| Key | Value |
| --- | --- |
| 1 | {sum=1} |
| 1 | null |

Note: two historical tests are currently disabled. These need an upstream fix. See #6406 for details.

Testing done

usual

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

AK commit apache/kafka#9156 avoids filters emitting spurious tombstones. This means the sink topic now only receives the records for the two rows that pass the filter, not the other three rows. Hence the `waitForUniqueUserRows` call now only waits for the two records to be produced before running the test.

Additionally, the name of the test was misleading: the logic in `KsqlMaterialization` that filters out records not passing the HAVING clause is installed as part of running the SQL in the test case, so those records are filtered from any pull query anyway.
Contributor

@vcrfxia vcrfxia left a comment


LGTM. Thanks for the fix!

@big-andy-coates
Contributor Author

More needed. Doing it now...

@big-andy-coates big-andy-coates changed the title from "chore: fix master build" to "feat: avoid spurious tombstones in table output" on Oct 12, 2020
Comment on lines +622 to +624
public void shouldHandleHavingClause() {
// Note: HAVING clause are handled centrally by KsqlMaterialization. This logic will have been
// installed as part of building the below statement:
Contributor Author


Note: test name and comments were misleading as the extra steps KsqlMaterialization adds to handle the HAVING clause are installed as part of this test.

Comment on lines +636 to +640
final int matches = (int) USER_DATA_PROVIDER.data().values().stream()
.filter(row -> ((Long) row.get(0)) > 2)
.count();

final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema);
Contributor Author


The number of expected rows is now reduced as we no longer produce spurious tombstones.

Comment on lines +658 to +664
USER_DATA_PROVIDER.data().entries().stream()
.filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
.forEach(e -> {
// Rows filtered by the HAVING clause:
final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
assertThat(row, is(Optional.empty()));
});
Contributor Author


Gets against the table for filtered-out rows should return nothing.

@@ -52,9 +52,11 @@ public KudafAggregator(

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());
Contributor Author


Kafka Streams does not expect the aggregator to mutate its parameters. The Streams code is passing in the "old value", which ksqlDB was then mutating and returning as the "new value". This meant that, when the function returned, the old and new values matched. This is obviously bad!

Code now takes a copy and mutates that. There is a perf hit, obviously, but it's unavoidable.
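A minimal sketch of the defensive-copy pattern used by the fix (the per-column aggregation loop is elided; only the copy is the point here):

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
  // Copy the incoming aggregate row so the caller's instance -- which Streams
  // will later forward as the "old" value -- is never mutated:
  final GenericRow result = GenericRow.fromList(aggRowValue.values());

  // ... apply each aggregate function to `result`, not to `aggRowValue` ...

  return result;
}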

Contributor


Not sure I understand -- why did the old code work, in that case? Or did something change on the Streams side recently?

Contributor Author


The old code works because we were never enabling the sending of old values. We now do, to avoid the spurious tombstones.
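For intuition, here is a conceptual sketch (not the actual Kafka Streams classes) of why forwarding old values lets a table filter avoid spurious tombstones: if neither the old nor the new value passes the filter, nothing needs to be forwarded at all.

// Conceptual sketch only -- these are not the real Streams types.
final class Change<V> {
  final V newValue;
  final V oldValue;

  Change(final V newValue, final V oldValue) {
    this.newValue = newValue;
    this.oldValue = oldValue;
  }
}

final class TableFilterSketch {
  // Returns null when there is nothing to forward downstream.
  static <V> Change<V> filter(final Change<V> change, final java.util.function.Predicate<V> predicate) {
    final V newFiltered = change.newValue != null && predicate.test(change.newValue) ? change.newValue : null;
    final V oldFiltered = change.oldValue != null && predicate.test(change.oldValue) ? change.oldValue : null;

    if (newFiltered == null && oldFiltered == null) {
      // The row was never in, and does not enter, the filtered result table,
      // so no tombstone needs to be emitted.
      return null;
    }
    return new Change<>(newFiltered, oldFiltered);
  }
}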

Contributor


Sorry, still not understanding. What was being sent before, if not the old values? Was this method even being called, previously?

Contributor Author


The processing nodes in the Streams topology can optionally forward the old/previous value, as well as the new/current value, to child nodes. This is not on by default. An upstream change to how table filters are handled means this is now turned on.

The streams code for aggregation looks something like:

void process(final K key, final Change<V> value) {
   // Get the old aggregate from the store:
   final T oldAgg = store.get(key);

   // Undo the previous input value, if any:
   final T intermediateAgg = value.oldValue != null && oldAgg != null
       ? remove.apply(key, value.oldValue, oldAgg)
       : oldAgg;

   // Then add the new input value:
   final T newAgg;
   if (value.newValue != null) {
       final T initializedAgg = intermediateAgg == null
           ? initializer.apply()
           : intermediateAgg;

       newAgg = add.apply(key, value.newValue, initializedAgg);
   } else {
       newAgg = intermediateAgg;
   }

   // Update the store with the new aggregate & forward:
   store.put(key, newAgg);
   tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}

The two calls remove.apply(key, value.oldValue, oldAgg) and add.apply(key, value.newValue, initializedAgg) call out to ksqlDB code. If these calls directly mutate the oldAgg or initializedAgg parameters they are passed, rather than creating copies, then the old and new values forwarded to child nodes will match, i.e. in tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null), the parameters newAgg and oldAgg will hold the same updated value, rather than oldAgg holding the previous value. This breaks downstream processors, which rely on oldAgg being the previous value.

Previously the nodes weren't configured to send old values, so they were just sending null for the old value, which downstream nodes could handle correctly.
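A self-contained illustration (hypothetical, not ksqlDB code) of the aliasing problem being described: if the aggregator mutates the aggregate it was handed, "old" and "new" end up being the same object and the previous value is silently lost.

import java.util.ArrayList;
import java.util.List;

public final class MutationBugDemo {

  public static void main(final String[] args) {
    // Buggy pattern: mutate the old aggregate in place and return it.
    final List<Integer> oldAgg = new ArrayList<>(List.of(10));
    final List<Integer> newAgg = oldAgg;           // same object!
    newAgg.set(0, newAgg.get(0) + 5);
    System.out.println(oldAgg + " / " + newAgg);   // [15] / [15] -- previous value lost

    // Fixed pattern: copy first, then mutate the copy.
    final List<Integer> oldAgg2 = new ArrayList<>(List.of(10));
    final List<Integer> newAgg2 = new ArrayList<>(oldAgg2);
    newAgg2.set(0, newAgg2.get(0) + 5);
    System.out.println(oldAgg2 + " / " + newAgg2); // [10] / [15] -- previous value preserved
  }
}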

@@ -51,17 +51,19 @@ public KudafUndoAggregator(
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());
Contributor Author


As above.

@big-andy-coates big-andy-coates requested a review from a team October 12, 2020 13:45
Contributor

@vcrfxia vcrfxia left a comment


LGTM besides the question inline, and the test failures in the build. Thanks!

@@ -52,9 +52,11 @@ public KudafAggregator(

@Override
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
final GenericRow result = GenericRow.fromList(aggRowValue.values());
Contributor


Not sure I understand -- why did the old code work, in that case? Or did something change on the Streams side recently?

@big-andy-coates big-andy-coates merged commit 4c7c9b5 into confluentinc:master Oct 12, 2020
@big-andy-coates big-andy-coates deleted the ifx_master branch October 12, 2020 19:54
Successfully merging this pull request may close these issues.

Avoid output null values when HAVING clauses not met