diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java index 0dcc34afe3..7c7651f0b9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java @@ -41,7 +41,7 @@ public LoadLocalDataStep( public Stream> loadLocalData( final Task task, final Pipe> completedTasks) { final NodeDataRequest request = task.getData(); - final Optional existingData = worldStateStorage.getNodeData(request.getHash()); + final Optional existingData = request.getExistingData(worldStateStorage); if (existingData.isPresent()) { existingNodeCounter.inc(); request.setData(existingData.get()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java index 06f08ce624..2fc4f17505 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java @@ -56,7 +56,7 @@ public void shouldReturnStreamWithUnchangedTaskWhenDataNotPresent() { @Test public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() { - when(worldStateStorage.getNodeData(HASH)).thenReturn(Optional.of(DATA)); + when(worldStateStorage.getCode(HASH)).thenReturn(Optional.of(DATA)); final Stream> output = loadLocalDataStep.loadLocalData(task, completedTasks); diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java index f33341daf2..71d7896572 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java @@ -46,11 +46,10 @@ public void processNextInput(final ReadPipe inputPipe, final WritePipe out // When the future completes, interrupt so if we're waiting for new input we wake up and // schedule the output. final Thread stageThread = Thread.currentThread(); - future.whenComplete((result, error) -> stageThread.interrupt()); inProgress.add(future); + future.whenComplete((result, error) -> stageThread.interrupt()); } - - outputCompletedTasks(0, outputPipe); + outputCompletedTasks(outputPipe); } else { outputNextCompletedTask(outputPipe); } @@ -66,7 +65,7 @@ public void finalize(final WritePipe outputPipe) { private void outputNextCompletedTask(final WritePipe outputPipe) { try { waitForAnyFutureToComplete(); - outputCompletedTasks(1, outputPipe); + outputCompletedTasks(outputPipe); } catch (final InterruptedException e) { LOG.trace("Interrupted while waiting for processing to complete", e); } catch (final ExecutionException e) { @@ -82,16 +81,13 @@ private void waitForAnyFutureToComplete() CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS); } - private void outputCompletedTasks(final int minTasksToOutput, final WritePipe outputPipe) { - int outputTasks = 0; - for (final Iterator> i = inProgress.iterator(); - i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) { + private void outputCompletedTasks(final WritePipe outputPipe) { + for (final Iterator> i = inProgress.iterator(); i.hasNext(); ) { final CompletableFuture process = i.next(); final O result = process.getNow(null); if (result != null) { outputPipe.put(result); i.remove(); - outputTasks++; } } } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java new file mode 100644 index 0000000000..64f2190cc8 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.junit.Test; + +public class AsyncOperationProcessorTest { + + @SuppressWarnings("unchecked") + private final ReadPipe> readPipe = mock(ReadPipe.class); + + @SuppressWarnings("unchecked") + private final WritePipe writePipe = mock(WritePipe.class); + + private final AsyncOperationProcessor, String> processor = + new AsyncOperationProcessor<>(Function.identity(), 3); + + @Test + public void shouldImmediatelyOutputTasksThatAreAlreadyCompleteEvenIfOutputPipeIsFull() { + when(writePipe.hasRemainingCapacity()).thenReturn(false); + when(readPipe.get()).thenReturn(completedFuture("a")); + + processor.processNextInput(readPipe, writePipe); + verify(writePipe).put("a"); + } + + @Test + public void shouldNotExceedConcurrentJobLimit() { + + final CompletableFuture task1 = new CompletableFuture<>(); + final CompletableFuture task2 = new CompletableFuture<>(); + final CompletableFuture task3 = new CompletableFuture<>(); + final CompletableFuture task4 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1).thenReturn(task2).thenReturn(task3).thenReturn(task4); + + // 3 tasks started + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(3)).get(); + + // Reached limit of concurrent tasks so this round does nothing. + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(3)).get(); + + task1.complete("a"); + + // Next round will output the completed task + processor.processNextInput(readPipe, writePipe); + verify(writePipe).put("a"); + + // And so now we are able to start another one. + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(4)).get(); + } + + @Test + public void shouldOutputRemainingInProgressTasksWhenFinalizing() { + final CompletableFuture task1 = new CompletableFuture<>(); + final CompletableFuture task2 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1).thenReturn(task2); + + // Start the two tasks + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(2)).get(); + verifyZeroInteractions(writePipe); + + task1.complete("a"); + task2.complete("b"); + + // Processing + processor.finalize(writePipe); + verify(writePipe).put("a"); + verify(writePipe).put("b"); + } + + @Test + public void shouldInterruptThreadWhenFutureCompletes() { + // Ensures that if we're waiting for the next input we wake up and output completed tasks + + final CompletableFuture task1 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1); + + // Start the two tasks + processor.processNextInput(readPipe, writePipe); + + task1.complete("a"); + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java index 0ca2a1bd35..9bffeea04f 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java @@ -117,4 +117,10 @@ public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithGet() { assertThat(pipe.get()).isNull(); verify(outputCounter, times(2)).inc(); } + + @Test + public void shouldReturnNullFromGetImmediatelyIfThreadIsInterrupted() { + Thread.currentThread().interrupt(); + assertThat(pipe.get()).isNull(); + } }