Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAN-2333] Use only fully validated peers for fast sync pivot selection #21

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -245,7 +244,9 @@ public BesuController<C> build() {
}));

final boolean fastSyncEnabled = syncConfig.getSyncMode().equals(SyncMode.FAST);
ethProtocolManager = createEthProtocolManager(protocolContext, fastSyncEnabled);
ethProtocolManager =
createEthProtocolManager(
protocolContext, fastSyncEnabled, createPeerValidators(protocolSchedule));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move validator management into EthProtocolManager so that every EthPeer can track its validation state.

final SyncState syncState =
new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers());
final Synchronizer synchronizer =
Expand All @@ -262,17 +263,6 @@ public BesuController<C> build() {
clock,
metricsSystem);

final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
final EthContext ethContext = ethProtocolManager.ethContext();
final DaoForkPeerValidator daoForkPeerValidator =
new DaoForkPeerValidator(
ethContext, protocolSchedule, metricsSystem, daoBlock.getAsLong());
PeerValidatorRunner.runValidator(ethContext, daoForkPeerValidator);
}

final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule,
Expand Down Expand Up @@ -356,11 +346,14 @@ protected abstract C createConsensusContext(
Blockchain blockchain, WorldStateArchive worldStateArchive);

protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<C> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<C> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
Expand All @@ -369,4 +362,18 @@ protected EthProtocolManager createEthProtocolManager(
metricsSystem,
ethereumWireProtocolConfiguration);
}

protected List<PeerValidator> createPeerValidators(final ProtocolSchedule<C> protocolSchedule) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock =
genesisConfig.getConfigOptions(genesisConfigOverrides).getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
}

return validators;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -97,12 +100,15 @@ protected void validateContext(final ProtocolContext<IbftContext> context) {

@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext<IbftContext> protocolContext, final boolean fastSyncEnabled) {
final ProtocolContext<IbftContext> protocolContext,
final boolean fastSyncEnabled,
final List<PeerValidator> peerValidators) {
LOG.info("Operating on IBFT-1.0 network.");
return new Istanbul64ProtocolManager(
protocolContext.getBlockchain(),
protocolContext.getWorldStateArchive(),
networkId,
peerValidators,
fastSyncEnabled,
syncConfig.getDownloaderParallelism(),
syncConfig.getTransactionsParallelism(),
Expand Down
28 changes: 22 additions & 6 deletions besu/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.hyperledger.besu.cli.config.EthNetworkConfig;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.JsonUtil;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.controller.KeyPairUtil;
import org.hyperledger.besu.controller.MainnetBesuControllerBuilder;
Expand Down Expand Up @@ -61,8 +62,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -110,15 +113,16 @@ public void getFixedNodes() {

@Test
public void fullSyncFromGenesis() throws Exception {
syncFromGenesis(SyncMode.FULL);
syncFromGenesis(SyncMode.FULL, GenesisConfigFile.mainnet());
}

@Test
public void fastSyncFromGenesis() throws Exception {
syncFromGenesis(SyncMode.FAST);
syncFromGenesis(SyncMode.FAST, getFastSyncGenesis());
}

private void syncFromGenesis(final SyncMode mode) throws Exception {
private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig)
throws Exception {
final Path dataDirAhead = temp.newFolder().toPath();
final Path dbAhead = dataDirAhead.resolve("database");
final int blockCount = 500;
Expand All @@ -131,7 +135,7 @@ private void syncFromGenesis(final SyncMode mode) throws Exception {
// Setup state with block data
try (final BesuController<Void> controller =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigAhead)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirAhead)
Expand All @@ -150,7 +154,7 @@ private void syncFromGenesis(final SyncMode mode) throws Exception {
// Setup Runner with blocks
final BesuController<Void> controllerAhead =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigAhead)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirAhead)
Expand Down Expand Up @@ -208,7 +212,7 @@ private void syncFromGenesis(final SyncMode mode) throws Exception {
// Setup runner with no block data
final BesuController<Void> controllerBehind =
new MainnetBesuControllerBuilder()
.genesisConfigFile(GenesisConfigFile.mainnet())
.genesisConfigFile(genesisConfig)
.synchronizerConfiguration(syncConfigBehind)
.ethProtocolConfiguration(EthProtocolConfiguration.defaultConfig())
.dataDirectory(dataDirBehind)
Expand Down Expand Up @@ -337,6 +341,18 @@ private void syncFromGenesis(final SyncMode mode) throws Exception {
}
}

private GenesisConfigFile getFastSyncGenesis() {
final ObjectNode jsonNode = GenesisConfigFile.mainnetJsonNode();
final Optional<ObjectNode> configNode = JsonUtil.getObjectNode(jsonNode, "config");
configNode.ifPresent(
(node) -> {
// Clear DAO block so that inability to validate DAO block won't interfere with fast sync
node.remove("daoForkBlock");
node.put("daoForkSupport", false);
});
return GenesisConfigFile.fromConfig(jsonNode);
}

private StorageProvider createKeyValueStorageProvider(final Path dbAhead) {
return new KeyValueStorageProviderBuilder()
.withStorageFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public static GenesisConfigFile mainnet() {
}
}

public static ObjectNode mainnetJsonNode() {
try {
final String jsonString =
Resources.toString(GenesisConfigFile.class.getResource("/mainnet.json"), UTF_8);
return JsonUtil.objectNodeFromString(jsonString, false);
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}

public static GenesisConfigFile development() {
try {
return fromConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -32,6 +33,7 @@ public Istanbul64ProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final List<PeerValidator> peerValidators,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
Expand All @@ -43,6 +45,7 @@ public Istanbul64ProtocolManager(
blockchain,
worldStateArchive,
networkId,
peerValidators,
fastSyncEnabled,
syncWorkers,
txWorkers,
Expand All @@ -52,28 +55,6 @@ public Istanbul64ProtocolManager(
ethereumWireProtocolConfiguration);
}

public Istanbul64ProtocolManager(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code

final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final BigInteger networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
clock,
metricsSystem);
}

@Override
public List<Capability> getSupportedCapabilities() {
return singletonList(Istanbul64Protocol.ISTANBUL64);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
Expand All @@ -30,6 +31,7 @@

import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,11 +68,13 @@ public class EthPeer {

private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new HashMap<>();

EthPeer(
final PeerConnection connection,
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged,
final List<PeerValidator> peerValidators,
final Clock clock) {
this.connection = connection;
this.protocolName = protocolName;
Expand All @@ -86,6 +90,30 @@ protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
}));
this.chainHeadState = new ChainState();
this.onStatusesExchanged.set(onStatusesExchanged);
for (PeerValidator peerValidator : peerValidators) {
validationStatus.put(peerValidator, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track validation status within each EthPeer.

}
}

public void markValidated(final PeerValidator validator) {
if (!validationStatus.containsKey(validator)) {
throw new IllegalArgumentException("Attempt to update unknown validation status");
}
validationStatus.put(validator, true);
}

/**
* Check if this peer has been fully validated.
*
* @return {@code true} if all peer validation logic has run and successfully validated this peer
*/
public boolean isFullyValidated() {
for (Boolean isValid : validationStatus.values()) {
if (!isValid) {
return false;
}
}
return true;
}

public boolean isDisconnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -22,9 +23,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -58,9 +61,11 @@ public EthPeers(final String protocolName, final Clock clock, final MetricsSyste
pendingRequests::size);
}

void registerConnection(final PeerConnection peerConnection) {
void registerConnection(
final PeerConnection peerConnection, final List<PeerValidator> peerValidators) {
final EthPeer peer =
new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock);
new EthPeer(
peerConnection, protocolName, this::invokeConnectionCallbacks, peerValidators, clock);
connections.putIfAbsent(peerConnection, peer);
}

Expand Down Expand Up @@ -127,7 +132,11 @@ public Optional<EthPeer> bestPeer() {
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
return streamAvailablePeers().filter(p -> p.chainState().hasEstimatedHeight()).max(BEST_CHAIN);
return bestPeerMatchingCriteria(p -> p.chainState().hasEstimatedHeight());
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria::test).max(BEST_CHAIN);
}

@FunctionalInterface
Expand Down
Loading