Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Handle the pipeline being aborted while finalizing an async operation. (
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Apr 30, 2019
1 parent a3bff1d commit 79ca24c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> out
}

@Override
public void finalize(final WritePipe<O> outputPipe) {
while (!inProgress.isEmpty()) {
outputNextCompletedTask(outputPipe);
}
public boolean attemptFinalization(final WritePipe<O> outputPipe) {
outputNextCompletedTask(outputPipe);
return inProgress.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ public void run() {
if (inputPipe.isAborted()) {
processor.abort();
}
processor.finalize(outputPipe);
while (!processor.attemptFinalization(outputPipe)) {
if (inputPipe.isAborted()) {
processor.abort();
break;
}
}
outputPipe.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
interface Processor<I, O> {
void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe);

default void finalize(final WritePipe<O> outputPipe) {}
default boolean attemptFinalization(final WritePipe<O> outputPipe) {
return true;
}

default void abort() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void shouldOutputRemainingInProgressTasksWhenFinalizing() {
task2.complete("b");

// Processing
processor.finalize(writePipe);
processor.attemptFinalization(writePipe);
verify(writePipe).put("a");
verify(writePipe).put("b");
}
Expand Down Expand Up @@ -174,7 +174,7 @@ public void shouldPreserveOrderWhenRequested() {
// And should finalize in order
task4.complete("d");
task3.complete("c");
processor.finalize(writePipe);
assertThat(processor.attemptFinalization(writePipe)).isTrue();
verify(writePipe).put("c");
verify(writePipe).put("d");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;

import java.util.Locale;
Expand Down Expand Up @@ -51,6 +52,7 @@ public void setUp() {

@Test
public void shouldCallSingleStepStageForEachInput() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.put("A");
inputPipe.put("B");
inputPipe.put("C");
Expand All @@ -68,22 +70,42 @@ public void shouldCallSingleStepStageForEachInput() {

@Test
public void shouldFinalizeSingleStepStageAndCloseOutputPipeWhenInputCloses() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.close();

stage.run();

verify(singleStep).finalize(outputPipe);
verify(singleStep).attemptFinalization(outputPipe);
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}

@Test
public void shouldAbortIfPipeIsCancelledWhileAttemptingToFinalise() {
when(singleStep.attemptFinalization(outputPipe))
.thenAnswer(
invocation -> {
inputPipe.abort();
return false;
});
inputPipe.close();

stage.run();

verify(singleStep).attemptFinalization(outputPipe);
verify(singleStep).abort();
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}

@Test
public void shouldAbortProcessorIfReadPipeIsAborted() {
when(singleStep.attemptFinalization(outputPipe)).thenReturn(true);
inputPipe.abort();
stage.run();

verify(singleStep).abort();
verify(singleStep).finalize(outputPipe);
verify(singleStep).attemptFinalization(outputPipe);
assertThat(outputPipe.isOpen()).isFalse();
}
}

0 comments on commit 79ca24c

Please sign in to comment.