Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Add pipe stage name to thread while executing #1097

Merged
merged 1 commit into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CompleterStage<T> implements Runnable {
class CompleterStage<T> implements Stage {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final String name;
private final CompletableFuture<?> future = new CompletableFuture<>();

CompleterStage(
final ReadPipe<T> input, final Consumer<T> completer, final Counter outputCounter) {
final String name,
final ReadPipe<T> input,
final Consumer<T> completer,
final Counter outputCounter) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
this.name = name;
}

@Override
Expand All @@ -45,4 +50,9 @@ public void run() {
public CompletableFuture<?> getFuture() {
return future;
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import java.util.Iterator;

class IteratorSourceStage<T> implements Runnable {
class IteratorSourceStage<T> implements Stage {
private final Iterator<T> source;
private final Pipe<T> pipe;
private final String name;

IteratorSourceStage(final Iterator<T> source, final Pipe<T> pipe) {
IteratorSourceStage(final String name, final Iterator<T> source, final Pipe<T> pipe) {
this.source = source;
this.pipe = pipe;
this.name = name;
}

@Override
Expand All @@ -33,4 +35,9 @@ public void run() {
}
pipe.close();
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class Pipeline {
private static final Logger LOG = LogManager.getLogger();
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -49,7 +49,7 @@ public class Pipeline {

Pipeline(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Collection? Does the order of the stages matter? That would imply List.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter at this point - we're going to execute them all in their own threads anyway. The order of stages is really enforced by the way the pipes connect them.

final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
Expand Down Expand Up @@ -109,11 +109,13 @@ public void abort() {
abort(exception);
}

private Future<?> runWithErrorHandling(
final ExecutorService executorService, final Runnable task) {
private Future<?> runWithErrorHandling(final ExecutorService executorService, final Stage task) {
return executorService.submit(
() -> {
final Thread thread = Thread.currentThread();
final String originalName = thread.getName();
try {
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
Expand All @@ -125,6 +127,8 @@ private Future<?> runWithErrorHandling(
// need to call get on it which would normally expose the error.
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
thread.setName(originalName);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
public class PipelineBuilder<T> {

private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
private final LabelledMetric<Counter> outputCounter;

public PipelineBuilder(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final ReadPipe<T> pipeEnd,
final int bufferSize,
Expand Down Expand Up @@ -82,7 +82,7 @@ public static <T> PipelineBuilder<T> createPipelineFrom(
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName));
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(source, pipe);
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter);
}
Expand Down Expand Up @@ -134,10 +134,10 @@ public <O> PipelineBuilder<O> thenProcessInParallel(
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
final Pipe<O> newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Runnable> newStages = new ArrayList<>(stages);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Runnable processStage =
new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor));
final Stage processStage =
new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor));
newStages.add(processStage);
}
return new PipelineBuilder<>(
Expand Down Expand Up @@ -218,7 +218,7 @@ public Pipeline andFinishWith(final String stageName, final Consumer<T> complete
inputPipe,
stages,
pipes,
new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName)));
new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName)));
}

private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
Expand All @@ -228,12 +228,12 @@ private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final S
private <O> PipelineBuilder<O> addStage(
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor);
final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}

private <O> PipelineBuilder<O> addStage(final Runnable stage, final Pipe<O> outputPipe) {
final List<Runnable> newStages = concat(stages, stage);
private <O> PipelineBuilder<O> addStage(final Stage stage, final Pipe<O> outputPipe) {
final List<Stage> newStages = concat(stages, stage);
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
*/
package tech.pegasys.pantheon.services.pipeline;

class ProcessingStage<I, O> implements Runnable {
class ProcessingStage<I, O> implements Stage {

private final String name;
private final ReadPipe<I> inputPipe;
private final WritePipe<O> outputPipe;
private final Processor<I, O> processor;

public ProcessingStage(
final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe, final Processor<I, O> processor) {
final String name,
final ReadPipe<I> inputPipe,
final WritePipe<O> outputPipe,
final Processor<I, O> processor) {
this.name = name;
this.inputPipe = inputPipe;
this.outputPipe = outputPipe;
this.processor = processor;
Expand All @@ -33,4 +38,9 @@ public void run() {
processor.finalize(outputPipe);
outputPipe.close();
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

public interface Stage extends Runnable {
String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CompleterStageTest {
private final List<String> output = new ArrayList<>();
private final Counter outputCounter = mock(Counter.class);
private final CompleterStage<String> stage =
new CompleterStage<>(pipe, output::add, outputCounter);
new CompleterStage<>("name", pipe, output::add, outputCounter);

@Test
public void shouldAddItemsToOutputUntilPipeHasNoMore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);

private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output);
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);

@Test
public void shouldOutputEntriesThenClosePipe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ProcessingStageTest {

@Before
public void setUp() {
stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep);
stage = new ProcessingStage<>("name", inputPipe, outputPipe, singleStep);
doAnswer(
invocation -> {
outputPipe.put(inputPipe.get().toLowerCase(Locale.UK));
Expand Down