Skip to content

Commit

Permalink
Add fix to prevent the node from stopping downloading blocks (hyperle…
Browse files Browse the repository at this point in the history
…dger#2213)

Signed-off-by: Karim TAAM <[email protected]>
  • Loading branch information
matkt authored and eum602 committed Nov 3, 2023
1 parent cd5d2b7 commit 136e10a
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;

/** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */
public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;

protected GetBlockFromPeerTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, metricsSystem);
Expand All @@ -54,17 +56,18 @@ protected GetBlockFromPeerTask(
public static GetBlockFromPeerTask create(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
}

@Override
protected void executeTask() {
final String blockIdentifier = hash.map(Bytes::toHexString).orElse(Long.toString(blockNumber));
LOG.debug(
"Downloading block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
downloadHeader()
.thenCompose(this::completeBlock)
Expand All @@ -73,14 +76,15 @@ protected void executeTask() {
if (t != null) {
LOG.info(
"Failed to download block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
result.completeExceptionally(t);
} else if (r.getResult().isEmpty()) {
LOG.info("Failed to download block {} from peer {}.", hash, r.getPeer());
LOG.info("Failed to download block {} from peer {}.", blockIdentifier, r.getPeer());
result.completeExceptionally(new IncompleteResultsException());
} else {
LOG.debug("Successfully downloaded block {} from peer {}.", hash, r.getPeer());
LOG.debug(
"Successfully downloaded block {} from peer {}.", blockIdentifier, r.getPeer());
result.complete(new PeerTaskResult<>(r.getPeer(), r.getResult().get(0)));
}
});
Expand All @@ -89,9 +93,16 @@ protected void executeTask() {
private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader() {
return executeSubTask(
() -> {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
final AbstractGetHeadersFromPeerTask task;
task =
hash.map(
value ->
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, value, blockNumber, metricsSystem))
.orElseGet(
() ->
GetHeadersFromPeerByNumberTask.forSingleNumber(
protocolSchedule, ethContext, blockNumber, metricsSystem));
assignedPeer.ifPresent(task::assignPeer);
return task.run();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,15 +35,15 @@ public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.Peer
private final List<EthPeer> peers;
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;

protected GetBlockFromPeersTask(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(metricsSystem);
Expand All @@ -58,7 +59,7 @@ public static GetBlockFromPeersTask create(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeersTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
Expand All @@ -45,6 +46,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -143,6 +145,20 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
LOG.info("Imported {} pending blocks", r.size());
}
});
} else {
pendingBlocksManager
.lowestAnnouncedBlock()
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance =
minAnnouncedBlockNumber
- protocolContext.getBlockchain().getChainHeadBlockNumber();
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > newBlock.getHeader().getNumber()) {
retrieveMissingAnnouncedBlock(newBlock.getHeader().getNumber() + 1);
}
});
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
Expand Down Expand Up @@ -245,11 +261,28 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) {
}
}

private CompletableFuture<Block> retrieveMissingAnnouncedBlock(final long blockNumber) {
LOG.debug("Retrieve missing announced block {} from peer", blockNumber);
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, Optional.empty(), blockNumber, metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
}

private CompletableFuture<Block> processAnnouncedBlock(
final List<EthPeer> peers, final NewBlockHash newBlock) {
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem);
peers,
protocolSchedule,
ethContext,
Optional.of(newBlock.hash()),
newBlock.number(),
metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
import static java.util.Collections.newSetFromMap;

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -116,4 +119,11 @@ public List<Block> childrenOf(final Hash parentBlock) {
.map(ImmutablePendingBlock::block)
.collect(Collectors.toList());
}

public Optional<BlockHeader> lowestAnnouncedBlock() {
return pendingBlocks.values().stream()
.map(ImmutablePendingBlock::block)
.map(Block::getHeader)
.min(Comparator.comparing(BlockHeader::getNumber));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private CompletableFuture<List<BlockHeader>> processHeaders(
GetBlockFromPeerTask.create(
protocolSchedule,
ethContext,
child.getHash(),
Optional.of(child.getHash()),
child.getNumber(),
metricsSystem)
.assignPeer(headersResult.getPeer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -47,7 +48,11 @@ protected Block generateDataToBeRequested() {
@Override
protected EthTask<AbstractPeerTask.PeerTaskResult<Block>> createTask(final Block requestedData) {
return GetBlockFromPeerTask.create(
protocolSchedule, ethContext, requestedData.getHash(), BLOCK_NUMBER, metricsSystem);
protocolSchedule,
ethContext,
Optional.of(requestedData.getHash()),
BLOCK_NUMBER,
metricsSystem);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,18 @@ public void shouldReplaceLowestPriorityBlockWhenCacheIsFull() {
}
assertThat(pendingBlocksManager.contains(reorgBlock.getHash())).isTrue();
}

@Test
public void shouldReturnLowestBlockByNumber() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();
final Block childBlock = gen.nextBlock(parentBlock);
final Block childBlock2 = gen.nextBlock(parentBlock);

pendingBlocksManager.registerPendingBlock(parentBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1);

assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}
}

0 comments on commit 136e10a

Please sign in to comment.