From bc0c50a4ae93f174b0907485a567f736f85ecd08 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 14 Mar 2019 13:28:31 +1000 Subject: [PATCH 1/3] Support flat mapping with multiple threads. --- .../services/pipeline/PipelineBuilder.java | 59 +++++++++++++++---- .../pipeline/PipelineBuilderTest.java | 36 +++++++++++ 2 files changed, 83 insertions(+), 12 deletions(-) diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index 6cb13a239c..1c324ba6de 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; /** @@ -119,8 +120,8 @@ public PipelineBuilder thenProcess( } /** - * Adds a 1-to-1 processing stage to the pipeline. Multiple threads processes items in the - * pipeline concurrently with processor outputting its return value to the next stage. + * Adds a 1-to-1 processing stage to the pipeline. Multiple threads process items in the pipeline + * concurrently with processor outputting its return value to the next stage. * *

Note: The order of items is not preserved. * @@ -132,16 +133,8 @@ public PipelineBuilder thenProcess( */ public PipelineBuilder thenProcessInParallel( final String stageName, final Function processor, final int numberOfThreads) { - final Pipe newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName)); - final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); - final ArrayList newStages = new ArrayList<>(stages); - for (int i = 0; i < numberOfThreads; i++) { - final Stage processStage = - new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor)); - newStages.add(processStage); - } - return new PipelineBuilder<>( - inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, bufferSize, outputCounter); + return thenProcessInParallel( + stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize); } /** @@ -206,6 +199,31 @@ public PipelineBuilder thenFlatMap( return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName); } + /** + * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, mapper + * is called and each item of the {@link Stream} it returns is output as an individual item. The + * returned Stream may be empty to remove an item. Multiple threads process items in the pipeline + * concurrently. + * + *

This can be used to reverse the effect of {@link #inBatches(String, int)} with: + * + *

thenFlatMap(List::stream, newBufferSize)
+ * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param mapper the function to process each item with. + * @param newBufferSize the output buffer size to use from this stage onwards. + * @param the type of items to be output from this stage. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenFlatMapInParallel( + final String stageName, + final Function> mapper, + final int numberOfThreads, + final int newBufferSize) { + return thenProcessInParallel( + stageName, () -> new FlatMapProcessor<>(mapper), numberOfThreads, newBufferSize); + } + /** * End the pipeline with a {@link Consumer} that is the last stage of the pipeline. * @@ -221,6 +239,23 @@ public Pipeline andFinishWith(final String stageName, final Consumer complete new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName))); } + private PipelineBuilder thenProcessInParallel( + final String stageName, + final Supplier> createProcessor, + final int numberOfThreads, + final int newBufferSize) { + final Pipe newPipeEnd = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); + final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); + final ArrayList newStages = new ArrayList<>(stages); + for (int i = 0; i < numberOfThreads; i++) { + final Stage processStage = + new ProcessingStage<>(stageName, pipeEnd, outputPipe, createProcessor.get()); + newStages.add(processStage); + } + return new PipelineBuilder<>( + inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, newBufferSize, outputCounter); + } + private PipelineBuilder addStage(final Processor processor, final String stageName) { return addStage(processor, bufferSize, stageName); } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java index c01ab867dd..e79e0cebd3 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -232,6 +232,42 @@ public void shouldProcessInParallel() throws Exception { "1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2"); } + @Test + public void shouldFlatMapInParallel() throws Exception { + final List output = synchronizedList(new ArrayList<>()); + final CountDownLatch latch = new CountDownLatch(1); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenFlatMapInParallel( + "stageName", + value -> { + if (value == 2) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return Stream.of(value.toString()); + }, + 2, + 10) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + + // One thread will block but the other should process the remaining entries. + waitForSize(output, 14); + assertThat(result).isNotDone(); + + latch.countDown(); + + result.get(10, SECONDS); + + assertThat(output) + .containsExactly( + "1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2"); + } + @Test public void shouldAbortPipeline() throws Exception { final int allowProcessingUpTo = 5; From c13404de2ceb7332da4013e7747391426acf860c Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 14 Mar 2019 13:35:19 +1000 Subject: [PATCH 2/3] Add missing param. --- .../tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index 1c324ba6de..483ea46cc9 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -211,6 +211,7 @@ public PipelineBuilder thenFlatMap( * * @param stageName the name of this stage. Used as the label for the output count metric. * @param mapper the function to process each item with. + * @param numberOfThreads the number of threads to use for processing. * @param newBufferSize the output buffer size to use from this stage onwards. * @param the type of items to be output from this stage. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. From ff2e4f1236a0df7b7e2b47d715a6b82a02120873 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 15 Mar 2019 08:23:59 +1000 Subject: [PATCH 3/3] Return multiple items in flapMapInParallel test. --- .../pantheon/services/pipeline/PipelineBuilderTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java index e79e0cebd3..011d209e88 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -248,7 +248,7 @@ public void shouldFlatMapInParallel() throws Exception { throw new RuntimeException(e); } } - return Stream.of(value.toString()); + return Stream.of(value.toString(), "x" + value); }, 2, 10) @@ -256,7 +256,7 @@ public void shouldFlatMapInParallel() throws Exception { final CompletableFuture result = pipeline.start(executorService); // One thread will block but the other should process the remaining entries. - waitForSize(output, 14); + waitForSize(output, 28); assertThat(result).isNotDone(); latch.countDown(); @@ -265,7 +265,9 @@ public void shouldFlatMapInParallel() throws Exception { assertThat(output) .containsExactly( - "1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2"); + "1", "x1", "3", "x3", "4", "x4", "5", "x5", "6", "x6", "7", "x7", "8", "x8", "9", "x9", + "10", "x10", "11", "x11", "12", "x12", "13", "x13", "14", "x14", "15", "x15", "2", + "x2"); } @Test