Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAN-2333] Use only fully validated peers for fast sync pivot selection #21

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -245,7 +244,9 @@ public BesuController<C> build() {
}));

final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST);
ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled);
ethProtocolManager =
createEthProtocolManager(
protocolContext, fastSyncEnabled, createPeerValidators(protocolSchedule));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move validator management into EthProtocolManager so that every EthPeer can track its validation state.

final SyncState syncState =
new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
final Synchronizer synchronizer =
Expand All @@ -262,17 +263,6 @@ public BesuController<C> build() {
clock,
metricsSystem);

final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
final EthContext ethContext = ethProtocolManager.ethContext();
final DaoForkPeerValidator daoForkPeerValidator =
new DaoForkPeerValidator(
ethContext, protocolSchedule, metricsSystem, daoBlock.getAsLong());
PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator);
}

final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule,
Expand Down Expand Up @@ -356,11 +346,14 @@ protected abstract C createConsensusContext(
Blockchain blockchain, WorldStateArchive worldStateArchive);

protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<C> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<C> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
Expand All @@ -369,4 +362,18 @@ protected EthProtocolManager createEthProtocolManager(
metricsSystem,
ethereumWireProtocolConfiguration);
}

protected List<PeerValidator> createPeerValidators(final ProtocolSchedule<C> protocolSchedule) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
}

return validators;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -97,12 +100,15 @@ protected void validateContext(final ProtocolContext<IbftContext> context) {

@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<IbftContext> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<IbftContext> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
LOG.info("Operating on IBFT-1.0 network.");
return new Istanbul64ProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -32,6 +33,7 @@ public Istanbul64ProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
Expand All @@ -43,6 +45,7 @@ public Istanbul64ProtocolManager(
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
syncWorkers,
txWorkers,
Expand All @@ -52,28 +55,6 @@ public Istanbul64ProtocolManager(
ethereumWireProtocolConfiguration);
}

public Istanbul64ProtocolManager(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code

final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
clock,
metricsSystem);
}

@Override
public List<Capability> getSupportedCapabilities() {
return singletonList(Istanbul64Protocol.ISTANBUL64);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
Expand All @@ -30,6 +31,7 @@

import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,11 +68,13 @@ public class EthPeer {

private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new HashMap<>();

EthPeer(
final PeerConnection connection,
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged,
final List<PeerValidator> peerValidators,
final Clock clock) {
this.connection = connection;
this.protocolName = protocolName;
Expand All @@ -86,6 +90,30 @@ protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
}));
this.chainHeadState = new ChainState();
this.onStatusesExchanged.set(onStatusesExchanged);
for (PeerValidator peerValidator : peerValidators) {
validationStatus.put(peerValidator, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track validation status within each EthPeer.

}
}

public void markValidated(final PeerValidator validator) {
if (!validationStatus.containsKey(validator)) {
throw new IllegalArgumentException("Attempt to update unknown validation status");
}
validationStatus.put(validator, true);
}

/**
* Check if this peer has been fully validated.
*
* @return {@code true} if all peer validation logic has run and successfully validated this peer
*/
public boolean isFullyValidated() {
for (Boolean isValid : validationStatus.values()) {
if (!isValid) {
return false;
}
}
return true;
}

public boolean isDisconnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -22,9 +23,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -58,9 +61,11 @@ public EthPeers(final String protocolName, final Clock clock, final MetricsSyste
pendingRequests::size);
}

void registerConnection(final PeerConnection peerConnection) {
void registerConnection(
final PeerConnection peerConnection, final List<PeerValidator> peerValidators) {
final EthPeer peer =
new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock);
new EthPeer(
peerConnection, protocolName, this::invokeConnectionCallbacks, peerValidators, clock);
connections.putIfAbsent(peerConnection, peer);
}

Expand Down Expand Up @@ -127,7 +132,11 @@ public Optional<EthPeer> bestPeer() {
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN);
return bestPeerMatchingCriteria(p -> p.chainState().hasEstimatedHeight());
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria::test).max(BEST_CHAIN);
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
Expand Down Expand Up @@ -66,17 +68,20 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private List<Capability> supportedCapabilities;
private final Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
private final List<PeerValidator> peerValidators;

public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final EthScheduler scheduler,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final Clock clock,
final MetricsSystem metricsSystem) {
this.networkId = networkId;
this.peerValidators = peerValidators;
this.scheduler = scheduler;
this.blockchain = blockchain;
this.fastSyncEnabled = fastSyncEnabled;
Expand All @@ -90,6 +95,11 @@ public EthProtocolManager(

this.blockBroadcaster = new BlockBroadcaster(ethContext);

// Run validators
for (final PeerValidator peerValidator : this.peerValidators) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: How fast is the validation ? Is it worth validating in parallel ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runValidator method just sets up a listener, so I think we should be fine with a plain loop here.

PeerValidatorRunner.runValidator(ethContext, peerValidator);
}

// Set up request handlers
new EthServer(blockchain, worldStateArchive, ethMessages, ethereumWireProtocolConfiguration);
}
Expand All @@ -98,6 +108,7 @@ public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
Expand All @@ -108,6 +119,7 @@ public EthProtocolManager(
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
EthProtocolConfiguration.defaultConfig(),
Expand All @@ -119,6 +131,7 @@ public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
Expand All @@ -130,6 +143,7 @@ public EthProtocolManager(
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
ethereumWireProtocolConfiguration,
Expand Down Expand Up @@ -212,7 +226,7 @@ public void processMessage(final Capability cap, final Message message) {

@Override
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerConnection(connection);
ethPeers.registerConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
if (peer.statusHasBeenSentToPeer()) {
return;
Expand Down
Loading