Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Cleanup Ibft executors #951

Merged
merged 5 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;

import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -29,7 +26,6 @@ public class IbftProcessor implements Runnable {
private static final Logger LOG = LogManager.getLogger();

private final IbftEventQueue incomingQueue;
private final ScheduledExecutorService roundTimerExecutor;
private volatile boolean shutdown = false;
private final EventMultiplexer eventMultiplexer;

Expand All @@ -41,19 +37,8 @@ public class IbftProcessor implements Runnable {
*/
public IbftProcessor(
final IbftEventQueue incomingQueue, final EventMultiplexer eventMultiplexer) {
// Spawning the round timer with a single thread as we should never have more than 1 timer in
// flight at a time
this(incomingQueue, eventMultiplexer, Executors.newSingleThreadScheduledExecutor());
}

@VisibleForTesting
IbftProcessor(
final IbftEventQueue incomingQueue,
final EventMultiplexer eventMultiplexer,
final ScheduledExecutorService roundTimerExecutor) {
this.incomingQueue = incomingQueue;
this.eventMultiplexer = eventMultiplexer;
this.roundTimerExecutor = roundTimerExecutor;
}

/** Indicate to the processor that it should gracefully stop at its next opportunity */
Expand All @@ -68,7 +53,6 @@ public void run() {
}
// Clean up the executor service the round timer has been utilising
LOG.info("Shutting down IBFT event processor");
roundTimerExecutor.shutdownNow();
}

private Optional<IbftEvent> nextIbftEvent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
Expand All @@ -38,21 +37,18 @@

@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IbftProcessorTest {
private ScheduledExecutorService mockExecutorService;
private EventMultiplexer mockeEventMultiplexer;

@Before
public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class);
mockeEventMultiplexer = mock(EventMultiplexer.class);
}

@Test
public void handlesStopGracefully() throws InterruptedException {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null);
final IbftProcessor processor =
new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService);
final IbftProcessor processor = new IbftProcessor(mockQueue, mockeEventMultiplexer);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -73,15 +69,12 @@ public void handlesStopGracefully() throws InterruptedException {

// The processor task has exited
assertThat(processorFuture.isDone()).isTrue();

// Make sure the round timers executor got cleaned up
verify(mockExecutorService).shutdownNow();
}

@Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(1000), mockeEventMultiplexer, mockExecutorService);
new IbftProcessor(new IbftEventQueue(1000), mockeEventMultiplexer);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -99,9 +92,6 @@ public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {

// The processor task has exited
assertThat(processorFuture.isDone()).isTrue();

// Make sure the round timers executor got cleaned up
verify(mockExecutorService).shutdownNow();
}

@Test
Expand All @@ -110,8 +100,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException());

final IbftProcessor processor =
new IbftProcessor(mockQueue, mockeEventMultiplexer, mockExecutorService);
final IbftProcessor processor = new IbftProcessor(mockQueue, mockeEventMultiplexer);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -135,16 +124,12 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {

// The processor task has woken up and exited
assertThat(processorFuture.isDone()).isTrue();

// Make sure the round timers executor got cleaned up
verify(mockExecutorService).shutdownNow();
}

@Test
public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue(1000);
final IbftProcessor processor =
new IbftProcessor(queue, mockeEventMultiplexer, mockExecutorService);
final IbftProcessor processor = new IbftProcessor(queue, mockeEventMultiplexer);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.controller;

import static org.apache.logging.log4j.LogManager.getLogger;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool;

import tech.pegasys.pantheon.config.GenesisConfigFile;
import tech.pegasys.pantheon.config.IbftConfigOptions;
Expand Down Expand Up @@ -83,6 +84,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -214,21 +216,21 @@ public static PantheonController<IbftContext> init(

final IbftGossip gossiper = new IbftGossip(uniqueMessageMulticaster);

final ScheduledExecutorService timerExecutor =
newScheduledThreadPool("IbftTimerExecutor", 1, metricsSystem);

final IbftFinalState finalState =
new IbftFinalState(
voteTallyCache,
nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector,
uniqueMessageMulticaster,
new RoundTimer(
ibftEventQueue,
ibftConfig.getRequestTimeoutSeconds(),
Executors.newScheduledThreadPool(1)),
new RoundTimer(ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), timerExecutor),
new BlockTimer(
ibftEventQueue,
ibftConfig.getBlockPeriodSeconds(),
Executors.newScheduledThreadPool(1),
timerExecutor,
Clock.systemUTC()),
blockCreatorFactory,
new MessageFactory(nodeKeys),
Expand Down Expand Up @@ -285,6 +287,12 @@ public static PantheonController<IbftContext> init(
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown ibft processor executor");
}
timerExecutor.shutdownNow();
try {
timerExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
LOG.error("Failed to shutdown timer executor");
}
try {
storageProvider.close();
} catch (final IOException e) {
Expand Down