Skip to content

Commit

Permalink
[FLINK-32052][table-runtime] Introduce left and right state retention…
Browse files Browse the repository at this point in the history
… time to StreamingJoinOperator

This closes apache#22566
  • Loading branch information
LadyForest authored and godfreyhe committed May 16, 2023
1 parent 1957ef5 commit 5ba3f2b
Show file tree
Hide file tree
Showing 9 changed files with 1,164 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ protected Transformation<RowData> translateToPlanInternal(
leftInputSpec,
rightInputSpec,
joinSpec.getFilterNulls(),
minRetentionTime,
minRetentionTime);
} else {
boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType == FlinkJoinType.FULL;
Expand All @@ -199,6 +200,7 @@ protected Transformation<RowData> translateToPlanInternal(
leftIsOuter,
rightIsOuter,
joinSpec.getFilterNulls(),
minRetentionTime,
minRetentionTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public abstract class AbstractStreamingJoinOperator extends AbstractStreamOperat

private final boolean[] filterNullKeys;

protected final long stateRetentionTime;
protected final long leftStateRetentionTime;
protected final long rightStateRetentionTime;

protected transient JoinConditionWithNullFilters joinCondition;
protected transient TimestampedCollector<RowData> collector;
Expand All @@ -72,13 +73,15 @@ public AbstractStreamingJoinOperator(
JoinInputSideSpec leftInputSideSpec,
JoinInputSideSpec rightInputSideSpec,
boolean[] filterNullKeys,
long stateRetentionTime) {
long leftStateRetentionTime,
long rightStateRetentionTime) {
this.leftType = leftType;
this.rightType = rightType;
this.generatedJoinCondition = generatedJoinCondition;
this.leftInputSideSpec = leftInputSideSpec;
this.rightInputSideSpec = rightInputSideSpec;
this.stateRetentionTime = stateRetentionTime;
this.leftStateRetentionTime = leftStateRetentionTime;
this.rightStateRetentionTime = rightStateRetentionTime;
this.filterNullKeys = filterNullKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,17 @@ public StreamingJoinOperator(
boolean leftIsOuter,
boolean rightIsOuter,
boolean[] filterNullKeys,
long stateRetentionTime) {
long leftStateRetentionTime,
long rightStateRetentionTime) {
super(
leftType,
rightType,
generatedJoinCondition,
leftInputSideSpec,
rightInputSideSpec,
filterNullKeys,
stateRetentionTime);
leftStateRetentionTime,
rightStateRetentionTime);
this.leftIsOuter = leftIsOuter;
this.rightIsOuter = rightIsOuter;
}
Expand All @@ -89,15 +91,15 @@ public void open() throws Exception {
"left-records",
leftInputSideSpec,
leftType,
stateRetentionTime);
leftStateRetentionTime);
} else {
this.leftRecordStateView =
JoinRecordStateViews.create(
getRuntimeContext(),
"left-records",
leftInputSideSpec,
leftType,
stateRetentionTime);
leftStateRetentionTime);
}

if (rightIsOuter) {
Expand All @@ -107,15 +109,15 @@ public void open() throws Exception {
"right-records",
rightInputSideSpec,
rightType,
stateRetentionTime);
rightStateRetentionTime);
} else {
this.rightRecordStateView =
JoinRecordStateViews.create(
getRuntimeContext(),
"right-records",
rightInputSideSpec,
rightType,
stateRetentionTime);
rightStateRetentionTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class StreamingSemiAntiJoinOperator extends AbstractStreamingJoinOperator

private static final long serialVersionUID = -3135772379944924519L;

// true if it is anti join, otherwise is semi joinp
// true if it is anti join, otherwise is semi join
private final boolean isAntiJoin;

// left join state
Expand All @@ -51,15 +51,17 @@ public StreamingSemiAntiJoinOperator(
JoinInputSideSpec leftInputSideSpec,
JoinInputSideSpec rightInputSideSpec,
boolean[] filterNullKeys,
long stateRetentionTime) {
long leftStateRetentionTime,
long rightStateRetentionTIme) {
super(
leftType,
rightType,
generatedJoinCondition,
leftInputSideSpec,
rightInputSideSpec,
filterNullKeys,
stateRetentionTime);
leftStateRetentionTime,
rightStateRetentionTIme);
this.isAntiJoin = isAntiJoin;
}

Expand All @@ -73,15 +75,15 @@ public void open() throws Exception {
LEFT_RECORDS_STATE_NAME,
leftInputSideSpec,
leftType,
stateRetentionTime);
leftStateRetentionTime);

this.rightRecordStateView =
JoinRecordStateViews.create(
getRuntimeContext(),
RIGHT_RECORDS_STATE_NAME,
rightInputSideSpec,
rightType,
stateRetentionTime);
rightStateRetentionTime);
}

/**
Expand Down
Loading

0 comments on commit 5ba3f2b

Please sign in to comment.