Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-1909] IBFT message gossiping (#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
Errorific authored Jan 9, 2019
1 parent 637af12 commit bd9ffac
Show file tree
Hide file tree
Showing 33 changed files with 867 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
Expand Down Expand Up @@ -60,15 +60,15 @@ public static boolean msgMatchesExpected(

switch (expectedPayload.getMessageType()) {
case IbftV2.PROPOSAL:
return ProposalMessage.fromMessage(actual).decode().equals(expected);
return ProposalMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.PREPARE:
return PrepareMessage.fromMessage(actual).decode().equals(expected);
return PrepareMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.COMMIT:
return CommitMessage.fromMessage(actual).decode().equals(expected);
return CommitMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.NEW_ROUND:
return NewRoundMessage.fromMessage(actual).decode().equals(expected);
return NewRoundMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.ROUND_CHANGE:
return RoundChangeMessage.fromMessage(actual).decode().equals(expected);
return RoundChangeMessageData.fromMessageData(actual).decode().equals(expected);
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibft.support;

import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collection;
Expand All @@ -32,6 +33,15 @@ public void addNetworkPeers(final Collection<ValidatorPeer> nodes) {

@Override
public void multicastToValidators(final MessageData message) {
validatorNodes.forEach(v -> v.handleReceivedMessage(message));
validatorNodes.forEach(peer -> peer.handleReceivedMessage(message));
}

@Override
public void multicastToValidatorsExcept(
final MessageData message, final Collection<Address> exceptAddresses) {
validatorNodes
.stream()
.filter(peer -> !exceptAddresses.contains(peer.getNodeAddress()))
.forEach(peer -> peer.handleReceivedMessage(message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.support;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.net.SocketAddress;
import java.util.Set;

public class StubbedPeerConnection implements PeerConnection {

@Override
public void send(final Capability capability, final MessageData message)
throws PeerNotConnected {}

@Override
public Set<Capability> getAgreedCapabilities() {
return null;
}

@Override
public PeerInfo getPeer() {
return new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, BytesValue.EMPTY);
}

@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {}

@Override
public void disconnect(final DisconnectReason reason) {}

@Override
public SocketAddress getLocalAddress() {
return null;
}

@Override
public SocketAddress getRemoteAddress() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibft.support;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.mock;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

Expand All @@ -29,6 +30,7 @@
import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftExtraData;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.IbftHelpers;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -252,6 +255,9 @@ private static ControllerAndState createControllerAndFinalState(
final MessageValidatorFactory messageValidatorFactory =
new MessageValidatorFactory(proposerSelector, blockHeaderValidator, protocolContext);

// Disable Gossiping for integration tests.
final IbftGossip gossiper = mock(IbftGossip.class);

final IbftController ibftController =
new IbftController(
blockChain,
Expand All @@ -260,7 +266,9 @@ private static ControllerAndState createControllerAndFinalState(
finalState,
new IbftRoundFactory(finalState, protocolContext, protocolSchedule),
messageValidatorFactory,
protocolContext));
protocolContext),
new HashMap<>(),
gossiper);
//////////////////////////// END IBFT PantheonController ////////////////////////////

return new ControllerAndState(ibftController, finalState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
Expand All @@ -37,6 +37,7 @@
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;

import java.util.Collections;
Expand All @@ -51,6 +52,7 @@ public class ValidatorPeer {
private final Address nodeAddress;
private final KeyPair nodeKeys;
private final MessageFactory messageFactory;
private final PeerConnection peerConnection = new StubbedPeerConnection();
private List<MessageData> receivedMessages = Lists.newArrayList();

private final IbftController localNodeController;
Expand All @@ -65,19 +67,24 @@ public ValidatorPeer(
this.localNodeController = localNodeController;
}

public Address getNodeAddress() {
return nodeAddress;
}

public SignedData<ProposalPayload> injectProposal(
final ConsensusRoundIdentifier rId, final Block block) {
final SignedData<ProposalPayload> payload =
messageFactory.createSignedProposalPayload(rId, block);
injectMessage(ProposalMessage.create(payload));

injectMessage(ProposalMessageData.create(payload));
return payload;
}

public SignedData<PreparePayload> injectPrepare(
final ConsensusRoundIdentifier rId, final Hash digest) {
final SignedData<PreparePayload> payload =
messageFactory.createSignedPreparePayload(rId, digest);
injectMessage(PrepareMessage.create(payload));
injectMessage(PrepareMessageData.create(payload));
return payload;
}

Expand All @@ -86,7 +93,7 @@ public SignedData<CommitPayload> injectCommit(
final Signature commitSeal = SECP256K1.sign(digest, nodeKeys);
final SignedData<CommitPayload> payload =
messageFactory.createSignedCommitPayload(rId, digest, commitSeal);
injectMessage(CommitMessage.create(payload));
injectMessage(CommitMessageData.create(payload));
return payload;
}

Expand All @@ -97,15 +104,15 @@ public SignedData<NewRoundPayload> injectNewRound(

final SignedData<NewRoundPayload> payload =
messageFactory.createSignedNewRoundPayload(rId, roundChangeCertificate, proposalPayload);
injectMessage(NewRoundMessage.create(payload));
injectMessage(NewRoundMessageData.create(payload));
return payload;
}

public SignedData<RoundChangePayload> injectRoundChange(
final ConsensusRoundIdentifier rId, final Optional<PreparedCertificate> preparedCertificate) {
final SignedData<RoundChangePayload> payload =
messageFactory.createSignedRoundChangePayload(rId, preparedCertificate);
injectMessage(RoundChangeMessage.create(payload));
injectMessage(RoundChangeMessageData.create(payload));
return payload;
}

Expand All @@ -122,7 +129,7 @@ public void clearReceivedMessages() {
}

public void injectMessage(final MessageData msgData) {
final DefaultMessage message = new DefaultMessage(null, msgData);
final DefaultMessage message = new DefaultMessage(peerConnection, msgData);
localNodeController.handleMessageEvent(
(IbftReceivedMessageEvent) IbftEvents.fromMessage(message));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Lists;

/** Class responsible for rebroadcasting IBFT messages to known validators */
public class IbftGossip {
private final IbftMulticaster peers;

// Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data
private final int maxSeenMessages;

// Set that starts evicting members when it hits capacity
private final Set<Signature> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Signature, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Signature, Boolean> eldest) {
return size() > maxSeenMessages;
}
});

IbftGossip(final IbftMulticaster peers, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.peers = peers;
}

/**
* Constructor that attaches gossip logic to a set of peers
*
* @param peers The always up to date set of connected peers that understand IBFT
*/
public IbftGossip(final IbftMulticaster peers) {
this(peers, 10_000);
}

/**
* Retransmit a given IBFT message to other known validators nodes
*
* @param message The raw message to be gossiped
* @return Whether the message was rebroadcast or has been ignored as a repeat
*/
public boolean gossipMessage(final Message message) {
final MessageData messageData = message.getData();
final SignedData<?> signedData;
switch (messageData.getCode()) {
case IbftV2.PROPOSAL:
signedData = ProposalMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.PREPARE:
signedData = PrepareMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.COMMIT:
signedData = CommitMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.ROUND_CHANGE:
signedData = RoundChangeMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.NEW_ROUND:
signedData = NewRoundMessageData.fromMessageData(messageData).decode();
break;
default:
throw new IllegalArgumentException(
"Received message does not conform to any recognised IBFT message structure.");
}
final Signature signature = signedData.getSignature();
if (seenMessages.contains(signature)) {
return false;
} else {
final List<Address> excludeAddressesList =
Lists.newArrayList(
message.getConnection().getPeer().getAddress(), signedData.getSender());
peers.multicastToValidatorsExcept(messageData, excludeAddressesList);
seenMessages.add(signature);
return true;
}
}
}
Loading

0 comments on commit bd9ffac

Please sign in to comment.