[FLINK-31963][state] Fix rescaling bug in recovery from unaligned checkpoints. #22584

Merged
merged 3 commits into from
May 16, 2023

Conversation

Contributor

@StefanRRichter StefanRRichter commented May 15, 2023

[FLINK-31963]

What is the purpose of the change

This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state. With this fix, state assignment now considers such upstream/downstream states for rescaling and no longer skips state reassignment for otherwise stateless operators in such cases.

Brief change log

  • Checking for upstream result partitions and downstream input channel states in StateAssignmentOperation before skipping reassignment.
  • Unit test
  • IT test
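
The check described in the change log can be illustrated with a minimal sketch. All types and names below (`OperatorStateSketch`, `canSkipReassignment`) are hypothetical simplifications for illustration, not the actual StateAssignmentOperation code: reassignment is skipped only if the operator itself is stateless and no neighbor holds in-flight channel state that would need to be redistributed.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the restored state of one operator.
final class OperatorStateSketch {
    final boolean hasOwnState;             // managed/raw state of the operator itself
    final boolean hasOutputPartitionState; // in-flight data captured in its result partitions
    final boolean hasInputChannelState;    // in-flight data captured in its input channels

    OperatorStateSketch(
            boolean hasOwnState, boolean hasOutputPartitionState, boolean hasInputChannelState) {
        this.hasOwnState = hasOwnState;
        this.hasOutputPartitionState = hasOutputPartitionState;
        this.hasInputChannelState = hasInputChannelState;
    }
}

final class ReassignmentCheckSketch {
    // Returns true only if skipping state reassignment is safe under the assumptions above.
    static boolean canSkipReassignment(
            OperatorStateSketch operator,
            List<OperatorStateSketch> upstream,
            List<OperatorStateSketch> downstream) {
        if (operator.hasOwnState
                || operator.hasOutputPartitionState
                || operator.hasInputChannelState) {
            return false;
        }
        // An otherwise stateless operator still takes part in rescaling if an upstream
        // operator has output partition state or a downstream one has input channel state.
        boolean upstreamHasOutput = upstream.stream().anyMatch(s -> s.hasOutputPartitionState);
        boolean downstreamHasInput = downstream.stream().anyMatch(s -> s.hasInputChannelState);
        return !upstreamHasOutput && !downstreamHasInput;
    }
}
```

In this sketch, a stateless operator whose upstream neighbor captured output partition state is not skipped, which is the case the description says was previously missed.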

Verifying this change

This change added tests and can be verified as follows:

  • Run the added IT tests in UnalignedCheckpointRescaleITCase
  • Run the added unit tests in StateAssignmentOperationTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes: rescaling state)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

Collaborator

flinkbot commented May 15, 2023

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

Contributor

@pnowojski pnowojski left a comment

LGTM % some minor comments. Let's also verify CI results.

Comment on lines 832 to 836
        if ((upstreamOpState == null && downstreamOpState == null)
                || (upstreamOpState != null && downstreamOpState != null)) {
            // Either upstream or downstream state must exist, but not both.
            return;
        }
Contributor

checkArgument(..., "Either upstream or downstream state must exist, but not both")?
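
A minimal sketch of what this suggestion could look like. The `checkArgument` helper here is a local stand-in written for the example (in the spirit of a precondition utility such as Flink's `Preconditions.checkArgument`), and `requireExactlyOne` is a hypothetical name. Note that this changes behavior compared to the hunk above: instead of returning silently, a violated condition raises an exception.

```java
final class ExactlyOneStateSketch {
    // Local stand-in for a precondition helper; throws if the condition does not hold.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Exactly one of the two states may be non-null: (a == null) != (b == null) is a
    // compact exclusive-or that replaces the two-clause condition of the original hunk.
    static void requireExactlyOne(Object upstreamOpState, Object downstreamOpState) {
        checkArgument(
                (upstreamOpState == null) != (downstreamOpState == null),
                "Either upstream or downstream state must exist, but not both");
    }
}
```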

Comment on lines 882 to 885
        List<TaskStateSnapshot> upstreamRescalingDescriptors =
                getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex);
        List<TaskStateSnapshot> downstreamRescalingDescriptors =
                getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex);
Contributor

Rename RescalingDescriptors -> TaskStateSnapshots? You are obtaining descriptors from the TaskStateSnapshot in the next step within checkMappings.

Comment on lines 887 to 895
        checkMappings(
                upstreamRescalingDescriptors,
                TaskStateSnapshot::getOutputRescalingDescriptor,
                expectedUpstreamCount);

        checkMappings(
                downstreamRescalingDescriptors,
                TaskStateSnapshot::getInputRescalingDescriptor,
                expectedDownstreamCount);
Contributor

nit: instead of lambda functions I would accept a little bit of code duplication and replace those calls with:

        checkMappings(
                upstreamTaskStateSnapshots.stream().map(TaskStateSnapshot::getOutputRescalingDescriptor),
                expectedUpstreamCount);

        checkMappings(
                downstreamTaskStateSnapshots.stream().map(TaskStateSnapshot::getInputRescalingDescriptor),
                expectedDownstreamCount);

Contributor Author

Hm, but what is this improving?

Contributor

When reading the code of checkMappings, it's tricky to understand what the extractFn does. But feel free to ignore this comment.
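
To make the two variants in this thread concrete, here is a self-contained sketch of the stream-based form, where the caller extracts the descriptors and the helper only counts mappings. `DescriptorSketch`, `SnapshotSketch`, and `CheckMappingsSketch` are hypothetical simplifications (for instance, `getOldSubtaskIndexes()` takes no gate or partition index here), not the real Flink classes.

```java
import java.util.stream.Stream;

// Hypothetical stand-in for a rescaling descriptor: the old subtask indexes it maps from.
final class DescriptorSketch {
    private final int[] oldSubtaskIndexes;

    DescriptorSketch(int... oldSubtaskIndexes) {
        this.oldSubtaskIndexes = oldSubtaskIndexes;
    }

    int[] getOldSubtaskIndexes() {
        return oldSubtaskIndexes;
    }
}

// Hypothetical stand-in for a task state snapshot with an input and an output descriptor.
final class SnapshotSketch {
    private final DescriptorSketch input;
    private final DescriptorSketch output;

    SnapshotSketch(DescriptorSketch input, DescriptorSketch output) {
        this.input = input;
        this.output = output;
    }

    DescriptorSketch getInputDescriptor() {
        return input;
    }

    DescriptorSketch getOutputDescriptor() {
        return output;
    }
}

final class CheckMappingsSketch {
    // The helper receives descriptors directly, so it needs no extractor function.
    static void checkMappings(Stream<DescriptorSketch> descriptors, int expectedCount) {
        int actual =
                descriptors
                        .mapToInt(
                                d -> {
                                    int len = d.getOldSubtaskIndexes().length;
                                    if (len == 0) {
                                        throw new AssertionError("expected at least one mapping");
                                    }
                                    return len;
                                })
                        .sum();
        if (actual != expectedCount) {
            throw new AssertionError("expected " + expectedCount + " mappings, got " + actual);
        }
    }
}
```

A call site would then read `checkMappings(snapshots.stream().map(SnapshotSketch::getOutputDescriptor), expected)`, which keeps the extraction visible where the helper is invoked at the cost of repeating the `stream().map(...)` step.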

Comment on lines +902 to +913
        Assert.assertEquals(
                expectedCount,
                taskStateSnapshots.stream()
                        .map(extractFun)
                        .mapToInt(
                                x -> {
                                    int len = x.getOldSubtaskIndexes(0).length;
                                    // Assert that there is a mapping.
                                    Assert.assertTrue(len > 0);
                                    return len;
                                })
                        .sum());
Contributor

Instead of asserting length of the mappings, should we assert the actual mappings? 🤔

Contributor Author

I was thinking about this and decided to keep the test targeted at just checking that a remapping has happened. I'd hope there are already tests that check the correctness of such remappings thoroughly.

Comment on lines +481 to +482
    @Parameterized.Parameters(
            name = "{0} {1} from {2} to {3}, sourceSleepMs = {4}, buffersPerChannel = {5}")
Contributor

I would add a comment above this line explaining why we sometimes want a non-zero sourceSleepMs: we want to test rescaling without backpressure, with only an occasional couple of captured in-flight records.

Contributor

@rkhachatryan rkhachatryan left a comment

LGTM

Comment on lines +484 to +488
        // We use `sourceSleepMs` > 0 to test rescaling without backpressure and only very few
        // captured in-flight records, see FLINK-31963.
        Object[][] parameters =
                new Object[][] {
                    new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7},
                    new Object[] {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12},
                    new Object[] {"downscale", Topology.KEYED_BROADCAST, 7, 2},
                    new Object[] {"upscale", Topology.KEYED_BROADCAST, 2, 7},
                    new Object[] {"downscale", Topology.BROADCAST, 5, 2},
                    new Object[] {"upscale", Topology.BROADCAST, 2, 5},
                    new Object[] {"upscale", Topology.PIPELINE, 1, 2},
                    new Object[] {"upscale", Topology.PIPELINE, 2, 3},
                    new Object[] {"upscale", Topology.PIPELINE, 3, 7},
                    new Object[] {"upscale", Topology.PIPELINE, 4, 8},
                    new Object[] {"upscale", Topology.PIPELINE, 20, 21},
                    new Object[] {"downscale", Topology.PIPELINE, 2, 1},
                    new Object[] {"downscale", Topology.PIPELINE, 3, 2},
                    new Object[] {"downscale", Topology.PIPELINE, 7, 3},
                    new Object[] {"downscale", Topology.PIPELINE, 8, 4},
                    new Object[] {"downscale", Topology.PIPELINE, 21, 20},
                    new Object[] {"no scale", Topology.PIPELINE, 1, 1},
                    new Object[] {"no scale", Topology.PIPELINE, 3, 3},
                    new Object[] {"no scale", Topology.PIPELINE, 7, 7},
                    new Object[] {"no scale", Topology.PIPELINE, 20, 20},
                    new Object[] {"upscale", Topology.UNION, 1, 2},
                    new Object[] {"upscale", Topology.UNION, 2, 3},
                    new Object[] {"upscale", Topology.UNION, 3, 7},
                    new Object[] {"downscale", Topology.UNION, 2, 1},
                    new Object[] {"downscale", Topology.UNION, 3, 2},
                    new Object[] {"downscale", Topology.UNION, 7, 3},
                    new Object[] {"no scale", Topology.UNION, 1, 1},
                    new Object[] {"no scale", Topology.UNION, 7, 7},
                    new Object[] {"upscale", Topology.MULTI_INPUT, 1, 2},
                    new Object[] {"upscale", Topology.MULTI_INPUT, 2, 3},
                    new Object[] {"upscale", Topology.MULTI_INPUT, 3, 7},
                    new Object[] {"downscale", Topology.MULTI_INPUT, 2, 1},
                    new Object[] {"downscale", Topology.MULTI_INPUT, 3, 2},
                    new Object[] {"downscale", Topology.MULTI_INPUT, 7, 3},
                    new Object[] {"no scale", Topology.MULTI_INPUT, 1, 1},
                    new Object[] {"no scale", Topology.MULTI_INPUT, 7, 7},
                    new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7, 0L},
Contributor

nit: I'd consider combining a limited set of manually crafted cases with randomly generated ones (different on each run).
That would increase the coverage a bit, given that there are a lot of runs on the CI.
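
One way this suggestion could be sketched is shown below. The class name, the string topology labels, and the parameters `seed`, `randomCases`, and `maxParallelism` are all assumptions made for the example; the actual test uses a `Topology` enum and additional parameters that are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class RescaleCasesSketch {
    // Combines a few hand-picked cases with randomly generated ones.
    static List<Object[]> cases(long seed, int randomCases, int maxParallelism) {
        List<Object[]> result = new ArrayList<>();

        // Manually crafted cases that should always run.
        result.add(new Object[] {"upscale", "PIPELINE", 1, 2});
        result.add(new Object[] {"downscale", "PIPELINE", 2, 1});
        result.add(new Object[] {"no scale", "PIPELINE", 3, 3});

        // Randomly generated cases; the seed is passed in so that a caller can log it
        // and reproduce a failing run.
        Random random = new Random(seed);
        for (int i = 0; i < randomCases; i++) {
            int from = 1 + random.nextInt(maxParallelism);
            int to = 1 + random.nextInt(maxParallelism);
            String name = to > from ? "upscale" : (to < from ? "downscale" : "no scale");
            result.add(new Object[] {name, "PIPELINE", from, to});
        }
        return result;
    }
}
```

Passing something like `System.nanoTime()` as the seed would give different cases on each run, as the comment proposes, provided the seed is printed so a CI failure can be replayed.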

@StefanRRichter StefanRRichter merged commit 354c0f4 into apache:master May 16, 2023
StefanRRichter added a commit to StefanRRichter/flink that referenced this pull request May 16, 2023
…ckpoints. (apache#22584)

This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state.

(cherry picked from commit 354c0f4)
StefanRRichter added a commit that referenced this pull request May 16, 2023
…ckpoints. (#22584) (#22595)

This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state.

(cherry picked from commit 354c0f4)
StefanRRichter added a commit that referenced this pull request May 17, 2023
…ckpoints. (#22584) (#22594)

This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state.

(cherry picked from commit 354c0f4)
RocMarshal pushed a commit to RocMarshal/flink that referenced this pull request May 9, 2024
…ckpoints. (apache#22584) (apache#22594)

This commit fixes problems in StateAssignmentOperation for unaligned checkpoints with stateless operators that have upstream operators with output partition state or downstream operators with input channel state.

(cherry picked from commit 354c0f4)

4 participants