Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Ibft pantheon controller #461

Merged
merged 6 commits into from
Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
*/
package tech.pegasys.pantheon.consensus.ibft;

import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;

import java.util.Optional;
import java.util.concurrent.Executors;
Expand All @@ -25,45 +30,37 @@

/** Execution context for draining queued ibft events and applying them to a maintained state */
public class IbftProcessor implements Runnable {

private static final Logger LOG = LogManager.getLogger();

private final IbftEventQueue incomingQueue;
private final ScheduledExecutorService roundTimerExecutor;
private final RoundTimer roundTimer;
private final IbftStateMachine stateMachine;
private volatile boolean shutdown = false;
private final IbftController ibftController;

/**
* Construct a new IbftProcessor
*
* @param incomingQueue The event queue from which to drain new events
* @param baseRoundExpirySeconds The expiry time in milliseconds of round 0
* @param stateMachine an IbftStateMachine ready to process events and maintain state
* @param ibftController an object capable of handling any/all IBFT events
*/
public IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine) {
public IbftProcessor(final IbftEventQueue incomingQueue, final IbftController ibftController) {
// Spawning the round timer with a single thread as we should never have more than 1 timer in
// flight at a time
this(
incomingQueue,
baseRoundExpirySeconds,
stateMachine,
Executors.newSingleThreadScheduledExecutor());
this(incomingQueue, ibftController, Executors.newSingleThreadScheduledExecutor());
}

@VisibleForTesting
IbftProcessor(
final IbftEventQueue incomingQueue,
final int baseRoundExpirySeconds,
final IbftStateMachine stateMachine,
final IbftController ibftController,
final ScheduledExecutorService roundTimerExecutor) {
this.incomingQueue = incomingQueue;
this.ibftController = ibftController;
this.roundTimerExecutor = roundTimerExecutor;
this.roundTimer = new RoundTimer(incomingQueue, baseRoundExpirySeconds, roundTimerExecutor);
this.stateMachine = stateMachine;
}

public void start() {
ibftController.start();
}

/** Indicate to the processor that it should gracefully stop at its next opportunity */
Expand All @@ -74,25 +71,46 @@ public void stop() {
@Override
public void run() {
while (!shutdown) {
Optional<IbftEvent> newEvent = Optional.empty();
try {
newEvent = Optional.ofNullable(incomingQueue.poll(2, TimeUnit.SECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
}

newEvent.ifPresent(
ibftEvent -> {
try {
stateMachine.processEvent(ibftEvent, roundTimer);
} catch (final Exception e) {
LOG.error(
"State machine threw exception while processing event {" + ibftEvent + "}", e);
}
});
nextIbftEvent().ifPresent(this::handleIbftEvent);
}
// Clean up the executor service the round timer has been utilising
roundTimerExecutor.shutdownNow();
}

private void handleIbftEvent(final IbftEvent ibftEvent) {
try {
switch (ibftEvent.getType()) {
case MESSAGE:
final IbftReceivedMessageEvent rxEvent = (IbftReceivedMessageEvent) ibftEvent;
ibftController.handleMessageEvent(rxEvent);
break;
case ROUND_EXPIRY:
final RoundExpiry roundExpiryEvent = (RoundExpiry) ibftEvent;
ibftController.handleRoundExpiry(roundExpiryEvent);
break;
case NEW_CHAIN_HEAD:
final NewChainHead newChainHead = (NewChainHead) ibftEvent;
ibftController.handleNewBlockEvent(newChainHead);
break;
case BLOCK_TIMER_EXPIRY:
final BlockTimerExpiry blockTimerExpiry = (BlockTimerExpiry) ibftEvent;
ibftController.handleBlockTimerExpiry(blockTimerExpiry);
break;
default:
throw new RuntimeException("Illegal event in queue.");
}
} catch (final Exception e) {
LOG.error("State machine threw exception while processing event {" + ibftEvent + "}", e);
}
}

private Optional<IbftEvent> nextIbftEvent() {
try {
return Optional.ofNullable(incomingQueue.poll(500, TimeUnit.MILLISECONDS));
} catch (final InterruptedException interrupt) {
// If the queue was interrupted propagate it and spin to check our shutdown status
Thread.currentThread().interrupt();
return Optional.empty();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,77 @@
*/
package tech.pegasys.pantheon.consensus.ibft.blockcreation;

import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
import tech.pegasys.pantheon.ethereum.blockcreation.MiningCoordinator;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.util.bytes.BytesValue;

public class IbftMiningCoordinator implements MiningCoordinator {
import org.apache.logging.log4j.Logger;

public class IbftMiningCoordinator implements MiningCoordinator, BlockAddedObserver {

private final IbftBlockCreatorFactory blockCreatorFactory;
private static final Logger LOG = getLogger();
protected final Blockchain blockchain;
private final IbftEventQueue eventQueue;
private final IbftProcessor ibftProcessor;

public IbftMiningCoordinator(final IbftBlockCreatorFactory blockCreatorFactory) {
public IbftMiningCoordinator(
final IbftProcessor ibftProcessor,
final IbftBlockCreatorFactory blockCreatorFactory,
final Blockchain blockchain,
final IbftEventQueue eventQueue) {
this.ibftProcessor = ibftProcessor;
this.blockCreatorFactory = blockCreatorFactory;
this.eventQueue = eventQueue;

this.blockchain = blockchain;
this.blockchain.observeBlockAdded(this);
}

@Override
public void enable() {}
public void enable() {
ibftProcessor.start();
// IbftProcessor is implicitly running (but maybe should have a discard" all)
}

@Override
public void disable() {}
public void disable() {
ibftProcessor.stop();
}

@Override
public boolean isRunning() {
return true;
}

@Override
public void setMinTransactionGasPrice(final Wei minGasPrice) {}
public void setMinTransactionGasPrice(final Wei minGasPrice) {
blockCreatorFactory.setMinTransactionGasPrice(minGasPrice);
}

@Override
public Wei getMinTransactionGasPrice() {
return null;
return blockCreatorFactory.getMinTransactionGasPrice();
}

@Override
public void setExtraData(final BytesValue extraData) {
blockCreatorFactory.setExtraData(extraData);
}

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
LOG.info("New canonical head detected. {} ", event.isNewCanonicalHead());
if (event.isNewCanonicalHead()) {
eventQueue.add(new NewChainHead(event.getBlock().getHeader()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify;

import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.statemachine.IbftController;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -39,20 +40,20 @@
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IbftProcessorTest {
private ScheduledExecutorService mockExecutorService;
private IbftStateMachine mockStateMachine;
private IbftController mockIbftController;

@Before
public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class);
mockStateMachine = mock(IbftStateMachine.class);
mockIbftController = mock(IbftController.class);
}

@Test
public void handlesStopGracefully() throws InterruptedException {
final IbftEventQueue mockQueue = mock(IbftEventQueue.class);
Mockito.when(mockQueue.poll(anyLong(), any())).thenReturn(null);
final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -81,7 +82,7 @@ public void handlesStopGracefully() throws InterruptedException {
@Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(), 1, mockStateMachine, mockExecutorService);
new IbftProcessor(new IbftEventQueue(), mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -111,7 +112,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {
Mockito.when(mockQueue.poll(anyLong(), any())).thenThrow(new InterruptedException());

final IbftProcessor processor =
new IbftProcessor(mockQueue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(mockQueue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -144,7 +145,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {
public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftProcessor processor =
new IbftProcessor(queue, 1, mockStateMachine, mockExecutorService);
new IbftProcessor(queue, mockIbftController, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -160,6 +161,6 @@ public void drainEventsIntoStateMachine() throws InterruptedException {
processor.stop();
processorExecutor.shutdown();

verify(mockStateMachine, times(2)).processEvent(eq(roundExpiryEvent), any());
verify(mockIbftController, times(2)).handleRoundExpiry(eq(roundExpiryEvent));
}
}
Loading