Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Update IbftBlockHeigntManager to accept new message types. #737

Merged
merged 5 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Commit;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.NewRound;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Prepare;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Proposal;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.RoundChange;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;

public interface BlockHeightManager {
Expand All @@ -30,15 +29,15 @@ public interface BlockHeightManager {

void roundExpired(RoundExpiry expire);

void handleProposalPayload(SignedData<ProposalPayload> signedPayload);
void handleProposalPayload(Proposal proposal);

void handlePreparePayload(SignedData<PreparePayload> signedPayload);
void handlePreparePayload(Prepare prepare);

void handleCommitPayload(SignedData<CommitPayload> payload);
void handleCommitPayload(Commit commit);

void handleRoundChangePayload(SignedData<RoundChangePayload> signedPayload);
void handleRoundChangePayload(RoundChange roundChange);

void handleNewRoundPayload(SignedData<NewRoundPayload> signedPayload);
void handleNewRoundPayload(NewRound newRound);

long getChainHeight();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Commit;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.IbftMessage;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.NewRound;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Prepare;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Proposal;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.RoundChange;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMessageTransmitter;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.Payload;
import tech.pegasys.pantheon.consensus.ibft.payload.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.PreparedCertificate;
import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangeCertificate;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator;
Expand Down Expand Up @@ -143,9 +144,10 @@ public void roundExpired(final RoundExpiry expire) {

startNewRound(currentRound.getRoundIdentifier().getRoundNumber() + 1);

final SignedData<RoundChangePayload> localRoundChange =
messageFactory.createSignedRoundChangePayload(
currentRound.getRoundIdentifier(), latestPreparedCertificate);
final RoundChange localRoundChange =
new RoundChange(
messageFactory.createSignedRoundChangePayload(
currentRound.getRoundIdentifier(), latestPreparedCertificate));
transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), latestPreparedCertificate);

// Its possible the locally created RoundChange triggers the transmission of a NewRound
Expand All @@ -154,55 +156,56 @@ public void roundExpired(final RoundExpiry expire) {
}

@Override
public void handleProposalPayload(final SignedData<ProposalPayload> signedPayload) {
public void handleProposalPayload(final Proposal proposal) {
LOG.debug("Received a Proposal Payload.");
actionOrBufferMessage(
signedPayload, currentRound::handleProposalMessage, RoundState::setProposedBlock);
proposal, currentRound::handleProposalMessage, RoundState::setProposedBlock);
}

@Override
public void handlePreparePayload(final SignedData<PreparePayload> signedPayload) {
public void handlePreparePayload(final Prepare prepare) {
LOG.debug("Received a Prepare Payload.");
actionOrBufferMessage(
signedPayload, currentRound::handlePrepareMessage, RoundState::addPrepareMessage);
prepare, currentRound::handlePrepareMessage, RoundState::addPrepareMessage);
}

@Override
public void handleCommitPayload(final SignedData<CommitPayload> payload) {
public void handleCommitPayload(final Commit commit) {
LOG.debug("Received a Commit Payload.");
actionOrBufferMessage(payload, currentRound::handleCommitMessage, RoundState::addCommitMessage);
actionOrBufferMessage(commit, currentRound::handleCommitMessage, RoundState::addCommitMessage);
}

private <T extends Payload> void actionOrBufferMessage(
final SignedData<T> msgData,
final Consumer<SignedData<T>> inRoundHandler,
final BiConsumer<RoundState, SignedData<T>> buffer) {
final Payload payload = msgData.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);
private <P extends Payload, M extends IbftMessage<P>> void actionOrBufferMessage(
final M ibftMessage,
final Consumer<SignedData<P>> inRoundHandler,
final BiConsumer<RoundState, SignedData<P>> buffer) {
final MessageAge messageAge =
determineAgeOfPayload(ibftMessage.getRoundIdentifier().getRoundNumber());
if (messageAge == CURRENT_ROUND) {
inRoundHandler.accept(msgData);
inRoundHandler.accept(ibftMessage.getSignedPayload());
} else if (messageAge == FUTURE_ROUND) {
final ConsensusRoundIdentifier msgRoundId = payload.getRoundIdentifier();
final ConsensusRoundIdentifier msgRoundId = ibftMessage.getRoundIdentifier();
final RoundState roundstate =
futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
buffer.accept(roundstate, msgData);
buffer.accept(roundstate, ibftMessage.getSignedPayload());
}
}

@Override
public void handleRoundChangePayload(final SignedData<RoundChangePayload> signedPayload) {
final ConsensusRoundIdentifier targetRound = signedPayload.getPayload().getRoundIdentifier();
public void handleRoundChangePayload(final RoundChange message) {
final ConsensusRoundIdentifier targetRound = message.getRoundIdentifier();
LOG.info("Received a RoundChange Payload for {}", targetRound.toString());

final MessageAge messageAge = determineAgeOfPayload(signedPayload.getPayload());
final MessageAge messageAge =
determineAgeOfPayload(message.getRoundIdentifier().getRoundNumber());
if (messageAge == PRIOR_ROUND) {
LOG.debug("Received RoundChange Payload for a prior round. targetRound={}", targetRound);
return;
}

final Optional<RoundChangeCertificate> result =
roundChangeManager.appendRoundChangeMessage(signedPayload);
roundChangeManager.appendRoundChangeMessage(message.getSignedPayload());
if (result.isPresent()) {
if (messageAge == FUTURE_ROUND) {
startNewRound(targetRound.getRoundNumber());
Expand Down Expand Up @@ -230,21 +233,22 @@ private void startNewRound(final int roundNumber) {
}

@Override
public void handleNewRoundPayload(final SignedData<NewRoundPayload> signedPayload) {
final NewRoundPayload payload = signedPayload.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);
public void handleNewRoundPayload(final NewRound newRound) {
// final NewRoundPayload payload = newRound.getSignedPayload().getPayload();
final MessageAge messageAge =
determineAgeOfPayload(newRound.getRoundIdentifier().getRoundNumber());

if (messageAge == PRIOR_ROUND) {
LOG.info("Received NewRound Payload for a prior round={}", payload.getRoundIdentifier());
LOG.info("Received NewRound Payload for a prior round={}", newRound.getRoundIdentifier());
return;
}
LOG.info("Received NewRound Payload for {}", payload.getRoundIdentifier());
LOG.info("Received NewRound Payload for {}", newRound.getRoundIdentifier());

if (newRoundMessageValidator.validateNewRoundMessage(signedPayload)) {
if (newRoundMessageValidator.validateNewRoundMessage(newRound.getSignedPayload())) {
if (messageAge == FUTURE_ROUND) {
startNewRound(payload.getRoundIdentifier().getRoundNumber());
startNewRound(newRound.getRoundIdentifier().getRoundNumber());
}
currentRound.handleProposalFromNewRound(signedPayload);
currentRound.handleProposalFromNewRound(newRound.getSignedPayload());
}
}

Expand All @@ -258,8 +262,7 @@ public BlockHeader getParentBlockHeader() {
return parentHeader;
}

private MessageAge determineAgeOfPayload(final Payload payload) {
final int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber();
private MessageAge determineAgeOfPayload(final int messageRoundNumber) {
final int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber();
if (messageRoundNumber > currentRoundNumber) {
return FUTURE_ROUND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.IbftMessage;
import tech.pegasys.pantheon.consensus.ibft.payload.Authored;
import tech.pegasys.pantheon.consensus.ibft.payload.Payload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
Expand Down Expand Up @@ -132,17 +130,15 @@ private void handleMessage(final Message message) {
}
}

private <P extends Payload> void consumeMessage(
final Message message,
final IbftMessage<P> ibftMessage,
final Consumer<SignedData<P>> handleMessage) {
private <P extends IbftMessage<?>> void consumeMessage(
final Message message, final P ibftMessage, final Consumer<P> handleMessage) {
LOG.debug(
"Received IBFT message messageType={} payload={}",
ibftMessage.getMessageType(),
ibftMessage);
if (processMessage(ibftMessage, message)) {
gossiper.send(message);
handleMessage.accept(ibftMessage.getSignedPayload());
handleMessage.accept(ibftMessage);
}
}

Expand Down Expand Up @@ -205,7 +201,7 @@ private void startNewHeightManager(final BlockHeader parentHeader) {
futureMessages.remove(newChainHeight);
}

private boolean processMessage(final IbftMessage<? extends Payload> msg, final Message rawMsg) {
private boolean processMessage(final IbftMessage<?> msg, final Message rawMsg) {
final ConsensusRoundIdentifier msgRoundIdentifier = msg.getRoundIdentifier();
if (isMsgForCurrentHeight(msgRoundIdentifier)) {
return isMsgFromKnownValidator(msg) && ibftFinalState.isLocalNodeValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Commit;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.NewRound;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Prepare;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Proposal;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.RoundChange;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;

public class NoOpBlockHeightManager implements BlockHeightManager {
Expand All @@ -40,19 +39,19 @@ public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifie
public void roundExpired(final RoundExpiry expire) {}

@Override
public void handleProposalPayload(final SignedData<ProposalPayload> signedPayload) {}
public void handleProposalPayload(final Proposal proposal) {}

@Override
public void handlePreparePayload(final SignedData<PreparePayload> signedPayload) {}
public void handlePreparePayload(final Prepare prepare) {}

@Override
public void handleCommitPayload(final SignedData<CommitPayload> payload) {}
public void handleCommitPayload(final Commit commit) {}

@Override
public void handleRoundChangePayload(final SignedData<RoundChangePayload> signedPayload) {}
public void handleRoundChangePayload(final RoundChange roundChange) {}

@Override
public void handleNewRoundPayload(final SignedData<NewRoundPayload> signedPayload) {}
public void handleNewRoundPayload(final NewRound newRound) {}

@Override
public long getChainHeight() {
Expand Down
Loading