Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2058] initial scaffolding re block propagation #860

Merged
merged 12 commits into from
Feb 14, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState.EstimatedHeightListener;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
Expand All @@ -23,6 +24,7 @@
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
Expand Down Expand Up @@ -130,6 +132,15 @@ public ResponseStream send(final MessageData messageData) throws PeerNotConnecte
}
}

public void propagateBlock(final Block block, final UInt256 totalDifficulty) {
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
try {
connection.sendForProtocol(protocolName, newBlockMessage);
} catch (PeerNotConnected e) {
LOG.trace("Failed to broadcast new block to peer", e);
}
}

public ResponseStream getHeadersByHash(
final Hash hash, final int maxHeaders, final int skip, final boolean reverse)
throws PeerNotConnected {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,24 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
}
}

private void handleNewBlockFromNetwork(final EthMessage message) {
void broadcastBlock(final Block block, final UInt256 difficulty) {
final List<EthPeer> availablePeers =
ethContext.getEthPeers().availablePeers().collect(Collectors.toList());
for (EthPeer ethPeer : availablePeers) {
ethPeer.propagateBlock(block, difficulty);
}
}

void handleNewBlockFromNetwork(final EthMessage message) {
final Blockchain blockchain = protocolContext.getBlockchain();
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
try {
final Block block = newBlockMessage.block(protocolSchedule);
final UInt256 totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);

// TODO: Extract broadcast functionality to independent class.
broadcastBlock(block, totalDifficulty);

message.getPeer().chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);

// Return early if we don't care about this block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,58 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {

verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}

@SuppressWarnings("unchecked")
@Test
public void broadcastBlockTest() {
BlockchainSetupUtil<Void> blockchainUtil = BlockchainSetupUtil.forTesting();

MutableBlockchain blockchain = spy(blockchainUtil.getBlockchain());
ProtocolSchedule<Void> protocolSchedule = blockchainUtil.getProtocolSchedule();
ProtocolContext<Void> tempProtocolContext = blockchainUtil.getProtocolContext();
ProtocolContext<Void> protocolContext =
new ProtocolContext<>(
blockchain,
tempProtocolContext.getWorldStateArchive(),
tempProtocolContext.getConsensusState());

EthProtocolManager ethProtocolManager =
EthProtocolManagerTestUtil.create(blockchain, blockchainUtil.getWorldArchive());

SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().blockPropagationRange(-3, 5).build();
SyncState syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
BlockPropagationManager<Void> blockPropagationManager =
spy(
new BlockPropagationManager<>(
syncConfig,
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
ethTasksTimer));

blockchainUtil.importFirstBlocks(2);
final Block nextBlock = blockchainUtil.getBlock(2);

blockPropagationManager.start();

// Setup peer and messages
final RespondingEthPeer peer0 = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final RespondingEthPeer peer1 = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final RespondingEthPeer peer2 = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);

final UInt256 totalDifficulty =
fullBlockchain.getTotalDifficultyByHash(nextBlock.getHash()).get();
final NewBlockMessage newBlockMessage = NewBlockMessage.create(nextBlock, totalDifficulty);

// Broadcast message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer0, newBlockMessage);

final Responder responder = RespondingEthPeer.blockchainResponder(fullBlockchain);
peer0.respondWhile(responder, peer0::hasOutstandingRequests);

verify(blockPropagationManager, times(1)).broadcastBlock(any(), any());
}
}