Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Support flat mapping with multiple threads #1098

Merged
merged 4 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -119,8 +120,8 @@ public <O> PipelineBuilder<O> thenProcess(
}

/**
* Adds a 1-to-1 processing stage to the pipeline. Multiple threads processes items in the
* pipeline concurrently with <i>processor</i> outputting its return value to the next stage.
* Adds a 1-to-1 processing stage to the pipeline. Multiple threads process items in the pipeline
* concurrently with <i>processor</i> outputting its return value to the next stage.
*
* <p>Note: The order of items is not preserved.
*
Expand All @@ -132,16 +133,8 @@ public <O> PipelineBuilder<O> thenProcess(
*/
public <O> PipelineBuilder<O> thenProcessInParallel(
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
final Pipe<O> newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Stage processStage =
new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor));
newStages.add(processStage);
}
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, bufferSize, outputCounter);
return thenProcessInParallel(
stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize);
}

/**
Expand Down Expand Up @@ -206,6 +199,32 @@ public <O> PipelineBuilder<O> thenFlatMap(
return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName);
}

/**
* Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
* is called and each item of the {@link Stream} it returns is output as an individual item. The
* returned Stream may be empty to remove an item. Multiple threads process items in the pipeline
* concurrently.
*
* <p>This can be used to reverse the effect of {@link #inBatches(String, int)} with:
*
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param mapper the function to process each item with.
* @param numberOfThreads the number of threads to use for processing.
* @param newBufferSize the output buffer size to use from this stage onwards.
* @param <O> the type of items to be output from this stage.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenFlatMapInParallel(
final String stageName,
final Function<T, Stream<O>> mapper,
final int numberOfThreads,
final int newBufferSize) {
return thenProcessInParallel(
stageName, () -> new FlatMapProcessor<>(mapper), numberOfThreads, newBufferSize);
}

/**
* End the pipeline with a {@link Consumer} that is the last stage of the pipeline.
*
Expand All @@ -221,6 +240,23 @@ public Pipeline andFinishWith(final String stageName, final Consumer<T> complete
new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName)));
}

private <O> PipelineBuilder<O> thenProcessInParallel(
final String stageName,
final Supplier<Processor<T, O>> createProcessor,
final int numberOfThreads,
final int newBufferSize) {
final Pipe<O> newPipeEnd = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Stage processStage =
new ProcessingStage<>(stageName, pipeEnd, outputPipe, createProcessor.get());
newStages.add(processStage);
}
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, newBufferSize, outputCounter);
}

private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
return addStage(processor, bufferSize, stageName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,44 @@ public void shouldProcessInParallel() throws Exception {
"1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2");
}

@Test
public void shouldFlatMapInParallel() throws Exception {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenFlatMapInParallel(
"stageName",
value -> {
if (value == 2) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return Stream.of(value.toString(), "x" + value);
},
2,
10)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);

// One thread will block but the other should process the remaining entries.
waitForSize(output, 28);
assertThat(result).isNotDone();

latch.countDown();

result.get(10, SECONDS);

assertThat(output)
.containsExactly(
"1", "x1", "3", "x3", "4", "x4", "5", "x5", "6", "x6", "7", "x7", "8", "x8", "9", "x9",
"10", "x10", "11", "x11", "12", "x12", "13", "x13", "14", "x14", "15", "x15", "2",
"x2");
}

@Test
public void shouldAbortPipeline() throws Exception {
final int allowProcessingUpTo = 5;
Expand Down