-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: avoid spurious tombstones in table output #6405
Changes from 3 commits
a10e5eb
cc7f1da
fcee2d4
8bc96c1
64480bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -619,8 +619,9 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() { | |
} | ||
|
||
@Test | ||
public void shouldIgnoreHavingClause() { | ||
// Note: HAVING clause are handled centrally by KsqlMaterialization | ||
public void shouldHandleHavingClause() { | ||
// Note: HAVING clause are handled centrally by KsqlMaterialization. This logic will have been | ||
// installed as part of building the below statement: | ||
|
||
// Given: | ||
final PersistentQueryMetadata query = executeQuery( | ||
|
@@ -632,7 +633,11 @@ public void shouldIgnoreHavingClause() { | |
|
||
final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); | ||
|
||
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); | ||
final int matches = (int) USER_DATA_PROVIDER.data().values().stream() | ||
.filter(row -> ((Long) row.get(0)) > 2) | ||
.count(); | ||
|
||
final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema); | ||
Comment on lines
+636
to
+640
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The number of expected rows is now reduced as we no longer produce spurious tombstones. |
||
|
||
// When: | ||
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); | ||
|
@@ -641,16 +646,22 @@ public void shouldIgnoreHavingClause() { | |
final MaterializedTable table = materialization.nonWindowed(); | ||
|
||
rows.forEach((rowKey, value) -> { | ||
// Rows passing the HAVING clause: | ||
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema()); | ||
|
||
final Optional<Row> expected = Optional.ofNullable(value) | ||
.map(v -> Row.of(schema, key, v, -1L)); | ||
|
||
final Optional<Row> row = withRetry(() -> table.get(key)); | ||
assertThat(row.map(Row::schema), is(expected.map(Row::schema))); | ||
assertThat(row.map(Row::key), is(expected.map(Row::key))); | ||
assertThat(row.map(Row::value), is(expected.map(Row::value))); | ||
assertThat(row.map(Row::schema), is(Optional.of(schema))); | ||
assertThat(row.map(Row::key), is(Optional.of(key))); | ||
assertThat(row.map(Row::value), is(Optional.of(value))); | ||
}); | ||
|
||
USER_DATA_PROVIDER.data().entries().stream() | ||
.filter(e -> !rows.containsKey(e.getKey().getString("USERID"))) | ||
.forEach(e -> { | ||
// Rows filtered by the HAVING clause: | ||
final Optional<Row> row = withRetry(() -> table.get(e.getKey())); | ||
assertThat(row, is(Optional.empty())); | ||
}); | ||
Comment on lines
+658
to
+664
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get's against the table for filtered out rows should return nothing. |
||
} | ||
|
||
private static void verifyRetainedWindows( | ||
|
@@ -677,10 +688,22 @@ private static void verifyRetainedWindows( | |
private <T> Map<T, GenericRow> waitForUniqueUserRows( | ||
final Deserializer<T> keyDeserializer, | ||
final LogicalSchema aggregateSchema | ||
) { | ||
return waitForUniqueUserRows( | ||
USER_DATA_PROVIDER.data().size(), | ||
keyDeserializer, | ||
aggregateSchema | ||
); | ||
} | ||
|
||
private <T> Map<T, GenericRow> waitForUniqueUserRows( | ||
final int count, | ||
final Deserializer<T> keyDeserializer, | ||
final LogicalSchema aggregateSchema | ||
) { | ||
return TEST_HARNESS.verifyAvailableUniqueRows( | ||
output.toUpperCase(), | ||
USER_DATA_PROVIDER.data().size(), | ||
count, | ||
VALUE_FORMAT, | ||
PhysicalSchema.from(aggregateSchema, SerdeFeatures.of(), SerdeFeatures.of()), | ||
keyDeserializer | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,9 +52,11 @@ public KudafAggregator( | |
|
||
@Override | ||
public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow aggRowValue) { | ||
final GenericRow result = GenericRow.fromList(aggRowValue.values()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kafka Streams does not expect the aggregator to mutate its parameters. The streams code is passing in the "old value", which ksqlDB was then mutating and returning as the "new value". This meant, when then function returned, the old and new values matched. This is obviously bad! Code now takes a copy and mutates that. There is a perf hit, obviously, but it's unavoidable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand -- why did the old code work, in that case? Or did something change on the Streams side recently? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The old code works because we were never enabling the sending of old values. We now do, to avoid the spurious tombstones. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, still not understanding. What was being sent before, if not the old values? Was this method even being called, previously? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The processing nodes in the streams topology can optionally include the old/previous value, as well as the new/current value, to child nodes. This is not on by default. An upstream change to how table filters is handled means this is now turned on. The streams code for aggregation looks something like:
The two calls: Previously the nodes weren't configured to send old values, so where just sending |
||
|
||
// copy over group-by and aggregate parameter columns into the output row | ||
for (int idx = 0; idx < nonAggColumnCount; idx++) { | ||
aggRowValue.set(idx, rowValue.get(idx)); | ||
result.set(idx, rowValue.get(idx)); | ||
} | ||
|
||
// compute the aggregation and write it into the output row. Its assumed that | ||
|
@@ -63,12 +65,12 @@ public GenericRow apply(final K k, final GenericRow rowValue, final GenericRow a | |
for (int idx = nonAggColumnCount; idx < columnCount; idx++) { | ||
final KsqlAggregateFunction<Object, Object, Object> func = aggregateFunctionForColumn(idx); | ||
final Object currentValue = rowValue.get(func.getArgIndexInValue()); | ||
final Object currentAggregate = aggRowValue.get(idx); | ||
final Object currentAggregate = result.get(idx); | ||
final Object newAggregate = func.aggregate(currentValue, currentAggregate); | ||
aggRowValue.set(idx, newAggregate); | ||
result.set(idx, newAggregate); | ||
} | ||
|
||
return aggRowValue; | ||
return result; | ||
} | ||
|
||
public KsqlTransformer<K, GenericRow> getResultMapper() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,17 +51,19 @@ public KudafUndoAggregator( | |
@SuppressWarnings({"unchecked", "rawtypes"}) | ||
@Override | ||
public GenericRow apply(final Struct k, final GenericRow rowValue, final GenericRow aggRowValue) { | ||
final GenericRow result = GenericRow.fromList(aggRowValue.values()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above. |
||
|
||
for (int idx = 0; idx < nonAggColumnCount; idx++) { | ||
aggRowValue.set(idx, rowValue.get(idx)); | ||
result.set(idx, rowValue.get(idx)); | ||
} | ||
|
||
for (int idx = nonAggColumnCount; idx < columnCount; idx++) { | ||
final TableAggregationFunction function = aggregateFunctions.get(idx - nonAggColumnCount); | ||
final Object argument = rowValue.get(function.getArgIndexInValue()); | ||
final Object previous = aggRowValue.get(idx); | ||
aggRowValue.set(idx, function.undo(argument, previous)); | ||
final Object previous = result.get(idx); | ||
result.set(idx, function.undo(argument, previous)); | ||
} | ||
|
||
return aggRowValue; | ||
return result; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.execution.function.udaf; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.not; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.when; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.confluent.ksql.GenericRow; | ||
import io.confluent.ksql.execution.transform.KsqlProcessingContext; | ||
import io.confluent.ksql.function.KsqlAggregateFunction; | ||
import java.util.function.Function; | ||
import org.apache.kafka.connect.data.Struct; | ||
import org.apache.kafka.streams.kstream.Merger; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.MockitoJUnitRunner; | ||
|
||
@RunWith(MockitoJUnitRunner.class) | ||
public class KudafAggregatorTest { | ||
|
||
@Mock | ||
private KsqlAggregateFunction<Long, String, String> func1; | ||
@Mock | ||
private Struct key; | ||
@Mock | ||
private Merger<Struct, String> func1Merger; | ||
@Mock | ||
private Function<String, String> func1ResultMapper; | ||
@Mock | ||
private KsqlProcessingContext ctx; | ||
private KudafAggregator<String> aggregator; | ||
|
||
@Before | ||
public void setUp() { | ||
aggregator = new KudafAggregator<>(2, ImmutableList.of(func1)); | ||
|
||
when(func1.getMerger()).thenReturn(func1Merger); | ||
when(func1.getResultMapper()).thenReturn(func1ResultMapper); | ||
|
||
when(func1.aggregate(any(), any())).thenReturn("func1-result"); | ||
when(func1Merger.apply(any(), any(), any())).thenReturn("func1-merged"); | ||
when(func1ResultMapper.apply(any())).thenReturn("func1-result"); | ||
} | ||
|
||
@Test | ||
public void shouldNotMutateParametersOnApply() { | ||
// Given: | ||
final GenericRow value = GenericRow.genericRow(1, 2L); | ||
final GenericRow agg = GenericRow.genericRow(1, 2L, 3); | ||
|
||
// When: | ||
final GenericRow result = aggregator.apply("key", value, agg); | ||
|
||
// Then: | ||
assertThat(value, is(GenericRow.genericRow(1, 2L))); | ||
assertThat(agg, is(GenericRow.genericRow(1, 2L, 3))); | ||
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3)))); | ||
} | ||
|
||
@Test | ||
public void shouldNotMutateParametersOnMerge() { | ||
// Given: | ||
final GenericRow aggOne = GenericRow.genericRow(1, 2L, 4); | ||
final GenericRow aggTwo = GenericRow.genericRow(1, 2L, 3); | ||
|
||
// When: | ||
final GenericRow result = aggregator.getMerger().apply(key, aggOne, aggTwo); | ||
|
||
// Then: | ||
assertThat(aggOne, is(GenericRow.genericRow(1, 2L, 4))); | ||
assertThat(aggTwo, is(GenericRow.genericRow(1, 2L, 3))); | ||
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4)))); | ||
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3)))); | ||
} | ||
|
||
@Test | ||
public void shouldNotMutateParametersOnResultsMap() { | ||
// Given: | ||
final GenericRow agg = GenericRow.genericRow(1, 2L, 4); | ||
|
||
// When: | ||
final GenericRow result = aggregator.getResultMapper().transform("k", agg, ctx); | ||
|
||
// Then: | ||
assertThat(agg, is(GenericRow.genericRow(1, 2L, 4))); | ||
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 4)))); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.execution.function.udaf; | ||
|
||
import static io.confluent.ksql.GenericRow.genericRow; | ||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.not; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.when; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.confluent.ksql.GenericRow; | ||
import io.confluent.ksql.execution.function.TableAggregationFunction; | ||
import org.apache.kafka.connect.data.Struct; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.MockitoJUnitRunner; | ||
|
||
@RunWith(MockitoJUnitRunner.class) | ||
public class KudafUndoAggregatorTest { | ||
|
||
@Mock | ||
private TableAggregationFunction<Long, String, String> func1; | ||
@Mock | ||
private Struct key; | ||
private KudafUndoAggregator aggregator; | ||
|
||
@Before | ||
public void setUp() { | ||
aggregator = new KudafUndoAggregator(2, ImmutableList.of(func1)); | ||
|
||
when(func1.undo(any(), any())).thenReturn("func1-undone"); | ||
} | ||
|
||
@Test | ||
public void shouldNotMutateParametersOnApply() { | ||
// Given: | ||
final GenericRow value = GenericRow.genericRow(1, 2L); | ||
final GenericRow agg = GenericRow.genericRow(1, 2L, 3); | ||
|
||
// When: | ||
final GenericRow result = aggregator.apply(key, value, agg); | ||
|
||
// Then: | ||
assertThat(value, is(GenericRow.genericRow(1, 2L))); | ||
assertThat(agg, is(GenericRow.genericRow(1, 2L, 3))); | ||
assertThat("invalid test", result, is(not(GenericRow.genericRow(1, 2L, 3)))); | ||
} | ||
|
||
@Test | ||
public void shouldApplyUndoableAggregateFunctions() { | ||
// Given: | ||
final GenericRow value = genericRow(1, 2L); | ||
final GenericRow aggRow = genericRow(1, 2L, 3); | ||
|
||
// When: | ||
final GenericRow resultRow = aggregator.apply(key, value, aggRow); | ||
|
||
// Then: | ||
assertThat(resultRow, equalTo(genericRow(1, 2L, "func1-undone"))); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: test name and comments were misleading as the extra steps
KsqlMaterialization
adds to handle the HAVING clause are installed as part of this test.