feat: avoid spurious tombstones in table output #6405

Merged: 5 commits, Oct 12, 2020.

Changes from 3 commits.
@@ -619,8 +619,9 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() {
   }
 
   @Test
-  public void shouldIgnoreHavingClause() {
-    // Note: HAVING clause are handled centrally by KsqlMaterialization
+  public void shouldHandleHavingClause() {
+    // Note: HAVING clause are handled centrally by KsqlMaterialization. This logic will have been
+    // installed as part of building the below statement:
Contributor Author (on lines +622 to +624):
Note: the test name and comment were misleading, as the extra steps KsqlMaterialization adds to handle the HAVING clause are installed as part of this test.


     // Given:
     final PersistentQueryMetadata query = executeQuery(
@@ -632,7 +633,11 @@ public void shouldIgnoreHavingClause() {
 
     final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT);
 
-    final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema);
+    final int matches = (int) USER_DATA_PROVIDER.data().values().stream()
+        .filter(row -> ((Long) row.get(0)) > 2)
+        .count();
+
+    final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema);
Contributor Author (on lines +636 to +640):
The number of expected rows is now reduced as we no longer produce spurious tombstones.


     // When:
     final Materialization materialization = query.getMaterialization(queryId, contextStacker).get();
@@ -641,16 +646,22 @@ public void shouldIgnoreHavingClause() {
     final MaterializedTable table = materialization.nonWindowed();
 
     rows.forEach((rowKey, value) -> {
+      // Rows passing the HAVING clause:
       final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());
 
-      final Optional<Row> expected = Optional.ofNullable(value)
-          .map(v -> Row.of(schema, key, v, -1L));
 
       final Optional<Row> row = withRetry(() -> table.get(key));
-      assertThat(row.map(Row::schema), is(expected.map(Row::schema)));
-      assertThat(row.map(Row::key), is(expected.map(Row::key)));
-      assertThat(row.map(Row::value), is(expected.map(Row::value)));
+      assertThat(row.map(Row::schema), is(Optional.of(schema)));
+      assertThat(row.map(Row::key), is(Optional.of(key)));
+      assertThat(row.map(Row::value), is(Optional.of(value)));
     });
 
+    USER_DATA_PROVIDER.data().entries().stream()
+        .filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
+        .forEach(e -> {
+          // Rows filtered by the HAVING clause:
+          final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
+          assertThat(row, is(Optional.empty()));
+        });
Contributor Author (on lines +658 to +664):
Gets against the table for filtered-out rows should return nothing.

   }
 
   private static void verifyRetainedWindows(
@@ -677,10 +688,22 @@ private static void verifyRetainedWindows(
   private <T> Map<T, GenericRow> waitForUniqueUserRows(
       final Deserializer<T> keyDeserializer,
       final LogicalSchema aggregateSchema
   ) {
+    return waitForUniqueUserRows(
+        USER_DATA_PROVIDER.data().size(),
+        keyDeserializer,
+        aggregateSchema
+    );
+  }
+
+  private <T> Map<T, GenericRow> waitForUniqueUserRows(
+      final int count,
+      final Deserializer<T> keyDeserializer,
+      final LogicalSchema aggregateSchema
+  ) {
     return TEST_HARNESS.verifyAvailableUniqueRows(
         output.toUpperCase(),
-        USER_DATA_PROVIDER.data().size(),
+        count,
         VALUE_FORMAT,
         PhysicalSchema.from(aggregateSchema, SerdeFeatures.of(), SerdeFeatures.of()),
         keyDeserializer
KudafAggregator.java:
@@ -52,9 +52,11 @@ public KudafAggregator(
 
   @Override
   public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) {
+    final GenericRow result = GenericRow.fromList(aggRowValue.values());
Contributor Author:
Kafka Streams does not expect the aggregator to mutate its parameters. The streams code passes in the "old value", which ksqlDB was then mutating and returning as the "new value". This meant that, when the function returned, the old and new values matched. This is obviously bad!

Code now takes a copy and mutates that. There is a perf hit, obviously, but it's unavoidable.
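
To illustrate the aliasing hazard, a minimal sketch with made-up values (GenericRow's genericRow, fromList, values and set are the same APIs used in the diff below):

// Before the fix: the parameter itself was mutated and returned.
final GenericRow oldAgg = GenericRow.genericRow("alice", 2L); // Streams' stored aggregate
final GenericRow returned = oldAgg;  // same instance, no copy taken
returned.set(1, 3L);                 // mutates oldAgg as well
// oldAgg and returned are now both ("alice", 3): old == new downstream.

// After the fix: copy first, then mutate only the copy.
final GenericRow newAgg = GenericRow.fromList(oldAgg.values());
newAgg.set(1, 3L);
// oldAgg still holds ("alice", 2); newAgg holds ("alice", 3).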

Contributor:
Not sure I understand -- why did the old code work, in that case? Or did something change on the Streams side recently?

Contributor Author:
The old code worked because we were never enabling the sending of old values. We now do, to avoid the spurious tombstones.

Contributor:
Sorry, still not understanding. What was being sent before, if not the old values? Was this method even being called, previously?

Contributor Author:
The processing nodes in the streams topology can optionally forward the old/previous value, as well as the new/current value, to child nodes. This is not on by default. An upstream change to how table filters are handled means this is now turned on.

The streams code for aggregation looks something like:

void process(K key, Change<V> value) {
   // Get the old aggregate from the store:
   final T oldAgg = store.get(key);

   // Undo any previous value:
   final T intermediateAgg = value.oldValue != null && oldAgg != null
       ? remove.apply(key, value.oldValue, oldAgg)
       : oldAgg;

   // Then add the new value:
   final T newAgg;
   if (value.newValue != null) {
       final T initializedAgg = intermediateAgg == null
           ? initializer.apply()
           : intermediateAgg;

       newAgg = add.apply(key, value.newValue, initializedAgg);
   } else {
       newAgg = intermediateAgg;
   }

   // Update the store with the new value & forward:
   store.put(key, newAgg);
   tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
}

The two calls remove.apply(key, value.oldValue, oldAgg) and add.apply(key, value.newValue, initializedAgg) call out to ksqlDB code. If these calls directly mutate the oldAgg or initializedAgg parameters, rather than creating copies, then the old and new values forwarded to child nodes will match. i.e. in tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null), the parameters newAgg and oldAgg will hold the same updated value, rather than oldAgg holding the previous value. This breaks downstream processors, which rely on the old and new values being distinct.

Previously the nodes weren't configured to send old values, so were just sending null for the old value, and downstream could handle this correctly.
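
For reference, a rough sketch of the pair Streams forwards between table processors (this mirrors the shape of the internal Change class referenced above as value.oldValue/value.newValue; treat the exact definition as an assumption, not the actual source):

// The old/new pair passed between table processors:
final class Change<V> {
  final V newValue; // current row, or null for a tombstone
  final V oldValue; // previous row; null unless sendOldValues is enabled

  Change(final V newValue, final V oldValue) {
    this.newValue = newValue;
    this.oldValue = oldValue;
  }
}

// With sendOldValues on, the aggregation node forwards new Change<>(newAgg, oldAgg).
// If apply() mutated oldAgg in place and returned it, then newAgg == oldAgg, and
// any downstream undo(old)/apply(new) pair silently cancels out.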


+
     // copy over group-by and aggregate parameter columns into the output row
     for (int idx = 0; idx < nonAggColumnCount; idx++) {
-      aggRowValue.set(idx, rowValue.get(idx));
+      result.set(idx, rowValue.get(idx));
     }
 
     // compute the aggregation and write it into the output row. Its assumed that
@@ -63,12 +65,12 @@ public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow a
     for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
       final KsqlAggregateFunction<Object, Object, Object> func = aggregateFunctionForColumn(idx);
       final Object currentValue = rowValue.get(func.getArgIndexInValue());
-      final Object currentAggregate = aggRowValue.get(idx);
+      final Object currentAggregate = result.get(idx);
       final Object newAggregate = func.aggregate(currentValue, currentAggregate);
-      aggRowValue.set(idx, newAggregate);
+      result.set(idx, newAggregate);
     }
 
-    return aggRowValue;
+    return result;
   }
 
   public KsqlTransformer<K, GenericRow> getResultMapper() {
KudafUndoAggregator.java:
@@ -51,17 +51,19 @@ public KudafUndoAggregator(
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) {
+    final GenericRow result = GenericRow.fromList(aggRowValue.values());
Contributor Author:
As above.


+
     for (int idx = 0; idx < nonAggColumnCount; idx++) {
-      aggRowValue.set(idx, rowValue.get(idx));
+      result.set(idx, rowValue.get(idx));
     }
 
     for (int idx = nonAggColumnCount; idx < columnCount; idx++) {
       final TableAggregationFunction function = aggregateFunctions.get(idx - nonAggColumnCount);
       final Object argument = rowValue.get(function.getArgIndexInValue());
-      final Object previous = aggRowValue.get(idx);
-      aggRowValue.set(idx, function.undo(argument, previous));
+      final Object previous = result.get(idx);
+      result.set(idx, function.undo(argument, previous));
     }
 
-    return aggRowValue;
+    return result;
   }
 }
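
For context, the undo path above relies on each TableAggregationFunction being able to exactly reverse a prior aggregate call, which is what remove.apply depends on in the Streams sketch earlier. A minimal sketch of such a function (a hypothetical sum, not a class from this codebase):

// Hypothetical undoable sum: undo(x, aggregate(x, agg)) == agg.
final class SumLong {
  Long aggregate(final Long value, final Long agg) {
    return agg + value; // apply a row's value
  }

  Long undo(final Long value, final Long agg) {
    return agg - value; // exactly reverse a previously applied row
  }
}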
KudafAggregatorTest.java (new file):
@@ -0,0 +1,107 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.function.KsqlAggregateFunction;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Merger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KudafAggregatorTest {

  @Mock
  private KsqlAggregateFunction<Long, String, String> func1;
  @Mock
  private Struct key;
  @Mock
  private Merger<Struct, String> func1Merger;
  @Mock
  private Function<String, String> func1ResultMapper;
  @Mock
  private KsqlProcessingContext ctx;
  private KudafAggregator<String> aggregator;

  @Before
  public void setUp() {
    aggregator = new KudafAggregator<>(2, ImmutableList.of(func1));

    when(func1.getMerger()).thenReturn(func1Merger);
    when(func1.getResultMapper()).thenReturn(func1ResultMapper);

    when(func1.aggregate(any(), any())).thenReturn("func1-result");
    when(func1Merger.apply(any(), any(), any())).thenReturn("func1-merged");
    when(func1ResultMapper.apply(any())).thenReturn("func1-result");
  }

  @Test
  public void shouldNotMutateParametersOnApply() {
    // Given:
    final GenericRow value = GenericRow.genericRow(1, 2L);
    final GenericRow agg = GenericRow.genericRow(1, 2L, 3);

    // When:
    final GenericRow result = aggregator.apply("key", value, agg);

    // Then:
    assertThat(value, is(GenericRow.genericRow(1, 2L)));
    assertThat(agg, is(GenericRow.genericRow(1, 2L, 3)));
    assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
  }

  @Test
  public void shouldNotMutateParametersOnMerge() {
    // Given:
    final GenericRow aggOne = GenericRow.genericRow(1, 2L, 4);
    final GenericRow aggTwo = GenericRow.genericRow(1, 2L, 3);

    // When:
    final GenericRow result = aggregator.getMerger().apply(key, aggOne, aggTwo);

    // Then:
    assertThat(aggOne, is(GenericRow.genericRow(1, 2L, 4)));
    assertThat(aggTwo, is(GenericRow.genericRow(1, 2L, 3)));
    assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4))));
    assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
  }

  @Test
  public void shouldNotMutateParametersOnResultsMap() {
    // Given:
    final GenericRow agg = GenericRow.genericRow(1, 2L, 4);

    // When:
    final GenericRow result = aggregator.getResultMapper().transform("k", agg, ctx);

    // Then:
    assertThat(agg, is(GenericRow.genericRow(1, 2L, 4)));
    assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4))));
  }
}
KudafUndoAggregatorTest.java (new file):
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udaf;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.function.TableAggregationFunction;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KudafUndoAggregatorTest {

  @Mock
  private TableAggregationFunction<Long, String, String> func1;
  @Mock
  private Struct key;
  private KudafUndoAggregator aggregator;

  @Before
  public void setUp() {
    aggregator = new KudafUndoAggregator(2, ImmutableList.of(func1));

    when(func1.undo(any(), any())).thenReturn("func1-undone");
  }

  @Test
  public void shouldNotMutateParametersOnApply() {
    // Given:
    final GenericRow value = GenericRow.genericRow(1, 2L);
    final GenericRow agg = GenericRow.genericRow(1, 2L, 3);

    // When:
    final GenericRow result = aggregator.apply(key, value, agg);

    // Then:
    assertThat(value, is(GenericRow.genericRow(1, 2L)));
    assertThat(agg, is(GenericRow.genericRow(1, 2L, 3)));
    assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3))));
  }

  @Test
  public void shouldApplyUndoableAggregateFunctions() {
    // Given:
    final GenericRow value = genericRow(1, 2L);
    final GenericRow aggRow = genericRow(1, 2L, 3);

    // When:
    final GenericRow resultRow = aggregator.apply(key, value, aggRow);

    // Then:
    assertThat(resultRow, equalTo(genericRow(1, 2L, "func1-undone")));
  }
}
@@ -115,10 +115,6 @@
       "value" : {
         "SUM" : 10
       }
-    }, {
-      "topic" : "OUTPUT",
-      "key" : "1|+|20",
-      "value" : null
     }, {
       "topic" : "OUTPUT",
       "key" : "0|+|10",
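
The record removed above is the spurious tombstone this PR eliminates: a null value for a key whose row never passed the filter. With old values now forwarded, a table filter has enough information to decide correctly. A hedged sketch of that decision logic (illustrative only, not the actual Streams source):

// Tombstone only when the row was previously in the result table.
final boolean oldPassed = change.oldValue != null && predicate.test(change.oldValue);
final boolean newPassed = change.newValue != null && predicate.test(change.newValue);

if (newPassed) {
  forward(key, change.newValue); // row is (still) in the table
} else if (oldPassed) {
  forward(key, null);            // genuine tombstone: row just dropped out
}
// else: the row was never in the table, so nothing is forwarded,
// avoiding the tombstone previously emitted for every filtered-out row.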