Skip to content

Commit

Permalink
Fix data race in BitcoindDaemonTest
Browse files Browse the repository at this point in the history
Prevent intermittent test failures, caused by a race between checking
whether the mock socket is closed upon accepting a new connection and
setting 'socketClosed' to true during shutdown. Waiting to accept and
then checking the flag needs to be done in a synchronized block.
  • Loading branch information
stejbac committed Jan 12, 2021
1 parent 7c4d379 commit 224d903
Showing 1 changed file with 22 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -37,8 +38,6 @@

import static org.mockito.Mockito.*;

// FIXME: There appears to be a data race in some of these tests, causing intermittent Mockito.verify failures.
// We should remove the two Thread.sleep lines which were added below to try to prevent them.
public class BitcoindDaemonTest {
private BitcoindDaemon daemon;
private int acceptAnotherCount;
Expand All @@ -52,13 +51,12 @@ public class BitcoindDaemonTest {
public void setUp() throws Exception {
var serverSocket = mock(ServerSocket.class);

when(serverSocket.accept()).then(invocation -> {
waitToAccept();
when(serverSocket.accept()).then(invocation -> waitToAccept(() -> {
if (socketClosed) {
throw new SocketException();
}
return socket;
});
}));
doAnswer((VoidAnswer) invocation -> {
socketClosed = true;
acceptAnother(1);
Expand All @@ -79,42 +77,35 @@ public void tearDown() {
public void testNoBlocksMissedDuringFloodOfIncomingBlocks() throws Exception {
var latch = new CountDownLatch(1); // to block all the daemon worker threads until shutdown, as if stuck

synchronized (this) {
doAnswer((VoidAnswer) invocation -> latch.await()).when(blockListener).blockDetected(any());
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
doAnswer((VoidAnswer) invocation -> latch.await()).when(blockListener).blockDetected(any());
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));

acceptAnother(50);
waitUntilAllAccepted();
}
acceptAnother(50);
waitUntilAllAccepted();

// Unblock all the daemon worker threads and shut down.
latch.countDown();
daemon.shutdown();
Thread.sleep(100);

verify(blockListener, times(50)).blockDetected("foo");
}

@Test
public void testBlockHashIsTrimmed() throws Exception {
synchronized (this) {
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("\r\nbar \n".getBytes()));
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("\r\nbar \n".getBytes()));

acceptAnother(1);
waitUntilAllAccepted();
}
acceptAnother(1);
waitUntilAllAccepted();
daemon.shutdown();
Thread.sleep(100);

verify(blockListener).blockDetected("bar");
}

@Test
public void testBrokenSocketRead() throws Exception {
synchronized (this) {
when(socket.getInputStream()).thenThrow(IOException.class);
acceptAnother(1);
}
when(socket.getInputStream()).thenThrow(IOException.class);

acceptAnother(1);
errorHandlerLatch.await(5, TimeUnit.SECONDS);

verify(errorHandler).accept(argThat(t -> t instanceof NotificationHandlerException &&
Expand All @@ -123,13 +114,12 @@ public void testBrokenSocketRead() throws Exception {

@Test
public void testRuntimeExceptionInBlockListener() throws Exception {
synchronized (this) {
daemon.setBlockListener(blockHash -> {
throw new IndexOutOfBoundsException();
});
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
acceptAnother(1);
}
daemon.setBlockListener(blockHash -> {
throw new IndexOutOfBoundsException();
});
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));

acceptAnother(1);
errorHandlerLatch.await(5, TimeUnit.SECONDS);

verify(errorHandler).accept(argThat(t -> t instanceof NotificationHandlerException &&
Expand Down Expand Up @@ -161,12 +151,14 @@ private synchronized void acceptAnother(int n) {
notifyAll();
}

private synchronized void waitToAccept() throws InterruptedException {
private synchronized <V> V waitToAccept(Callable<V> onAccept) throws Exception {
while (acceptAnotherCount == 0) {
wait();
}
var result = onAccept.call();
acceptAnotherCount--;
notifyAll();
return result;
}

private synchronized void waitUntilAllAccepted() throws InterruptedException {
Expand Down

0 comments on commit 224d903

Please sign in to comment.