Skip to content

Commit

Permalink
getProcessingTimesByStep returns a modifiable map (#29786) (#29799)
Browse files Browse the repository at this point in the history
* getProcessingTimesByStep returns a modifiable map

* return copy in getProcessingTimesByStep and update name accordingly

* Spotless

---------

Co-authored-by: clmccart <[email protected]>
Co-authored-by: Claire McCarthy <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2023
1 parent 667027d commit ea477dd
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
return Optional.ofNullable(activeMessageMetadata);
}

public Map<String, IntSummaryStatistics> getProcessingTimesByStep() {
return Collections.unmodifiableMap(processingTimesByStep);
public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
return processingTimesCopy;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void removeTracker(ExecutionStateTracker tracker) {
return;
}
DataflowExecutionStateTracker dfTracker = (DataflowExecutionStateTracker) tracker;
completedProcessingMetrics.put(dfTracker.getWorkItemId(), dfTracker.getProcessingTimesByStep());
completedProcessingMetrics.put(
dfTracker.getWorkItemId(), dfTracker.getProcessingTimesByStepCopy());
activeTrackersByWorkId.remove(dfTracker.getWorkItemId());

// Attribute any remaining time since the last sampling while removing the tracker.
Expand Down Expand Up @@ -126,7 +127,7 @@ public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(Str
DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
return mergeStepStatsMaps(
completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
tracker.getProcessingTimesByStep());
tracker.getProcessingTimesByStepCopy());
}

public void resetForWorkId(String workId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes()
tracker.enterState(newState);

// The first completed state should be recorded and the new state should be active.
Map<String, IntSummaryStatistics> gotProcessingTimes = tracker.getProcessingTimesByStep();
Map<String, IntSummaryStatistics> gotProcessingTimes = tracker.getProcessingTimesByStepCopy();
Assert.assertEquals(1, gotProcessingTimes.size());
Assert.assertEquals(
new HashSet<>(Arrays.asList(NameContextsForTests.nameContextForTest().userName())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testRemoveTrackerCompletedProcessingTimesGetsUpdated() {
Map<String, IntSummaryStatistics> testCompletedProcessingTimes = new HashMap<>();
testCompletedProcessingTimes.put("some-step", new IntSummaryStatistics());
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
when(trackerMock.getProcessingTimesByStep()).thenReturn(testCompletedProcessingTimes);
when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);

sampler.addTracker(trackerMock);
sampler.removeTracker(trackerMock);
Expand All @@ -98,7 +98,7 @@ public void testGetCompletedProcessingTimesAndActiveMessageFromActiveTracker() {
ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
when(trackerMock.getProcessingTimesByStep()).thenReturn(testCompletedProcessingTimes);
when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);

sampler.addTracker(trackerMock);

Expand All @@ -122,21 +122,21 @@ public void testResetForWorkIdClearsMaps() {
equalTo(tracker1Mock.getActiveMessageMetadata()));
assertThat(
sampler.getProcessingDistributionsForWorkId(workId1),
equalTo(tracker1Mock.getProcessingTimesByStep()));
equalTo(tracker1Mock.getProcessingTimesByStepCopy()));
assertThat(
sampler.getActiveMessageMetadataForWorkId(workId2),
equalTo(tracker2Mock.getActiveMessageMetadata()));
assertThat(
sampler.getProcessingDistributionsForWorkId(workId2),
equalTo(tracker2Mock.getProcessingTimesByStep()));
equalTo(tracker2Mock.getProcessingTimesByStepCopy()));

sampler.removeTracker(tracker1Mock);
sampler.removeTracker(tracker2Mock);
sampler.resetForWorkId(workId2);

assertThat(
sampler.getProcessingDistributionsForWorkId(workId1),
equalTo(tracker1Mock.getProcessingTimesByStep()));
equalTo(tracker1Mock.getProcessingTimesByStepCopy()));
Assert.assertTrue(sampler.getProcessingDistributionsForWorkId(workId2).isEmpty());
}

Expand Down

0 comments on commit ea477dd

Please sign in to comment.