diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index 6cb13a239c..483ea46cc9 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; /** @@ -119,8 +120,8 @@ public PipelineBuilder thenProcess( } /** - * Adds a 1-to-1 processing stage to the pipeline. Multiple threads processes items in the - * pipeline concurrently with processor outputting its return value to the next stage. + * Adds a 1-to-1 processing stage to the pipeline. Multiple threads process items in the pipeline + * concurrently with processor outputting its return value to the next stage. * *

Note: The order of items is not preserved. * @@ -132,16 +133,8 @@ public PipelineBuilder thenProcess( */ public PipelineBuilder thenProcessInParallel( final String stageName, final Function processor, final int numberOfThreads) { - final Pipe newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName)); - final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); - final ArrayList newStages = new ArrayList<>(stages); - for (int i = 0; i < numberOfThreads; i++) { - final Stage processStage = - new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor)); - newStages.add(processStage); - } - return new PipelineBuilder<>( - inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, bufferSize, outputCounter); + return thenProcessInParallel( + stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize); } /** @@ -206,6 +199,32 @@ public PipelineBuilder thenFlatMap( return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName); } + /** + * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, mapper + * is called and each item of the {@link Stream} it returns is output as an individual item. The + * returned Stream may be empty to remove an item. Multiple threads process items in the pipeline + * concurrently. + * + *

This can be used to reverse the effect of {@link #inBatches(String, int)} with: + * + *

thenFlatMap(List::stream, newBufferSize)
+ * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param mapper the function to process each item with. + * @param numberOfThreads the number of threads to use for processing. + * @param newBufferSize the output buffer size to use from this stage onwards. + * @param the type of items to be output from this stage. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenFlatMapInParallel( + final String stageName, + final Function> mapper, + final int numberOfThreads, + final int newBufferSize) { + return thenProcessInParallel( + stageName, () -> new FlatMapProcessor<>(mapper), numberOfThreads, newBufferSize); + } + /** * End the pipeline with a {@link Consumer} that is the last stage of the pipeline. * @@ -221,6 +240,23 @@ public Pipeline andFinishWith(final String stageName, final Consumer complete new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName))); } + private PipelineBuilder thenProcessInParallel( + final String stageName, + final Supplier> createProcessor, + final int numberOfThreads, + final int newBufferSize) { + final Pipe newPipeEnd = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); + final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); + final ArrayList newStages = new ArrayList<>(stages); + for (int i = 0; i < numberOfThreads; i++) { + final Stage processStage = + new ProcessingStage<>(stageName, pipeEnd, outputPipe, createProcessor.get()); + newStages.add(processStage); + } + return new PipelineBuilder<>( + inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, newBufferSize, outputCounter); + } + private PipelineBuilder addStage(final Processor processor, final String stageName) { return addStage(processor, bufferSize, stageName); } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java index c01ab867dd..011d209e88 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -232,6 +232,44 @@ public void shouldProcessInParallel() throws Exception { "1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2"); } + @Test + public void shouldFlatMapInParallel() throws Exception { + final List output = synchronizedList(new ArrayList<>()); + final CountDownLatch latch = new CountDownLatch(1); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenFlatMapInParallel( + "stageName", + value -> { + if (value == 2) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return Stream.of(value.toString(), "x" + value); + }, + 2, + 10) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + + // One thread will block but the other should process the remaining entries. + waitForSize(output, 28); + assertThat(result).isNotDone(); + + latch.countDown(); + + result.get(10, SECONDS); + + assertThat(output) + .containsExactly( + "1", "x1", "3", "x3", "4", "x4", "5", "x5", "6", "x6", "7", "x7", "8", "x8", "9", "x9", + "10", "x10", "11", "x11", "12", "x12", "13", "x13", "14", "x14", "15", "x15", "2", + "x2"); + } + @Test public void shouldAbortPipeline() throws Exception { final int allowProcessingUpTo = 5;