diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java index 1c6a1a5796..e48b2df188 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessor.java @@ -15,11 +15,8 @@ import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,7 +26,6 @@ public class IbftProcessor implements Runnable { private static final Logger LOG = LogManager.getLogger(); private final IbftEventQueue incomingQueue; - private final ScheduledExecutorService roundTimerExecutor; private volatile boolean shutdown = false; private final EventMultiplexer eventMultiplexer; @@ -41,19 +37,8 @@ public class IbftProcessor implements Runnable { */ public IbftProcessor( final IbftEventQueue incomingQueue, final EventMultiplexer eventMultiplexer) { - // Spawning the round timer with a single thread as we should never have more than 1 timer in - // flight at a time - this(incomingQueue, eventMultiplexer, Executors.newSingleThreadScheduledExecutor()); - } - - @VisibleForTesting - IbftProcessor( - final IbftEventQueue incomingQueue, - final EventMultiplexer eventMultiplexer, - final ScheduledExecutorService roundTimerExecutor) { this.incomingQueue = incomingQueue; this.eventMultiplexer = eventMultiplexer; - this.roundTimerExecutor = roundTimerExecutor; } /** Indicate to the processor that it should gracefully stop at its next opportunity */ @@ -68,7 +53,6 @@ public void run() { } // Clean up the executor service the round timer has been utilising LOG.info("Shutting down IBFT event processor"); - roundTimerExecutor.shutdownNow(); } private Optional nextIbftEvent() { diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java index 08b0103829..61548c4e46 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftProcessorTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -38,12 +37,10 @@ @RunWith(MockitoJUnitRunner.StrictStubs.class) public class IbftProcessorTest { - private ScheduledExecutorService mockExecutorService; private EventMultiplexer mockeEventMultiplexer; @Before public void initialise() { - mockExecutorService = mock(ScheduledExecutorService.class); mockeEventMultiplexer = mock(EventMultiplexer.class); } @@ -51,8 +48,7 @@ public void initialise() { public void handlesStopGracefully() throws InterruptedException { final IbftEventQueue mockQueue = mock(IbftEventQueue.class); Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null); - final IbftProcessor processor = - new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService); + final IbftProcessor processor = new IbftProcessor(mockQueue, mockeEventMultiplexer); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -73,15 +69,12 @@ public void handlesStopGracefully() throws InterruptedException { // The processor task has exited assertThat(processorFuture.isDone()).isTrue(); - - // Make sure the round timers executor got cleaned up - verify(mockExecutorService).shutdownNow(); } @Test public void cleanupExecutorsAfterShutdownNow() throws InterruptedException { final IbftProcessor processor = - new IbftProcessor(new IbftEventQueue(1000), mockeEventMultiplexer, mockExecutorService); + new IbftProcessor(new IbftEventQueue(1000), mockeEventMultiplexer); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -99,9 +92,6 @@ public void cleanupExecutorsAfterShutdownNow() throws InterruptedException { // The processor task has exited assertThat(processorFuture.isDone()).isTrue(); - - // Make sure the round timers executor got cleaned up - verify(mockExecutorService).shutdownNow(); } @Test @@ -110,8 +100,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException { final IbftEventQueue mockQueue = mock(IbftEventQueue.class); Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException()); - final IbftProcessor processor = - new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService); + final IbftProcessor processor = new IbftProcessor(mockQueue, mockeEventMultiplexer); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); @@ -135,16 +124,12 @@ public void handlesQueueInterruptGracefully() throws InterruptedException { // The processor task has woken up and exited assertThat(processorFuture.isDone()).isTrue(); - - // Make sure the round timers executor got cleaned up - verify(mockExecutorService).shutdownNow(); } @Test public void drainEventsIntoStateMachine() throws InterruptedException { final IbftEventQueue queue = new IbftEventQueue(1000); - final IbftProcessor processor = - new IbftProcessor(queue, mockeEventMultiplexer, mockExecutorService); + final IbftProcessor processor = new IbftProcessor(queue, mockeEventMultiplexer); // Start the IbftProcessor final ExecutorService processorExecutor = Executors.newSingleThreadExecutor(); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 67d629f868..bc87db583e 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.controller; import static org.apache.logging.log4j.LogManager.getLogger; +import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool; import tech.pegasys.pantheon.config.GenesisConfigFile; import tech.pegasys.pantheon.config.IbftConfigOptions; @@ -83,6 +84,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.Logger; @@ -214,6 +216,9 @@ public static PantheonController init( final IbftGossip gossiper = new IbftGossip(uniqueMessageMulticaster); + final ScheduledExecutorService timerExecutor = + newScheduledThreadPool("IbftTimerExecutor", 1, metricsSystem); + final IbftFinalState finalState = new IbftFinalState( voteTallyCache, @@ -221,14 +226,11 @@ public static PantheonController init( Util.publicKeyToAddress(nodeKeys.getPublicKey()), proposerSelector, uniqueMessageMulticaster, - new RoundTimer( - ibftEventQueue, - ibftConfig.getRequestTimeoutSeconds(), - Executors.newScheduledThreadPool(1)), + new RoundTimer(ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), timerExecutor), new BlockTimer( ibftEventQueue, ibftConfig.getBlockPeriodSeconds(), - Executors.newScheduledThreadPool(1), + timerExecutor, Clock.systemUTC()), blockCreatorFactory, new MessageFactory(nodeKeys), @@ -285,6 +287,12 @@ public static PantheonController init( } catch (final InterruptedException e) { LOG.error("Failed to shutdown ibft processor executor"); } + timerExecutor.shutdownNow(); + try { + timerExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + LOG.error("Failed to shutdown timer executor"); + } try { storageProvider.close(); } catch (final IOException e) {