Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Add pipe stage name to thread while executing (#1097)
Browse files Browse the repository at this point in the history
Makes it easier to identify when profiling and debugging.
  • Loading branch information
ajsutton authored Mar 14, 2019
1 parent 961ba9e commit 04e7de5
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CompleterStage<T> implements Runnable {
class CompleterStage<T> implements Stage {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final String name;
private final CompletableFuture<?> future = new CompletableFuture<>();

CompleterStage(
final ReadPipe<T> input, final Consumer<T> completer, final Counter outputCounter) {
final String name,
final ReadPipe<T> input,
final Consumer<T> completer,
final Counter outputCounter) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
this.name = name;
}

@Override
Expand All @@ -45,4 +50,9 @@ public void run() {
public CompletableFuture<?> getFuture() {
return future;
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import java.util.Iterator;

class IteratorSourceStage<T> implements Runnable {
class IteratorSourceStage<T> implements Stage {
private final Iterator<T> source;
private final Pipe<T> pipe;
private final String name;

IteratorSourceStage(final Iterator<T> source, final Pipe<T> pipe) {
IteratorSourceStage(final String name, final Iterator<T> source, final Pipe<T> pipe) {
this.source = source;
this.pipe = pipe;
this.name = name;
}

@Override
Expand All @@ -33,4 +35,9 @@ public void run() {
}
pipe.close();
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class Pipeline {
private static final Logger LOG = LogManager.getLogger();
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -49,7 +49,7 @@ public class Pipeline {

Pipeline(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
Expand Down Expand Up @@ -109,11 +109,13 @@ public void abort() {
abort(exception);
}

private Future<?> runWithErrorHandling(
final ExecutorService executorService, final Runnable task) {
private Future<?> runWithErrorHandling(final ExecutorService executorService, final Stage task) {
return executorService.submit(
() -> {
final Thread thread = Thread.currentThread();
final String originalName = thread.getName();
try {
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
Expand All @@ -125,6 +127,8 @@ private Future<?> runWithErrorHandling(
// need to call get on it which would normally expose the error.
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
thread.setName(originalName);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
public class PipelineBuilder<T> {

private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
private final LabelledMetric<Counter> outputCounter;

public PipelineBuilder(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final ReadPipe<T> pipeEnd,
final int bufferSize,
Expand Down Expand Up @@ -82,7 +82,7 @@ public static <T> PipelineBuilder<T> createPipelineFrom(
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName));
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(source, pipe);
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter);
}
Expand Down Expand Up @@ -134,10 +134,10 @@ public <O> PipelineBuilder<O> thenProcessInParallel(
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
final Pipe<O> newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Runnable> newStages = new ArrayList<>(stages);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Runnable processStage =
new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor));
final Stage processStage =
new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor));
newStages.add(processStage);
}
return new PipelineBuilder<>(
Expand Down Expand Up @@ -218,7 +218,7 @@ public Pipeline andFinishWith(final String stageName, final Consumer<T> complete
inputPipe,
stages,
pipes,
new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName)));
new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName)));
}

private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
Expand All @@ -228,12 +228,12 @@ private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final S
private <O> PipelineBuilder<O> addStage(
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor);
final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}

private <O> PipelineBuilder<O> addStage(final Runnable stage, final Pipe<O> outputPipe) {
final List<Runnable> newStages = concat(stages, stage);
private <O> PipelineBuilder<O> addStage(final Stage stage, final Pipe<O> outputPipe) {
final List<Stage> newStages = concat(stages, stage);
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
*/
package tech.pegasys.pantheon.services.pipeline;

class ProcessingStage<I, O> implements Runnable {
class ProcessingStage<I, O> implements Stage {

private final String name;
private final ReadPipe<I> inputPipe;
private final WritePipe<O> outputPipe;
private final Processor<I, O> processor;

public ProcessingStage(
final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe, final Processor<I, O> processor) {
final String name,
final ReadPipe<I> inputPipe,
final WritePipe<O> outputPipe,
final Processor<I, O> processor) {
this.name = name;
this.inputPipe = inputPipe;
this.outputPipe = outputPipe;
this.processor = processor;
Expand All @@ -33,4 +38,9 @@ public void run() {
processor.finalize(outputPipe);
outputPipe.close();
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

public interface Stage extends Runnable {
String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CompleterStageTest {
private final List<String> output = new ArrayList<>();
private final Counter outputCounter = mock(Counter.class);
private final CompleterStage<String> stage =
new CompleterStage<>(pipe, output::add, outputCounter);
new CompleterStage<>("name", pipe, output::add, outputCounter);

@Test
public void shouldAddItemsToOutputUntilPipeHasNoMore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);

private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output);
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);

@Test
public void shouldOutputEntriesThenClosePipe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ProcessingStageTest {

@Before
public void setUp() {
stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep);
stage = new ProcessingStage<>("name", inputPipe, outputPipe, singleStep);
doAnswer(
invocation -> {
outputPipe.put(inputPipe.get().toLowerCase(Locale.UK));
Expand Down

0 comments on commit 04e7de5

Please sign in to comment.