From 87ba6af0c0f350a0e4284fa51f2c498e49f2dbd8 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 07:07:17 +1000 Subject: [PATCH 1/5] Ignore WorldStateDownloaderTest --- .../ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index c6dc4ae033..e4a9a0ef71 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -86,10 +86,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.junit.After; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +@Ignore("PIE-1434 - Ignored while working to make test more reliable") public class WorldStateDownloaderTest { @Rule public Timeout globalTimeout = Timeout.seconds(60); // 1 minute max per test From cea69e6f154454096c21dd4f7edaa14d587c0fd5 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 08:46:29 +1000 Subject: [PATCH 2/5] Fix deadlock in AsyncOperationProcessor. Re-enable WorldStateDownloaderTest --- .../eth/sync/worldstate/LoadLocalDataStep.java | 2 +- .../sync/worldstate/WorldStateDownloaderTest.java | 2 -- .../services/pipeline/AsyncOperationProcessor.java | 14 +++++--------- .../pantheon/services/pipeline/PipeTest.java | 6 ++++++ 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java index 0dcc34afe3..7c7651f0b9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStep.java @@ -41,7 +41,7 @@ public LoadLocalDataStep( public Stream> loadLocalData( final Task task, final Pipe> completedTasks) { final NodeDataRequest request = task.getData(); - final Optional existingData = worldStateStorage.getNodeData(request.getHash()); + final Optional existingData = request.getExistingData(worldStateStorage); if (existingData.isPresent()) { existingNodeCounter.inc(); request.setData(existingData.get()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index e4a9a0ef71..c6dc4ae033 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -86,12 +86,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.junit.After; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; -@Ignore("PIE-1434 - Ignored while working to make test more reliable") public class WorldStateDownloaderTest { @Rule public Timeout globalTimeout = Timeout.seconds(60); // 1 minute max per test diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java index f33341daf2..71d7896572 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java @@ -46,11 +46,10 @@ public void processNextInput(final ReadPipe inputPipe, final WritePipe out // When the future completes, interrupt so if we're waiting for new input we wake up and // schedule the output. final Thread stageThread = Thread.currentThread(); - future.whenComplete((result, error) -> stageThread.interrupt()); inProgress.add(future); + future.whenComplete((result, error) -> stageThread.interrupt()); } - - outputCompletedTasks(0, outputPipe); + outputCompletedTasks(outputPipe); } else { outputNextCompletedTask(outputPipe); } @@ -66,7 +65,7 @@ public void finalize(final WritePipe outputPipe) { private void outputNextCompletedTask(final WritePipe outputPipe) { try { waitForAnyFutureToComplete(); - outputCompletedTasks(1, outputPipe); + outputCompletedTasks(outputPipe); } catch (final InterruptedException e) { LOG.trace("Interrupted while waiting for processing to complete", e); } catch (final ExecutionException e) { @@ -82,16 +81,13 @@ private void waitForAnyFutureToComplete() CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS); } - private void outputCompletedTasks(final int minTasksToOutput, final WritePipe outputPipe) { - int outputTasks = 0; - for (final Iterator> i = inProgress.iterator(); - i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) { + private void outputCompletedTasks(final WritePipe outputPipe) { + for (final Iterator> i = inProgress.iterator(); i.hasNext(); ) { final CompletableFuture process = i.next(); final O result = process.getNow(null); if (result != null) { outputPipe.put(result); i.remove(); - outputTasks++; } } } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java index bb163dd3fa..e8d9e4c266 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java @@ -80,4 +80,10 @@ public void shouldIncrementCounterWhenItemAddedToPipe() { pipe.put("B"); verify(itemCounter, times(2)).inc(); } + + @Test + public void shouldReturnNullFromGetImmediatelyIfThreadIsInterrupted() { + Thread.currentThread().interrupt(); + assertThat(pipe.get()).isNull(); + } } From ab834f9e5f3fdaec05df90c14794a693cd5ed5b2 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 09:01:22 +1000 Subject: [PATCH 3/5] Update test. --- .../ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java index eb6336aadc..2ff1477387 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java @@ -55,7 +55,7 @@ public void shouldReturnStreamWithUnchangedTaskWhenDataNotPresent() { @Test public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() { - when(worldStateStorage.getNodeData(HASH)).thenReturn(Optional.of(DATA)); + when(worldStateStorage.getCode(HASH)).thenReturn(Optional.of(DATA)); final Stream> output = loadLocalDataStep.loadLocalData(task, completedTasks); From 0caaf2e6823a404fff89e059ec639ef1af29dae1 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 09:45:16 +1000 Subject: [PATCH 4/5] Add tests for AsyncOperationProcessor. --- .../pipeline/AsyncOperationProcessorTest.java | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java new file mode 100644 index 0000000000..6e4526017c --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java @@ -0,0 +1,101 @@ +package tech.pegasys.pantheon.services.pipeline; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.junit.Test; + +public class AsyncOperationProcessorTest { + + @SuppressWarnings("unchecked") + private final ReadPipe> readPipe = mock(ReadPipe.class); + + @SuppressWarnings("unchecked") + private final WritePipe writePipe = mock(WritePipe.class); + + private final AsyncOperationProcessor, String> processor = + new AsyncOperationProcessor<>(Function.identity(), 3); + + @Test + public void shouldImmediatelyOutputTasksThatAreAlreadyCompleteEvenIfOutputPipeIsFull() { + when(writePipe.hasRemainingCapacity()).thenReturn(false); + when(readPipe.get()).thenReturn(completedFuture("a")); + + processor.processNextInput(readPipe, writePipe); + verify(writePipe).put("a"); + } + + @Test + public void shouldNotExceedConcurrentJobLimit() { + + final CompletableFuture task1 = new CompletableFuture<>(); + final CompletableFuture task2 = new CompletableFuture<>(); + final CompletableFuture task3 = new CompletableFuture<>(); + final CompletableFuture task4 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1).thenReturn(task2).thenReturn(task3).thenReturn(task4); + + // 3 tasks started + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(3)).get(); + + // Reached limit of concurrent tasks so this round does nothing. + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(3)).get(); + + task1.complete("a"); + + // Next round will output the completed task + processor.processNextInput(readPipe, writePipe); + verify(writePipe).put("a"); + + // And so now we are able to start another one. + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(4)).get(); + } + + @Test + public void shouldOutputRemainingInProgressTasksWhenFinalizing() { + final CompletableFuture task1 = new CompletableFuture<>(); + final CompletableFuture task2 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1).thenReturn(task2); + + // Start the two tasks + processor.processNextInput(readPipe, writePipe); + processor.processNextInput(readPipe, writePipe); + verify(readPipe, times(2)).get(); + verifyZeroInteractions(writePipe); + + task1.complete("a"); + task2.complete("b"); + + // Processing + processor.finalize(writePipe); + verify(writePipe).put("a"); + verify(writePipe).put("b"); + } + + @Test + public void shouldInterruptThreadWhenFutureCompletes() { + // Ensures that if we're waiting for the next input we wake up and output completed tasks + + final CompletableFuture task1 = new CompletableFuture<>(); + when(readPipe.get()).thenReturn(task1); + + // Start the two tasks + processor.processNextInput(readPipe, writePipe); + + task1.complete("a"); + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } +} From 98999e5042ae61b945ce21cecbec0eff300cbe79 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 10:18:26 +1000 Subject: [PATCH 5/5] Spotless. --- .../pipeline/AsyncOperationProcessorTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java index 6e4526017c..64f2190cc8 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessorTest.java @@ -1,3 +1,15 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package tech.pegasys.pantheon.services.pipeline; import static java.util.concurrent.CompletableFuture.completedFuture;