Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Ibft Height Manager (#418)
Browse files Browse the repository at this point in the history
The IbftHeightManager is responsible for all things 'meta-round'
related, such things as:
* Handling RoundTimeout and starting a new round
* Handling NewRound messages and starting a new round
* Ensuring RoundChange messages are sent at the correct time with
  appropriate content.
* Collating RoundChange messages and starting a new round using the
  best prepared certificate in the collection.
  • Loading branch information
rain-on authored Dec 19, 2018
1 parent c48604c commit cae7c90
Show file tree
Hide file tree
Showing 7 changed files with 633 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,242 @@
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.CURRENT_ROUND;
import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.FUTURE_ROUND;
import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.PRIOR_ROUND;

import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/** This no-op version will be replaced with an implementation in another PR */
/**
* Responsible for starting/clearing Consensus rounds at a given block height. One of these is
* created when a new block is imported to the chain. It immediately then creates a Round-0 object,
* and sends a Proposal message. If the round times out prior to importing a block, this class is
* responsible for creating a RoundChange message and transmitting it.
*/
public class IbftBlockHeightManager {

public void handleProposalMessage(final SignedData<ProposalPayload> proposalMsg) {}
protected enum MessageAge {
PRIOR_ROUND,
CURRENT_ROUND,
FUTURE_ROUND
}

private static final Logger LOG = LogManager.getLogger();

public void handlePrepareMessage(final SignedData<PreparePayload> prepareMsg) {}
private final IbftRoundFactory roundFactory;
private final RoundChangeManager roundChangeManager;
private final BlockHeader parentHeader;
private final RoundTimer roundTimer;
private final BlockTimer blockTimer;
private final IbftMessageTransmitter transmitter;
private final MessageFactory messageFactory;
private final Map<Integer, RoundState> futureRoundStateBuffer = Maps.newHashMap();
private final NewRoundMessageValidator newRoundMessageValidator;
private final Clock clock;
private final Function<ConsensusRoundIdentifier, RoundState> roundStateCreator;
private final IbftFinalState finalState;

public void handleCommitMessage(final SignedData<CommitPayload> commitMsg) {}
private Optional<PreparedCertificate> latestPreparedCertificate = Optional.empty();

public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIndentifier) {}
private IbftRound currentRound;

public void handleRoundChangeMessage(final SignedData<RoundChangePayload> roundChangeMsg) {}
public IbftBlockHeightManager(
final BlockHeader parentHeader,
final IbftFinalState finalState,
final RoundChangeManager roundChangeManager,
final IbftRoundFactory ibftRoundFactory,
final Clock clock,
final MessageValidatorFactory messageValidatorFactory) {
this.parentHeader = parentHeader;
this.roundFactory = ibftRoundFactory;
this.roundTimer = finalState.getRoundTimer();
this.blockTimer = finalState.getBlockTimer();
this.transmitter = finalState.getTransmitter();
this.messageFactory = finalState.getMessageFactory();
this.clock = clock;
this.roundChangeManager = roundChangeManager;
this.finalState = finalState;

public void handleNewRoundMessage(final SignedData<NewRoundPayload> newRoundMsg) {}
newRoundMessageValidator = messageValidatorFactory.createNewRoundValidator(parentHeader);

public void start() {}
roundStateCreator =
(roundIdentifier) ->
new RoundState(
roundIdentifier,
finalState.getQuorumSize(),
messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader));
}

public void start() {
startNewRound(0);
if (finalState.isLocalNodeProposerForRound(currentRound.getRoundIdentifier())) {
blockTimer.startTimer(currentRound.getRoundIdentifier(), parentHeader);
}
}

public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) {
if (roundIdentifier.equals(currentRound.getRoundIdentifier())) {
currentRound.createAndSendProposalMessage(clock.millis() / 1000);
} else {
LOG.info(
"Block timer expired for a round ({}) other than current ({})",
roundIdentifier,
currentRound.getRoundIdentifier());
}
}

public void roundExpired(final RoundExpiry expire) {
if (!expire.getView().equals(currentRound.getRoundIdentifier())) {
LOG.debug("Ignoring Round timer expired which does not match current round.");
return;
}

LOG.info("Round has expired, creating PreparedCertificate and notifying peers.");
final Optional<PreparedCertificate> preparedCertificate =
currentRound.createPrepareCertificate();

if (preparedCertificate.isPresent()) {
latestPreparedCertificate = preparedCertificate;
}

startNewRound(currentRound.getRoundIdentifier().getRoundNumber() + 1);

final SignedData<RoundChangePayload> localRoundChange =
messageFactory.createSignedRoundChangePayload(
currentRound.getRoundIdentifier(), latestPreparedCertificate);
transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), latestPreparedCertificate);

// Its possible the locally created RoundChange triggers the transmission of a NewRound
// message - so it must be handled accordingly.
handleRoundChangeMessage(localRoundChange);
}

public void handleProposalMessage(final SignedData<ProposalPayload> msg) {
LOG.info("Received a Proposal message.");
actionOrBufferMessage(msg, currentRound::handleProposalMessage, RoundState::setProposedBlock);
}

public void handlePrepareMessage(final SignedData<PreparePayload> msg) {
LOG.info("Received a prepare message.");
actionOrBufferMessage(msg, currentRound::handlePrepareMessage, RoundState::addPrepareMessage);
}

public void handleCommitMessage(final SignedData<CommitPayload> msg) {
LOG.info("Received a commit message.");
actionOrBufferMessage(msg, currentRound::handleCommitMessage, RoundState::addCommitMessage);
}

private <T extends Payload> void actionOrBufferMessage(
final SignedData<T> msg,
final Consumer<SignedData<T>> inRoundHandler,
final BiConsumer<RoundState, SignedData<T>> buffer) {
final Payload payload = msg.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);
if (messageAge == CURRENT_ROUND) {
inRoundHandler.accept(msg);
} else if (messageAge == FUTURE_ROUND) {
final ConsensusRoundIdentifier msgRoundId = payload.getRoundIdentifier();
final RoundState roundstate =
futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
buffer.accept(roundstate, msg);
}
}

public void handleRoundChangeMessage(final SignedData<RoundChangePayload> msg) {
final Optional<RoundChangeCertificate> result =
roundChangeManager.appendRoundChangeMessage(msg);
final MessageAge messageAge = determineAgeOfPayload(msg.getPayload());
if (messageAge == PRIOR_ROUND) {
LOG.info("Received RoundChange Message for a prior round.");
return;
}
ConsensusRoundIdentifier targetRound = msg.getPayload().getRoundIdentifier();
LOG.info("Received a RoundChange message for {}", targetRound.toString());

if (result.isPresent()) {
if (messageAge == FUTURE_ROUND) {
startNewRound(targetRound.getRoundNumber());
}

if (finalState.isLocalNodeProposerForRound(targetRound)) {
currentRound.startRoundWith(result.get(), clock.millis() / 1000);
}
}
}

private void startNewRound(final int roundNumber) {
LOG.info("Starting new round {}", roundNumber);
if (futureRoundStateBuffer.containsKey(roundNumber)) {
currentRound =
roundFactory.createNewRoundWithState(
parentHeader, futureRoundStateBuffer.get(roundNumber));
futureRoundStateBuffer.keySet().removeIf(k -> k <= roundNumber);
} else {
currentRound = roundFactory.createNewRound(parentHeader, roundNumber);
}
// discard roundChange messages from the current and previous rounds
roundChangeManager.discardRoundsPriorTo(currentRound.getRoundIdentifier());
roundTimer.startTimer(currentRound.getRoundIdentifier());
}

public void handleNewRoundMessage(final SignedData<NewRoundPayload> msg) {
final NewRoundPayload payload = msg.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);

if (messageAge == PRIOR_ROUND) {
LOG.info("Received NewRound Message for a prior round.");
return;
}
LOG.info("Received NewRound Message for {}", payload.getRoundIdentifier().toString());

if (newRoundMessageValidator.validateNewRoundMessage(msg)) {
if (messageAge == FUTURE_ROUND) {
startNewRound(payload.getRoundIdentifier().getRoundNumber());
}
currentRound.handleProposalMessage(payload.getProposalPayload());
}
}

public long getChainHeight() {
return 0;
return currentRound.getRoundIdentifier().getSequenceNumber();
}

public void roundExpired(final RoundExpiry expired) {}
private MessageAge determineAgeOfPayload(final Payload payload) {
int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber();
int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber();
if (messageRoundNumber > currentRoundNumber) {
return FUTURE_ROUND;
} else if (messageRoundNumber == currentRoundNumber) {
return CURRENT_ROUND;
}
return PRIOR_ROUND;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,43 @@
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;

import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;

/** This no-op version will be replaced with an implementation in another PR */
import java.time.Clock;

public class IbftBlockHeightManagerFactory {

private final IbftRoundFactory roundFactory;
private final IbftFinalState finalState;
private final ProtocolContext<IbftContext> protocolContext;
private final MessageValidatorFactory messageValidatorFactory;

public IbftBlockHeightManagerFactory(
final IbftFinalState finalState,
final IbftRoundFactory roundFactory,
final MessageValidatorFactory messageValidatorFactory,
final ProtocolContext<IbftContext> protocolContext) {
this.roundFactory = roundFactory;
this.finalState = finalState;
this.protocolContext = protocolContext;
this.messageValidatorFactory = messageValidatorFactory;
}

public IbftBlockHeightManager create(final BlockHeader parentHeader) {
return new IbftBlockHeightManager();
long nextChainHeight = parentHeader.getNumber() + 1;
return new IbftBlockHeightManager(
parentHeader,
finalState,
new RoundChangeManager(
nextChainHeight,
finalState.getValidators(),
(roundIdentifier) ->
messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader)),
roundFactory,
Clock.systemUTC(),
messageValidatorFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;

import java.time.Clock;
import java.util.Collection;

/** This is the full data set, or context, required for many of the aspects of the IBFT workflow. */
Expand All @@ -42,6 +43,7 @@ public class IbftFinalState {
private final MessageFactory messageFactory;
private final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator;
private final IbftMessageTransmitter messageTransmitter;
private final Clock clock;

public IbftFinalState(
final ValidatorProvider validatorProvider,
Expand All @@ -53,7 +55,8 @@ public IbftFinalState(
final BlockTimer blockTimer,
final IbftBlockCreatorFactory blockCreatorFactory,
final MessageFactory messageFactory,
final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator) {
final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator,
final Clock clock) {
this.validatorProvider = validatorProvider;
this.nodeKeys = nodeKeys;
this.localAddress = localAddress;
Expand All @@ -65,6 +68,7 @@ public IbftFinalState(
this.messageFactory = messageFactory;
this.ibftContextBlockHeaderValidator = ibftContextBlockHeaderValidator;
this.messageTransmitter = new IbftMessageTransmitter(messageFactory, peers);
this.clock = clock;
}

public int getQuorumSize() {
Expand Down Expand Up @@ -122,4 +126,8 @@ public BlockHeaderValidator<IbftContext> getBlockHeaderValidator() {
public IbftMessageTransmitter getTransmitter() {
return messageTransmitter;
}

public Clock getClock() {
return clock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public IbftRound createNewRound(final BlockHeader parentHeader, final int round)
long nextBlockHeight = parentHeader.getNumber() + 1;
final ConsensusRoundIdentifier roundIdentifier =
new ConsensusRoundIdentifier(nextBlockHeight, round);
final IbftBlockCreator blockCreator = blockCreatorFactory.create(parentHeader, round);

final RoundState roundState =
new RoundState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,12 @@ private RoundChangeStatus storeRoundChangeMessage(final SignedData<RoundChangePa
*
* @param completedRoundIdentifier round identifier that has been identified as superseded
*/
public void discardCompletedRound(final ConsensusRoundIdentifier completedRoundIdentifier) {
roundChangeCache
.entrySet()
.removeIf(entry -> isAnEarlierOrEqualRound(entry.getKey(), completedRoundIdentifier));
public void discardRoundsPriorTo(final ConsensusRoundIdentifier completedRoundIdentifier) {
roundChangeCache.keySet().removeIf(k -> isAnEarlierRound(k, completedRoundIdentifier));
}

private boolean isAnEarlierOrEqualRound(
private boolean isAnEarlierRound(
final ConsensusRoundIdentifier left, final ConsensusRoundIdentifier right) {
return left.getRoundNumber() <= right.getRoundNumber();
return left.getRoundNumber() < right.getRoundNumber();
}
}
Loading

0 comments on commit cae7c90

Please sign in to comment.