Skip to content

Commit

Permalink
Review comments Roman.
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed May 16, 2023
1 parent 419f0db commit 5fc3c0a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class TaskStateAssignment {

@Nullable private TaskStateAssignment[] downstreamAssignments;
@Nullable private TaskStateAssignment[] upstreamAssignments;
@Nullable private Boolean hasUpstreamOutputStates;
@Nullable private Boolean hasDownstreamInputStates;

private final Map<IntermediateDataSetID, TaskStateAssignment> consumerAssignment;
private final Map<ExecutionJobVertex, TaskStateAssignment> vertexAssignments;
Expand Down Expand Up @@ -202,13 +204,21 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) {
}

public boolean hasUpstreamOutputStates() {
return Arrays.stream(getUpstreamAssignments())
.anyMatch(assignment -> assignment.hasOutputState);
if (hasUpstreamOutputStates == null) {
hasUpstreamOutputStates =
Arrays.stream(getUpstreamAssignments())
.anyMatch(assignment -> assignment.hasOutputState);
}
return hasUpstreamOutputStates;
}

public boolean hasDownstreamInputStates() {
return Arrays.stream(getDownstreamAssignments())
.anyMatch(assignment -> assignment.hasInputState);
if (hasDownstreamInputStates == null) {
hasDownstreamInputStates =
Arrays.stream(getDownstreamAssignments())
.anyMatch(assignment -> assignment.hasInputState);
}
return hasDownstreamInputStates;
}

private InflightDataGateOrPartitionRescalingDescriptor log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,9 @@ private void testOnlyUpstreamOrDownstreamRescalingInternal(
ExecutionJobVertex downstreamExecutionJobVertex = vertices.get(operatorIds.get(1));

List<TaskStateSnapshot> upstreamTaskStateSnapshots =
getRescalingDescriptorsFromVertex(upstreamExecutionJobVertex);
getTaskStateSnapshotFromVertex(upstreamExecutionJobVertex);
List<TaskStateSnapshot> downstreamTaskStateSnapshots =
getRescalingDescriptorsFromVertex(downstreamExecutionJobVertex);
getTaskStateSnapshotFromVertex(downstreamExecutionJobVertex);

checkMappings(
upstreamTaskStateSnapshots,
Expand Down Expand Up @@ -1192,7 +1192,7 @@ private JobVertex createJobVertex(
return jobVertex;
}

private List<TaskStateSnapshot> getRescalingDescriptorsFromVertex(
private List<TaskStateSnapshot> getTaskStateSnapshotFromVertex(
ExecutionJobVertex executionJobVertex) {
return Arrays.stream(executionJobVertex.getTaskVertices())
.map(ExecutionVertex::getCurrentExecutionAttempt)
Expand Down

0 comments on commit 5fc3c0a

Please sign in to comment.