Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Fix deadlock scenario in AsyncOperationProcessor and re-enable WorldS…
Browse files Browse the repository at this point in the history
…tateDownloaderTest (#1126)
  • Loading branch information
ajsutton authored Mar 19, 2019
1 parent a9b3f34 commit e6c85bd
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public LoadLocalDataStep(
public Stream<Task<NodeDataRequest>> loadLocalData(
final Task<NodeDataRequest> task, final Pipe<Task<NodeDataRequest>> completedTasks) {
final NodeDataRequest request = task.getData();
final Optional<BytesValue> existingData = worldStateStorage.getNodeData(request.getHash());
final Optional<BytesValue> existingData = request.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
existingNodeCounter.inc();
request.setData(existingData.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void shouldReturnStreamWithUnchangedTaskWhenDataNotPresent() {

@Test
public void shouldReturnEmptyStreamAndSendTaskToCompletedPipeWhenDataIsPresent() {
when(worldStateStorage.getNodeData(HASH)).thenReturn(Optional.of(DATA));
when(worldStateStorage.getCode(HASH)).thenReturn(Optional.of(DATA));

final Stream<Task<NodeDataRequest>> output =
loadLocalDataStep.loadLocalData(task, completedTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> out
// When the future completes, interrupt so if we're waiting for new input we wake up and
// schedule the output.
final Thread stageThread = Thread.currentThread();
future.whenComplete((result, error) -> stageThread.interrupt());
inProgress.add(future);
future.whenComplete((result, error) -> stageThread.interrupt());
}

outputCompletedTasks(0, outputPipe);
outputCompletedTasks(outputPipe);
} else {
outputNextCompletedTask(outputPipe);
}
Expand All @@ -66,7 +65,7 @@ public void finalize(final WritePipe<O> outputPipe) {
private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
try {
waitForAnyFutureToComplete();
outputCompletedTasks(1, outputPipe);
outputCompletedTasks(outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e);
} catch (final ExecutionException e) {
Expand All @@ -82,16 +81,13 @@ private void waitForAnyFutureToComplete()
CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS);
}

private void outputCompletedTasks(final int minTasksToOutput, final WritePipe<O> outputPipe) {
int outputTasks = 0;
for (final Iterator<CompletableFuture<O>> i = inProgress.iterator();
i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) {
private void outputCompletedTasks(final WritePipe<O> outputPipe) {
for (final Iterator<CompletableFuture<O>> i = inProgress.iterator(); i.hasNext(); ) {
final CompletableFuture<O> process = i.next();
final O result = process.getNow(null);
if (result != null) {
outputPipe.put(result);
i.remove();
outputTasks++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.junit.Test;

public class AsyncOperationProcessorTest {

@SuppressWarnings("unchecked")
private final ReadPipe<CompletableFuture<String>> readPipe = mock(ReadPipe.class);

@SuppressWarnings("unchecked")
private final WritePipe<String> writePipe = mock(WritePipe.class);

private final AsyncOperationProcessor<CompletableFuture<String>, String> processor =
new AsyncOperationProcessor<>(Function.identity(), 3);

@Test
public void shouldImmediatelyOutputTasksThatAreAlreadyCompleteEvenIfOutputPipeIsFull() {
when(writePipe.hasRemainingCapacity()).thenReturn(false);
when(readPipe.get()).thenReturn(completedFuture("a"));

processor.processNextInput(readPipe, writePipe);
verify(writePipe).put("a");
}

@Test
public void shouldNotExceedConcurrentJobLimit() {

final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
final CompletableFuture<String> task3 = new CompletableFuture<>();
final CompletableFuture<String> task4 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2).thenReturn(task3).thenReturn(task4);

// 3 tasks started
processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(3)).get();

// Reached limit of concurrent tasks so this round does nothing.
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(3)).get();

task1.complete("a");

// Next round will output the completed task
processor.processNextInput(readPipe, writePipe);
verify(writePipe).put("a");

// And so now we are able to start another one.
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(4)).get();
}

@Test
public void shouldOutputRemainingInProgressTasksWhenFinalizing() {
final CompletableFuture<String> task1 = new CompletableFuture<>();
final CompletableFuture<String> task2 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1).thenReturn(task2);

// Start the two tasks
processor.processNextInput(readPipe, writePipe);
processor.processNextInput(readPipe, writePipe);
verify(readPipe, times(2)).get();
verifyZeroInteractions(writePipe);

task1.complete("a");
task2.complete("b");

// Processing
processor.finalize(writePipe);
verify(writePipe).put("a");
verify(writePipe).put("b");
}

@Test
public void shouldInterruptThreadWhenFutureCompletes() {
// Ensures that if we're waiting for the next input we wake up and output completed tasks

final CompletableFuture<String> task1 = new CompletableFuture<>();
when(readPipe.get()).thenReturn(task1);

// Start the two tasks
processor.processNextInput(readPipe, writePipe);

task1.complete("a");

assertThat(Thread.currentThread().isInterrupted()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithGet() {
assertThat(pipe.get()).isNull();
verify(outputCounter, times(2)).inc();
}

@Test
public void shouldReturnNullFromGetImmediatelyIfThreadIsInterrupted() {
Thread.currentThread().interrupt();
assertThat(pipe.get()).isNull();
}
}

0 comments on commit e6c85bd

Please sign in to comment.