diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java index da2800d246ffc..47544eeb6f886 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java @@ -184,6 +184,7 @@ protected Transformation translateToPlanInternal( leftInputSpec, rightInputSpec, joinSpec.getFilterNulls(), + minRetentionTime, minRetentionTime); } else { boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType == FlinkJoinType.FULL; @@ -199,6 +200,7 @@ protected Transformation translateToPlanInternal( leftIsOuter, rightIsOuter, joinSpec.getFilterNulls(), + minRetentionTime, minRetentionTime); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java index 64ada0f0db406..c7dad64663125 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java @@ -60,7 +60,8 @@ public abstract class AbstractStreamingJoinOperator extends AbstractStreamOperat private final boolean[] filterNullKeys; - protected final long stateRetentionTime; + protected final long leftStateRetentionTime; + protected final long rightStateRetentionTime; protected transient JoinConditionWithNullFilters joinCondition; protected transient TimestampedCollector collector; @@ -72,13 +73,15 @@ public AbstractStreamingJoinOperator( JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean[] filterNullKeys, - long stateRetentionTime) { + long leftStateRetentionTime, + long rightStateRetentionTime) { this.leftType = leftType; this.rightType = rightType; this.generatedJoinCondition = generatedJoinCondition; this.leftInputSideSpec = leftInputSideSpec; this.rightInputSideSpec = rightInputSideSpec; - this.stateRetentionTime = stateRetentionTime; + this.leftStateRetentionTime = leftStateRetentionTime; + this.rightStateRetentionTime = rightStateRetentionTime; this.filterNullKeys = filterNullKeys; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java index d221c555996e0..308b98e279430 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java @@ -60,7 +60,8 @@ public StreamingJoinOperator( boolean leftIsOuter, boolean rightIsOuter, boolean[] filterNullKeys, - long stateRetentionTime) { + long leftStateRetentionTime, + long rightStateRetentionTime) { super( leftType, rightType, @@ -68,7 +69,8 @@ public StreamingJoinOperator( leftInputSideSpec, rightInputSideSpec, filterNullKeys, - stateRetentionTime); + leftStateRetentionTime, + rightStateRetentionTime); this.leftIsOuter = leftIsOuter; this.rightIsOuter = rightIsOuter; } @@ -89,7 +91,7 @@ public void open() throws Exception { "left-records", leftInputSideSpec, leftType, - stateRetentionTime); + leftStateRetentionTime); } else { this.leftRecordStateView = JoinRecordStateViews.create( @@ -97,7 +99,7 @@ public void open() throws Exception { "left-records", leftInputSideSpec, leftType, - stateRetentionTime); + leftStateRetentionTime); } if (rightIsOuter) { @@ -107,7 +109,7 @@ public void open() throws Exception { "right-records", rightInputSideSpec, rightType, - stateRetentionTime); + rightStateRetentionTime); } else { this.rightRecordStateView = JoinRecordStateViews.create( @@ -115,7 +117,7 @@ public void open() throws Exception { "right-records", rightInputSideSpec, rightType, - stateRetentionTime); + rightStateRetentionTime); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java index 3c841d7a0080e..63f5203fda765 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java @@ -35,7 +35,7 @@ public class StreamingSemiAntiJoinOperator extends AbstractStreamingJoinOperator private static final long serialVersionUID = -3135772379944924519L; - // true if it is anti join, otherwise is semi joinp + // true if it is anti join, otherwise is semi join private final boolean isAntiJoin; // left join state @@ -51,7 +51,8 @@ public StreamingSemiAntiJoinOperator( JoinInputSideSpec leftInputSideSpec, JoinInputSideSpec rightInputSideSpec, boolean[] filterNullKeys, - long stateRetentionTime) { + long leftStateRetentionTime, + long rightStateRetentionTIme) { super( leftType, rightType, @@ -59,7 +60,8 @@ public StreamingSemiAntiJoinOperator( leftInputSideSpec, rightInputSideSpec, filterNullKeys, - stateRetentionTime); + leftStateRetentionTime, + rightStateRetentionTIme); this.isAntiJoin = isAntiJoin; } @@ -73,7 +75,7 @@ public void open() throws Exception { LEFT_RECORDS_STATE_NAME, leftInputSideSpec, leftType, - stateRetentionTime); + leftStateRetentionTime); this.rightRecordStateView = JoinRecordStateViews.create( @@ -81,7 +83,7 @@ public void open() throws Exception { RIGHT_RECORDS_STATE_NAME, rightInputSideSpec, rightType, - stateRetentionTime); + rightStateRetentionTime); } /** diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java new file mode 100644 index 0000000000000..b2e609a3ddb3f --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java @@ -0,0 +1,656 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.stream; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; + +/** Harness tests for {@link StreamingJoinOperator}. */ +public class StreamingJoinOperatorTest extends StreamingJoinOperatorTestBase { + + @Override + protected StreamingJoinOperator createJoinOperator(TestInfo testInfo) { + Boolean[] joinTypeSpec = JOIN_TYPE_EXTRACTOR.apply(testInfo.getDisplayName()); + Long[] ttl = STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags()); + return new StreamingJoinOperator( + leftTypeInfo, + rightTypeInfo, + joinCondition, + leftInputSpec, + rightInputSpec, + joinTypeSpec[0], + joinTypeSpec[1], + new boolean[] {true}, + ttl[0], + ttl[1]); + } + + @Override + protected RowType getOutputType() { + return RowType.of( + Stream.concat( + leftTypeInfo.toRowType().getChildren().stream(), + rightTypeInfo.toRowType().getChildren().stream()) + .toArray(LogicalType[]::new), + Stream.concat( + leftTypeInfo.toRowType().getFieldNames().stream(), + rightTypeInfo.toRowType().getFieldNames().stream()) + .toArray(String[]::new)); + } + + /** + * The equivalent SQL as follows. + * + *

{@code SELECT a.order_id, a.line_order_id, a.shipping_address, b.line_order_id, + * b.line_order_ship_mode FROM orders a JOIN line_orders b ON a.line_order_id = b.line_order_id} + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=1000") + @Test + public void testInnerJoinWithDifferentStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + // the right side state of LineOrd#2 has expired + testHarness.setStateTtlProcessingTime(3000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + // the left side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + // the left side state of LineOrd#2 has expired + testHarness.setStateTtlProcessingTime(7000); + testHarness.processElement2(insertRecord("LineOrd#2", "RAIL")); + assertor.shouldEmitNothing(testHarness); + } + + /** + * The equivalent SQL is same with {@link #testInnerJoinWithDifferentStateRetentionTime}. The + * only difference is that the state retention is disabled. + */ + @Test + public void testInnerJoinWithStateRetentionDisabled() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + testHarness.setStateTtlProcessingTime(3000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "AIR")); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#1", + "TRUCK")); + + testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + testHarness.setStateTtlProcessingTime(7000); + testHarness.processElement2(insertRecord("LineOrd#2", "RAIL")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "RAIL")); + } + + /** + * The equivalent SQL is same with testInnerJoinWithDifferentStateRetentionTime. The only + * difference is that the left and right state retention time are same. + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=4000") + @Test + public void testInnerJoinWithSameStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + // extend the expired time to 8000 for LineOrd#2 + testHarness.setStateTtlProcessingTime(4000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "AIR")); + + // the state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + // the expired time for left and right state of LineOrd#2 is 8000 + testHarness.setStateTtlProcessingTime(7999); + testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "TRUCK")); + + testHarness.setStateTtlProcessingTime(8000); + testHarness.processElement2(updateAfterRecord("LineOrd#2", "RAIL")); + assertor.shouldEmitNothing(testHarness); + } + + /** + * The equivalent SQL as follows. + * + *

{@code SELECT a.order_id, a.line_order_id, a.shipping_address, b.line_order_id, + * b.line_order_ship_mode FROM orders a LEFT JOIN line_orders b ON a.line_order_id = + * b.line_order_id} + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=1000") + @Test + public void testLeftOuterJoinWithDifferentStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null)); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null)); + + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + // the right side state of LineOrd#2 has expired + testHarness.setStateTtlProcessingTime(3000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + null, + null)); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + null, + null), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + // the left side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP"), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + null, + null)); + + // the left side state of LineOrd#2 has expired + testHarness.setStateTtlProcessingTime(8001); + testHarness.processElement2(insertRecord("LineOrd#2", "RAIL")); + assertor.shouldEmitNothing(testHarness); + } + + /** + * The equivalent SQL is the same as {@link + * #testLeftOuterJoinWithDifferentStateRetentionTime()}. The only difference is that the state + * retention is disabled. + */ + @Test + public void testLeftOuterJoinWithStateRetentionDisabled() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null)); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null)); + + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + testHarness.setStateTtlProcessingTime(3000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "AIR")); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + null, + null), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#1", + "TRUCK")); + + testHarness.setStateTtlProcessingTime(8001); + testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + } + + /** + * The equivalent SQL as follows. + * + *

{@code SELECT a.order_id, a.line_order_id, a.shipping_address, b.line_order_id, + * b.line_order_ship_mode FROM orders a RIGHT JOIN line_orders b ON a.line_order_id = + * b.line_order_id} + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=1000") + @Test + public void testRightOuterJoinWithDifferentStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + // left side state is expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, rowOfKind(RowKind.INSERT, null, null, null, "LineOrd#2", "AIR")); + + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmit( + testHarness, rowOfKind(RowKind.INSERT, null, null, null, "LineOrd#1", "TRUCK")); + + // the right side state has expired + testHarness.setStateTtlProcessingTime(5001); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "SHIP")); + + testHarness.processElement2(updateAfterRecord("LineOrd#1", "RAIL")); + assertor.shouldEmit( + testHarness, rowOfKind(RowKind.INSERT, null, null, null, "LineOrd#1", "RAIL")); + + testHarness.setStateTtlProcessingTime(6000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#1", "3 North Winchester Drive, Haines City, FL 33844")); + assertor.shouldEmit( + testHarness, + rowOfKind(RowKind.DELETE, null, null, null, "LineOrd#1", "RAIL"), + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 North Winchester Drive, Haines City, FL 33844", + "LineOrd#1", + "RAIL")); + + // right side state has expired + testHarness.setStateTtlProcessingTime(7000); + testHarness.processElement1( + deleteRecord( + "Ord#1", "LineOrd#1", "3 North Winchester Drive, Haines City, FL 33844")); + assertor.shouldEmitNothing(testHarness); + } + + /** + * The equivalent SQL is the same as {@link + * #testRightOuterJoinWithDifferentStateRetentionTime()}. The only difference is that the state + * retention is disabled. + */ + @Test + public void testRightOuterJoinWithDStateRetentionDisabled() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#2", + "AIR")); + + testHarness.setStateTtlProcessingTime(10000); + testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464", + "LineOrd#1", + "TRUCK")); + + testHarness.setStateTtlProcessingTime(20000); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#2", "68 Manor Station Street, Honolulu, HI 96815")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "68 Manor Station Street, Honolulu, HI 96815", + "LineOrd#2", + "AIR")); + } + + private static final Function JOIN_TYPE_EXTRACTOR = + (testDisplayName) -> { + if (testDisplayName.contains("InnerJoin")) { + return new Boolean[] {false, false}; + } else if (testDisplayName.contains("LeftOuterJoin")) { + return new Boolean[] {true, false}; + } else { + return new Boolean[] {false, true}; + } + }; +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java new file mode 100644 index 0000000000000..48cc9884e6648 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.stream; + +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; + +import java.util.Set; +import java.util.function.Function; + +/** Base test class for {@link AbstractStreamingJoinOperator}. */ +public abstract class StreamingJoinOperatorTestBase { + + protected final InternalTypeInfo leftTypeInfo = + InternalTypeInfo.of( + RowType.of( + new LogicalType[] { + new CharType(false, 20), + new CharType(false, 20), + VarCharType.STRING_TYPE + }, + new String[] {"order_id", "line_order_id", "shipping_address"})); + + protected final InternalTypeInfo rightTypeInfo = + InternalTypeInfo.of( + RowType.of( + new LogicalType[] {new CharType(false, 20), new CharType(true, 10)}, + new String[] {"line_order_id0", "line_order_ship_mode"})); + + protected final RowDataKeySelector leftKeySelector = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {1}, + leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + protected final RowDataKeySelector rightKeySelector = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {0}, + rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])); + + protected final JoinInputSideSpec leftInputSpec = + JoinInputSideSpec.withUniqueKeyContainedByJoinKey(leftTypeInfo, leftKeySelector); + protected final JoinInputSideSpec rightInputSpec = + JoinInputSideSpec.withUniqueKeyContainedByJoinKey(rightTypeInfo, rightKeySelector); + + protected final InternalTypeInfo joinKeyTypeInfo = + InternalTypeInfo.of(new CharType(false, 20)); + + protected final String funcCode = + "public class ConditionFunction extends org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + "\n" + + " public ConditionFunction(Object[] reference) {\n" + + " }\n" + + "\n" + + " @Override\n" + + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n" + + " return true;\n" + + " }\n" + + "\n" + + " @Override\n" + + " public void close() throws Exception {\n" + + " super.close();\n" + + " }" + + "}\n"; + protected final GeneratedJoinCondition joinCondition = + new GeneratedJoinCondition("ConditionFunction", funcCode, new Object[0]); + + protected final RowDataHarnessAssertor assertor = + new RowDataHarnessAssertor(getOutputType().getChildren().toArray(new LogicalType[0])); + + protected KeyedTwoInputStreamOperatorTestHarness + testHarness; + + @BeforeEach + public void beforeEach(TestInfo testInfo) throws Exception { + testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + createJoinOperator(testInfo), + leftKeySelector, + rightKeySelector, + joinKeyTypeInfo); + testHarness.open(); + } + + @AfterEach + public void afterEach() throws Exception { + testHarness.close(); + } + + protected static final Function, Long[]> STATE_RETENTION_TIME_EXTRACTOR = + (tags) -> { + if (tags.isEmpty()) { + return new Long[] {0L, 0L}; + } + Long[] ttl = new Long[2]; + for (String tag : tags) { + String[] splits = tag.split("="); + long value = Long.parseLong(splits[1].trim()); + if (splits[0].trim().startsWith("left")) { + ttl[0] = value; + } else { + ttl[1] = value; + } + } + return ttl; + }; + + /** Create streaming join operator according to {@link TestInfo}. */ + protected abstract AbstractStreamingJoinOperator createJoinOperator(TestInfo testInfo); + + /** Get the output row type of join operator. */ + protected abstract RowType getOutputType(); +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java new file mode 100644 index 0000000000000..5aa112c9fe11a --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.stream; + +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.util.function.Predicate; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; + +/** Test for {@link StreamingSemiAntiJoinOperator}. */ +public class StreamingSemiAntiJoinOperatorTest extends StreamingJoinOperatorTestBase { + @Override + protected StreamingSemiAntiJoinOperator createJoinOperator(TestInfo testInfo) { + Long[] ttl = STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags()); + return new StreamingSemiAntiJoinOperator( + ANTI_JOIN_CHECKER.test(testInfo.getDisplayName()), + leftTypeInfo, + rightTypeInfo, + joinCondition, + leftInputSpec, + rightInputSpec, + new boolean[] {true}, + ttl[0], + ttl[1]); + } + + @Override + protected RowType getOutputType() { + return leftTypeInfo.toRowType(); + } + + /** + * The equivalent SQL as follows. + * + *

{@code SELECT a.order_id, a.line_order_id, a.shipping_address FROM orders a WHERE + * a.line_order_id IN (SELECT b.line_order_id FROM line_orders b)} + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=1000") + @Test + public void testLeftSemiJoinWithDifferentStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.setStateTtlProcessingTime(3001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(deleteRecord("LineOrd#2", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + // numOfAssociations is reduced to 1, retract the record + testHarness.processElement2(deleteRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + // the left side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "SHIP")); + assertor.shouldEmitNothing(testHarness); + + // the right side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(5001); + testHarness.processElement1( + updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., Birmingham, AL 35209")); + assertor.shouldEmitNothing(testHarness); + } + + /** + * The equivalent SQL is the same as {@link #testLeftSemiJoinWithDifferentStateRetentionTime()}. + * The only difference is that the state retention is disabled. + */ + @Test + public void testLeftSemiJoinWithStateRetentionDisabled() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmitNothing(testHarness); + + testHarness.setStateTtlProcessingTime(3001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + testHarness.processElement2(deleteRecord("LineOrd#2", "TRUCK")); + assertor.shouldEmitNothing(testHarness); + + // numOfAssociations is reduced to 1, retract the record + testHarness.processElement2(deleteRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "SHIP")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464")); + + // the right side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(5001); + testHarness.processElement1( + updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., Birmingham, AL 35209")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#1", + "7238 Marsh St., Birmingham, AL 35209")); + } + + /** + * The equivalent SQL as follows. + * + *

{@code SELECT a.order_id, a.line_order_id, a.shipping_address FROM orders a WHERE + * a.line_order_id NOT IN (SELECT b.line_order_id FROM line_orders b)} + */ + @Tag("leftStateRetentionTime=4000") + @Tag("rightStateRetentionTime=1000") + @Test + public void testLeftAntiJoinWithDifferentStateRetentionTime() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.setStateTtlProcessingTime(3001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + // left side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "RAIL")); + assertor.shouldEmitNothing(testHarness); + + // right side state of LineOrd#1 has expired + testHarness.setStateTtlProcessingTime(5001); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#1", "23 W. River Avenue, Port Orange, FL 32127")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.UPDATE_AFTER, + "Ord#1", + "LineOrd#1", + "23 W. River Avenue, Port Orange, FL 32127")); + } + + /** + * The equivalent SQL is the same as {@link #testLeftAntiJoinWithDifferentStateRetentionTime()}. + * The only difference is that the state retention is disabled. + */ + @Test + public void testLeftAntiJoinWithStateRetentionTimeDisabled() throws Exception { + testHarness.setStateTtlProcessingTime(1); + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.processElement1( + insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 19464")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.INSERT, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.setStateTtlProcessingTime(3001); + testHarness.processElement2(insertRecord("LineOrd#2", "AIR")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#2", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.setStateTtlProcessingTime(4001); + testHarness.processElement2(insertRecord("LineOrd#1", "RAIL")); + assertor.shouldEmit( + testHarness, + rowOfKind( + RowKind.DELETE, + "Ord#1", + "LineOrd#1", + "3 Bellevue Drive, Pottstown, PA 19464")); + + testHarness.setStateTtlProcessingTime(5001); + testHarness.processElement1( + updateAfterRecord( + "Ord#1", "LineOrd#1", "23 W. River Avenue, Port Orange, FL 32127")); + assertor.shouldEmitNothing(testHarness); + } + + private static final Predicate ANTI_JOIN_CHECKER = + (testDisplayName) -> testDisplayName.contains("Anti"); +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index 671b2ec07d678..372318c0476c6 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -19,15 +19,13 @@ package org.apache.flink.table.runtime.operators.sink; import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; import org.apache.flink.table.runtime.util.StateConfigUtil; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; @@ -38,14 +36,10 @@ import org.junit.Test; -import java.util.ArrayList; -import java.util.List; - import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; -import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link SinkUpsertMaterializer}. */ public class SinkUpsertMaterializerTest { @@ -56,6 +50,7 @@ public class SinkUpsertMaterializerTest { private final RowDataSerializer serializer = new RowDataSerializer(types); private final RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types); + private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); private final GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]) { @@ -89,30 +84,30 @@ public void test() throws Exception { testHarness.setStateTtlProcessingTime(1); testHarness.processElement(insertRecord(1L, 1, "a1")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); testHarness.processElement(insertRecord(2L, 1, "a2")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); testHarness.processElement(insertRecord(3L, 1, "a3")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); testHarness.processElement(deleteRecord(2L, 1, "a2")); - shouldEmitNothing(testHarness); + assertor.shouldEmitNothing(testHarness); testHarness.processElement(deleteRecord(3L, 1, "a3")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); testHarness.processElement(deleteRecord(1L, 1, "a1")); - shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); testHarness.processElement(insertRecord(4L, 1, "a4")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); testHarness.setStateTtlProcessingTime(1002); testHarness.processElement(deleteRecord(4L, 1, "a4")); - shouldEmitNothing(testHarness); + assertor.shouldEmitNothing(testHarness); testHarness.close(); } @@ -131,54 +126,31 @@ public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception { testHarness.setStateTtlProcessingTime(1); testHarness.processElement(insertRecord(1L, 1, "a1")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); testHarness.processElement(updateAfterRecord(1L, 1, "a11")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); testHarness.processElement(insertRecord(3L, 1, "a3")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); testHarness.processElement(deleteRecord(1L, 1, "a111")); - shouldEmitNothing(testHarness); + assertor.shouldEmitNothing(testHarness); testHarness.processElement(deleteRecord(3L, 1, "a33")); - shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); testHarness.processElement(insertRecord(4L, 1, "a4")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); testHarness.setStateTtlProcessingTime(1002); testHarness.processElement(deleteRecord(4L, 1, "a4")); - shouldEmitNothing(testHarness); + assertor.shouldEmitNothing(testHarness); testHarness.close(); } - private void shouldEmitNothing(OneInputStreamOperatorTestHarness harness) { - assertThat(getEmittedRows(harness)).isEmpty(); - } - - private void shouldEmit( - OneInputStreamOperatorTestHarness harness, RowData expected) { - assertThat(getEmittedRows(harness)).containsExactly(expected); - } - - private static List getEmittedRows( - OneInputStreamOperatorTestHarness harness) { - final List rows = new ArrayList<>(); - Object o; - while ((o = harness.getOutput().poll()) != null) { - RowData value = (RowData) ((StreamRecord) o).getValue(); - GenericRowData newRow = - GenericRowData.of(value.getLong(0), value.getInt(1), value.getString(2)); - newRow.setRowKind(value.getRowKind()); - rows.add(newRow); - } - return rows; - } - private static class TestRecordEqualiser implements RecordEqualiser { @Override public boolean equals(RowData row1, RowData row2) { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java index c60967349a9e4..05bfed74d6b64 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java @@ -20,6 +20,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -49,6 +50,17 @@ public RowDataHarnessAssertor(LogicalType[] types) { this(types, new StringComparator()); } + /** Assert the test harness should not emit any records. */ + public void shouldEmitNothing(AbstractStreamOperatorTestHarness harness) { + assertThat(getEmittedRows(harness)).isEmpty(); + } + + /** Assert the test harness should emit records exactly same as the expected records. */ + public void shouldEmit( + AbstractStreamOperatorTestHarness harness, RowData... expected) { + assertThat(getEmittedRows(harness)).containsExactly(expected); + } + /** * Compare the two queues containing operator/task output by converting them to an array first. * Asserts two converted array should be same. @@ -67,6 +79,26 @@ public void assertOutputEqualsSorted( assertOutputEquals(message, expected, actual, true); } + private List getEmittedRows(AbstractStreamOperatorTestHarness harness) { + final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[types.length]; + for (int i = 0; i < types.length; i++) { + fieldGetters[i] = RowData.createFieldGetter(types[i], i); + } + final List rows = new ArrayList<>(); + Object o; + while ((o = harness.getOutput().poll()) != null) { + RowData value = (RowData) ((StreamRecord) o).getValue(); + Object[] row = new Object[types.length]; + for (int i = 0; i < types.length; i++) { + row[i] = fieldGetters[i].getFieldOrNull(value); + } + GenericRowData newRow = GenericRowData.of(row); + newRow.setRowKind(value.getRowKind()); + rows.add(newRow); + } + return rows; + } + private void assertOutputEquals( String message, Collection expected,