-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints. #22584
[FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints. #22584
Conversation
This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % some minor comments. Let's also verify CI results.
if ((upstreamOpState == null && downstreamOpState == null) | ||
|| (upstreamOpState != null && downstreamOpState != null)) { | ||
// Either upstream or downstream state must exist, but not both. | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkArgument(..., "Either upstream or downstream state must exist, but not both")
?
List<TaskStateSnapshot> upstreamRescalingDescriptors = | ||
getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex); | ||
List<TaskStateSnapshot> downstreamRescalingDescriptors = | ||
getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename RescalingDescriptors
-> TaskStateSnapshots
? You are obtaining descriptors from the TaskStateSnapshot
in the next step within checkMappings
.
checkMappings( | ||
upstreamRescalingDescriptors, | ||
TaskStateSnapshot::getOutputRescalingDescriptor, | ||
expectedUpstreamCount); | ||
|
||
checkMappings( | ||
downstreamRescalingDescriptors, | ||
TaskStateSnapshot::getInputRescalingDescriptor, | ||
expectedDownstreamCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of lambda functions I would accept a little bit of code deduplication and replace those calls with:
checkMappings(
upstreamTaskStateSnapshots.stream().map(TaskStateSnapshot::getOutputRescalingDescriptor),
expectedUpstreamCount);
checkMappings(
downstreamTaskStateSnapshots.stream().map(TaskStateSnapshot::getInputRescalingDescriptor),
expectedDownstreamCount);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, but what is this improving?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When reading the code of checkMappings
it's tricky to understand what does the extractFn
do. But feel free to ignore this comment.
Assert.assertEquals( | ||
expectedCount, | ||
taskStateSnapshots.stream() | ||
.map(extractFun) | ||
.mapToInt( | ||
x -> { | ||
int len = x.getOldSubtaskIndexes(0).length; | ||
// Assert that there is a mapping. | ||
Assert.assertTrue(len > 0); | ||
return len; | ||
}) | ||
.sum()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of asserting length of the mappings, should we assert the actual mappings? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about this and decided to keep the test targeted at just checking that a remapping has happened. I'd hope there are already tests that check the correctness of such remappings thoroughly.
@Parameterized.Parameters( | ||
name = "{0} {1} from {2} to {3}, sourceSleepMs = {4}, buffersPerChannel = {5}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add a comment above this line explaining why do we want to have non zero sourceSleepMs
sometimes. That we want to test the rescaling without backpressure with only occasional a couple of captured in-flight records .
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
Show resolved
Hide resolved
...-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
// We use `sourceSleepMs` > 0 to test rescaling without backpressure and only very few | ||
// captured in-flight records, see FLINK-31963. | ||
Object[][] parameters = | ||
new Object[][] { | ||
new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7}, | ||
new Object[] {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12}, | ||
new Object[] {"downscale", Topology.KEYED_BROADCAST, 7, 2}, | ||
new Object[] {"upscale", Topology.KEYED_BROADCAST, 2, 7}, | ||
new Object[] {"downscale", Topology.BROADCAST, 5, 2}, | ||
new Object[] {"upscale", Topology.BROADCAST, 2, 5}, | ||
new Object[] {"upscale", Topology.PIPELINE, 1, 2}, | ||
new Object[] {"upscale", Topology.PIPELINE, 2, 3}, | ||
new Object[] {"upscale", Topology.PIPELINE, 3, 7}, | ||
new Object[] {"upscale", Topology.PIPELINE, 4, 8}, | ||
new Object[] {"upscale", Topology.PIPELINE, 20, 21}, | ||
new Object[] {"downscale", Topology.PIPELINE, 2, 1}, | ||
new Object[] {"downscale", Topology.PIPELINE, 3, 2}, | ||
new Object[] {"downscale", Topology.PIPELINE, 7, 3}, | ||
new Object[] {"downscale", Topology.PIPELINE, 8, 4}, | ||
new Object[] {"downscale", Topology.PIPELINE, 21, 20}, | ||
new Object[] {"no scale", Topology.PIPELINE, 1, 1}, | ||
new Object[] {"no scale", Topology.PIPELINE, 3, 3}, | ||
new Object[] {"no scale", Topology.PIPELINE, 7, 7}, | ||
new Object[] {"no scale", Topology.PIPELINE, 20, 20}, | ||
new Object[] {"upscale", Topology.UNION, 1, 2}, | ||
new Object[] {"upscale", Topology.UNION, 2, 3}, | ||
new Object[] {"upscale", Topology.UNION, 3, 7}, | ||
new Object[] {"downscale", Topology.UNION, 2, 1}, | ||
new Object[] {"downscale", Topology.UNION, 3, 2}, | ||
new Object[] {"downscale", Topology.UNION, 7, 3}, | ||
new Object[] {"no scale", Topology.UNION, 1, 1}, | ||
new Object[] {"no scale", Topology.UNION, 7, 7}, | ||
new Object[] {"upscale", Topology.MULTI_INPUT, 1, 2}, | ||
new Object[] {"upscale", Topology.MULTI_INPUT, 2, 3}, | ||
new Object[] {"upscale", Topology.MULTI_INPUT, 3, 7}, | ||
new Object[] {"downscale", Topology.MULTI_INPUT, 2, 1}, | ||
new Object[] {"downscale", Topology.MULTI_INPUT, 3, 2}, | ||
new Object[] {"downscale", Topology.MULTI_INPUT, 7, 3}, | ||
new Object[] {"no scale", Topology.MULTI_INPUT, 1, 1}, | ||
new Object[] {"no scale", Topology.MULTI_INPUT, 7, 7}, | ||
new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7, 0L}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd consider combining a limited set of manually-crafted cases with a randomly generated ones (different on each run).
That would increase the coverage a bit given that there are a lot of runs on the CI.
…ckpoints. (apache#22584) This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state. (cherry picked from commit 354c0f4)
…ckpoints. (apache#22584) This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state. (cherry picked from commit 354c0f4)
…ckpoints. (apache#22584) (apache#22594) This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state. (cherry picked from commit 354c0f4)
[FLINK-31963]
What is the purpose of the change
This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state. With this fix, state assignment does now consider such upstream/downstream states for rescaling and no longer skip the state reassignment in such cases for otherwise stateless operators.
Brief change log
StateAssignmentOperation
before skipping reassignment.Verifying this change
This change added tests and can be verified as follows:
UnalignedCheckpointRescaleITCase
StateAssignmentOperationTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation