diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java index 99fc3e8a014..84bb864e857 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java @@ -24,24 +24,26 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; /** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */ public class GetBlockFromPeerTask extends AbstractPeerTask { private static final Logger LOG = LogManager.getLogger(); private final ProtocolSchedule protocolSchedule; - private final Hash hash; + private final Optional hash; private final long blockNumber; private final MetricsSystem metricsSystem; protected GetBlockFromPeerTask( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final Hash hash, + final Optional hash, final long blockNumber, final MetricsSystem metricsSystem) { super(ethContext, metricsSystem); @@ -54,7 +56,7 @@ protected GetBlockFromPeerTask( public static GetBlockFromPeerTask create( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final Hash hash, + final Optional hash, final long blockNumber, final MetricsSystem metricsSystem) { return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, blockNumber, metricsSystem); @@ -62,9 +64,10 @@ public static GetBlockFromPeerTask create( @Override protected void executeTask() { + final String blockIdentifier = hash.map(Bytes::toHexString).orElse(Long.toString(blockNumber)); LOG.debug( "Downloading block {} from peer {}.", - hash, + blockIdentifier, assignedPeer.map(EthPeer::toString).orElse("")); downloadHeader() .thenCompose(this::completeBlock) @@ -73,14 +76,15 @@ protected void executeTask() { if (t != null) { LOG.info( "Failed to download block {} from peer {}.", - hash, + blockIdentifier, assignedPeer.map(EthPeer::toString).orElse("")); result.completeExceptionally(t); } else if (r.getResult().isEmpty()) { - LOG.info("Failed to download block {} from peer {}.", hash, r.getPeer()); + LOG.info("Failed to download block {} from peer {}.", blockIdentifier, r.getPeer()); result.completeExceptionally(new IncompleteResultsException()); } else { - LOG.debug("Successfully downloaded block {} from peer {}.", hash, r.getPeer()); + LOG.debug( + "Successfully downloaded block {} from peer {}.", blockIdentifier, r.getPeer()); result.complete(new PeerTaskResult<>(r.getPeer(), r.getResult().get(0))); } }); @@ -89,9 +93,16 @@ protected void executeTask() { private CompletableFuture>> downloadHeader() { return executeSubTask( () -> { - final AbstractGetHeadersFromPeerTask task = - GetHeadersFromPeerByHashTask.forSingleHash( - protocolSchedule, ethContext, hash, blockNumber, metricsSystem); + final AbstractGetHeadersFromPeerTask task; + task = + hash.map( + value -> + GetHeadersFromPeerByHashTask.forSingleHash( + protocolSchedule, ethContext, value, blockNumber, metricsSystem)) + .orElseGet( + () -> + GetHeadersFromPeerByNumberTask.forSingleNumber( + protocolSchedule, ethContext, blockNumber, metricsSystem)); assignedPeer.ifPresent(task::assignPeer); return task.run(); }); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java index 8035e4b0c48..9fb682f23c2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeersTask.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +35,7 @@ public class GetBlockFromPeersTask extends AbstractEthTask peers; private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; - private final Hash hash; + private final Optional hash; private final long blockNumber; private final MetricsSystem metricsSystem; @@ -42,7 +43,7 @@ protected GetBlockFromPeersTask( final List peers, final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final Hash hash, + final Optional hash, final long blockNumber, final MetricsSystem metricsSystem) { super(metricsSystem); @@ -58,7 +59,7 @@ public static GetBlockFromPeersTask create( final List peers, final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final Hash hash, + final Optional hash, final long blockNumber, final MetricsSystem metricsSystem) { return new GetBlockFromPeersTask( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index 3b2fb52dae5..500327139f6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthMessage; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; @@ -45,6 +46,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -143,6 +145,20 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) { LOG.info("Imported {} pending blocks", r.size()); } }); + } else { + pendingBlocksManager + .lowestAnnouncedBlock() + .map(ProcessableBlockHeader::getNumber) + .ifPresent( + minAnnouncedBlockNumber -> { + long distance = + minAnnouncedBlockNumber + - protocolContext.getBlockchain().getChainHeadBlockNumber(); + if (distance < config.getBlockPropagationRange().upperEndpoint() + && minAnnouncedBlockNumber > newBlock.getHeader().getNumber()) { + retrieveMissingAnnouncedBlock(newBlock.getHeader().getNumber() + 1); + } + }); } if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) { @@ -245,11 +261,28 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) { } } + private CompletableFuture retrieveMissingAnnouncedBlock(final long blockNumber) { + LOG.debug("Retrieve missing announced block {} from peer", blockNumber); + final List peers = + ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList()); + final GetBlockFromPeersTask getBlockTask = + GetBlockFromPeersTask.create( + peers, protocolSchedule, ethContext, Optional.empty(), blockNumber, metricsSystem); + return getBlockTask + .run() + .thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId())); + } + private CompletableFuture processAnnouncedBlock( final List peers, final NewBlockHash newBlock) { final GetBlockFromPeersTask getBlockTask = GetBlockFromPeersTask.create( - peers, protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem); + peers, + protocolSchedule, + ethContext, + Optional.of(newBlock.hash()), + newBlock.number(), + metricsSystem); return getBlockTask .run() .thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId())); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java index 84fb7274791..62af29eb256 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManager.java @@ -17,15 +17,18 @@ import static java.util.Collections.newSetFromMap; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock; import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -116,4 +119,11 @@ public List childrenOf(final Hash parentBlock) { .map(ImmutablePendingBlock::block) .collect(Collectors.toList()); } + + public Optional lowestAnnouncedBlock() { + return pendingBlocks.values().stream() + .map(ImmutablePendingBlock::block) + .map(Block::getHeader) + .min(Comparator.comparing(BlockHeader::getNumber)); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 13948a0e1a1..04f4c1cc4d8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -212,7 +212,7 @@ private CompletableFuture> processHeaders( GetBlockFromPeerTask.create( protocolSchedule, ethContext, - child.getHash(), + Optional.of(child.getHash()), child.getNumber(), metricsSystem) .assignPeer(headersResult.getPeer()); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTaskTest.java index 7f0d84fc760..7cdcd3c7071 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTaskTest.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException; import org.hyperledger.besu.util.ExceptionUtils; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -47,7 +48,11 @@ protected Block generateDataToBeRequested() { @Override protected EthTask> createTask(final Block requestedData) { return GetBlockFromPeerTask.create( - protocolSchedule, ethContext, requestedData.getHash(), BLOCK_NUMBER, metricsSystem); + protocolSchedule, + ethContext, + Optional.of(requestedData.getHash()), + BLOCK_NUMBER, + metricsSystem); } @Override diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java index 18e1315d56a..8c867c7d254 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/PendingBlocksManagerTest.java @@ -245,4 +245,18 @@ public void shouldReplaceLowestPriorityBlockWhenCacheIsFull() { } assertThat(pendingBlocksManager.contains(reorgBlock.getHash())).isTrue(); } + + @Test + public void shouldReturnLowestBlockByNumber() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final Block parentBlock = gen.block(); + final Block childBlock = gen.nextBlock(parentBlock); + final Block childBlock2 = gen.nextBlock(parentBlock); + + pendingBlocksManager.registerPendingBlock(parentBlock, NODE_ID_1); + pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1); + pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1); + + assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader()); + } }