diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java index 4d83fcc058..0a14b36642 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java @@ -17,17 +17,22 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -class CompleterStage implements Runnable { +class CompleterStage implements Stage { private final ReadPipe input; private final Consumer completer; private final Counter outputCounter; + private final String name; private final CompletableFuture future = new CompletableFuture<>(); CompleterStage( - final ReadPipe input, final Consumer completer, final Counter outputCounter) { + final String name, + final ReadPipe input, + final Consumer completer, + final Counter outputCounter) { this.input = input; this.completer = completer; this.outputCounter = outputCounter; + this.name = name; } @Override @@ -45,4 +50,9 @@ public void run() { public CompletableFuture getFuture() { return future; } + + @Override + public String getName() { + return name; + } } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java index a393476e8b..d3d8be69d2 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java @@ -14,13 +14,15 @@ import java.util.Iterator; -class IteratorSourceStage implements Runnable { +class IteratorSourceStage implements Stage { private final Iterator source; private final Pipe pipe; + private final String name; - IteratorSourceStage(final Iterator source, final Pipe pipe) { + IteratorSourceStage(final String name, final Iterator source, final Pipe pipe) { this.source = source; this.pipe = pipe; + this.name = name; } @Override @@ -33,4 +35,9 @@ public void run() { } pipe.close(); } + + @Override + public String getName() { + return name; + } } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java index c4cc851474..8cb3441604 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java @@ -31,7 +31,7 @@ public class Pipeline { private static final Logger LOG = LogManager.getLogger(); private final Pipe inputPipe; - private final Collection stages; + private final Collection stages; private final Collection> pipes; private final CompleterStage completerStage; private final AtomicBoolean started = new AtomicBoolean(false); @@ -49,7 +49,7 @@ public class Pipeline { Pipeline( final Pipe inputPipe, - final Collection stages, + final Collection stages, final Collection> pipes, final CompleterStage completerStage) { this.inputPipe = inputPipe; @@ -109,11 +109,13 @@ public void abort() { abort(exception); } - private Future runWithErrorHandling( - final ExecutorService executorService, final Runnable task) { + private Future runWithErrorHandling(final ExecutorService executorService, final Stage task) { return executorService.submit( () -> { + final Thread thread = Thread.currentThread(); + final String originalName = thread.getName(); try { + thread.setName(originalName + " (" + task.getName() + ")"); task.run(); } catch (final Throwable t) { LOG.debug("Unhandled exception in pipeline. Aborting.", t); @@ -125,6 +127,8 @@ private Future runWithErrorHandling( // need to call get on it which would normally expose the error. LOG.error("Failed to abort pipeline after error", t2); } + } finally { + thread.setName(originalName); } }); } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index 666378b103..6cb13a239c 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -41,7 +41,7 @@ public class PipelineBuilder { private final Pipe inputPipe; - private final Collection stages; + private final Collection stages; private final Collection> pipes; private final ReadPipe pipeEnd; private final int bufferSize; @@ -49,7 +49,7 @@ public class PipelineBuilder { public PipelineBuilder( final Pipe inputPipe, - final Collection stages, + final Collection stages, final Collection> pipes, final ReadPipe pipeEnd, final int bufferSize, @@ -82,7 +82,7 @@ public static PipelineBuilder createPipelineFrom( final int bufferSize, final LabelledMetric outputCounter) { final Pipe pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName)); - final IteratorSourceStage sourceStage = new IteratorSourceStage<>(source, pipe); + final IteratorSourceStage sourceStage = new IteratorSourceStage<>(sourceName, source, pipe); return new PipelineBuilder<>( pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter); } @@ -134,10 +134,10 @@ public PipelineBuilder thenProcessInParallel( final String stageName, final Function processor, final int numberOfThreads) { final Pipe newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName)); final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); - final ArrayList newStages = new ArrayList<>(stages); + final ArrayList newStages = new ArrayList<>(stages); for (int i = 0; i < numberOfThreads; i++) { - final Runnable processStage = - new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor)); + final Stage processStage = + new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor)); newStages.add(processStage); } return new PipelineBuilder<>( @@ -218,7 +218,7 @@ public Pipeline andFinishWith(final String stageName, final Consumer complete inputPipe, stages, pipes, - new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName))); + new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName))); } private PipelineBuilder addStage(final Processor processor, final String stageName) { @@ -228,12 +228,12 @@ private PipelineBuilder addStage(final Processor processor, final S private PipelineBuilder addStage( final Processor processor, final int newBufferSize, final String stageName) { final Pipe outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); - final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor); + final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor); return addStage(processStage, outputPipe); } - private PipelineBuilder addStage(final Runnable stage, final Pipe outputPipe) { - final List newStages = concat(stages, stage); + private PipelineBuilder addStage(final Stage stage, final Pipe outputPipe) { + final List newStages = concat(stages, stage); return new PipelineBuilder<>( inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter); } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java index 1f73c7021f..90548ac41c 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java @@ -12,14 +12,19 @@ */ package tech.pegasys.pantheon.services.pipeline; -class ProcessingStage implements Runnable { +class ProcessingStage implements Stage { + private final String name; private final ReadPipe inputPipe; private final WritePipe outputPipe; private final Processor processor; public ProcessingStage( - final ReadPipe inputPipe, final WritePipe outputPipe, final Processor processor) { + final String name, + final ReadPipe inputPipe, + final WritePipe outputPipe, + final Processor processor) { + this.name = name; this.inputPipe = inputPipe; this.outputPipe = outputPipe; this.processor = processor; @@ -33,4 +38,9 @@ public void run() { processor.finalize(outputPipe); outputPipe.close(); } + + @Override + public String getName() { + return name; + } } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Stage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Stage.java new file mode 100644 index 0000000000..cb6d689a27 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Stage.java @@ -0,0 +1,17 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +public interface Stage extends Runnable { + String getName(); +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java index ddf9dcbe81..e7a1919ad7 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java @@ -31,7 +31,7 @@ public class CompleterStageTest { private final List output = new ArrayList<>(); private final Counter outputCounter = mock(Counter.class); private final CompleterStage stage = - new CompleterStage<>(pipe, output::add, outputCounter); + new CompleterStage<>("name", pipe, output::add, outputCounter); @Test public void shouldAddItemsToOutputUntilPipeHasNoMore() { diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java index 202ad5d088..3f47246723 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java @@ -23,7 +23,7 @@ public class IteratorSourceStageTest { private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); private final IteratorSourceStage stage = - new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output); + new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output); @Test public void shouldOutputEntriesThenClosePipe() { diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java index 01e1d838e6..d800b442bf 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java @@ -37,7 +37,7 @@ public class ProcessingStageTest { @Before public void setUp() { - stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep); + stage = new ProcessingStage<>("name", inputPipe, outputPipe, singleStep); doAnswer( invocation -> { outputPipe.put(inputPipe.get().toLowerCase(Locale.UK));