Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ask block when not present #2213

Merged
merged 8 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;

/** Downloads a block from a peer. Will complete exceptionally if block cannot be downloaded. */
public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;

protected GetBlockFromPeerTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(ethContext, metricsSystem);
Expand All @@ -54,17 +56,18 @@ protected GetBlockFromPeerTask(
public static GetBlockFromPeerTask create(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
}

@Override
protected void executeTask() {
final String blockIdentifier = hash.map(Bytes::toHexString).orElse(Long.toString(blockNumber));
LOG.debug(
"Downloading block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
downloadHeader()
.thenCompose(this::completeBlock)
Expand All @@ -73,14 +76,15 @@ protected void executeTask() {
if (t != null) {
LOG.info(
"Failed to download block {} from peer {}.",
hash,
blockIdentifier,
assignedPeer.map(EthPeer::toString).orElse("<any>"));
result.completeExceptionally(t);
} else if (r.getResult().isEmpty()) {
LOG.info("Failed to download block {} from peer {}.", hash, r.getPeer());
LOG.info("Failed to download block {} from peer {}.", blockIdentifier, r.getPeer());
result.completeExceptionally(new IncompleteResultsException());
} else {
LOG.debug("Successfully downloaded block {} from peer {}.", hash, r.getPeer());
LOG.debug(
"Successfully downloaded block {} from peer {}.", blockIdentifier, r.getPeer());
result.complete(new PeerTaskResult<>(r.getPeer(), r.getResult().get(0)));
}
});
Expand All @@ -89,9 +93,16 @@ protected void executeTask() {
private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader() {
return executeSubTask(
() -> {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, hash, blockNumber, metricsSystem);
final AbstractGetHeadersFromPeerTask task;
task =
hash.map(
value ->
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, value, blockNumber, metricsSystem))
.orElseGet(
() ->
GetHeadersFromPeerByNumberTask.forSingleNumber(
protocolSchedule, ethContext, blockNumber, metricsSystem));
assignedPeer.ifPresent(task::assignPeer);
return task.run();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,15 +35,15 @@ public class GetBlockFromPeersTask extends AbstractEthTask<AbstractPeerTask.Peer
private final List<EthPeer> peers;
private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
private final Hash hash;
private final Optional<Hash> hash;
private final long blockNumber;
private final MetricsSystem metricsSystem;

protected GetBlockFromPeersTask(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
super(metricsSystem);
Expand All @@ -58,7 +59,7 @@ public static GetBlockFromPeersTask create(
final List<EthPeer> peers,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash hash,
final Optional<Hash> hash,
final long blockNumber,
final MetricsSystem metricsSystem) {
return new GetBlockFromPeersTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
Expand All @@ -45,6 +46,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -143,6 +145,20 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
LOG.info("Imported {} pending blocks", r.size());
}
});
} else {
pendingBlocksManager
.lowestAnnouncedBlock()
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance =
minAnnouncedBlockNumber
- protocolContext.getBlockchain().getChainHeadBlockNumber();
if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > newBlock.getHeader().getNumber()) {
retrieveMissingAnnouncedBlock(newBlock.getHeader().getNumber() + 1);
}
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
});
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
Expand Down Expand Up @@ -245,11 +261,29 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) {
}
}

@SuppressWarnings("unused")
matkt marked this conversation as resolved.
Show resolved Hide resolved
private CompletableFuture<Block> retrieveMissingAnnouncedBlock(final long blockNumber) {
LOG.info("retrieveMissingAnnouncedBlock " + blockNumber);
final List<EthPeer> peers =
ethContext.getEthPeers().streamBestPeers().collect(Collectors.toList());
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, Optional.empty(), blockNumber, metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
}

private CompletableFuture<Block> processAnnouncedBlock(
final List<EthPeer> peers, final NewBlockHash newBlock) {
final GetBlockFromPeersTask getBlockTask =
GetBlockFromPeersTask.create(
peers, protocolSchedule, ethContext, newBlock.hash(), newBlock.number(), metricsSystem);
peers,
protocolSchedule,
ethContext,
Optional.of(newBlock.hash()),
newBlock.number(),
metricsSystem);
return getBlockTask
.run()
.thenCompose((r) -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
import static java.util.Collections.newSetFromMap;

import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -116,4 +119,11 @@ public List<Block> childrenOf(final Hash parentBlock) {
.map(ImmutablePendingBlock::block)
.collect(Collectors.toList());
}

public Optional<BlockHeader> lowestAnnouncedBlock() {
return pendingBlocks.values().stream()
.map(ImmutablePendingBlock::block)
.map(Block::getHeader)
.min(Comparator.comparing(BlockHeader::getNumber));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private CompletableFuture<List<BlockHeader>> processHeaders(
GetBlockFromPeerTask.create(
protocolSchedule,
ethContext,
child.getHash(),
Optional.of(child.getHash()),
child.getNumber(),
metricsSystem)
.assignPeer(headersResult.getPeer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -47,7 +48,11 @@ protected Block generateDataToBeRequested() {
@Override
protected EthTask<AbstractPeerTask.PeerTaskResult<Block>> createTask(final Block requestedData) {
return GetBlockFromPeerTask.create(
protocolSchedule, ethContext, requestedData.getHash(), BLOCK_NUMBER, metricsSystem);
protocolSchedule,
ethContext,
Optional.of(requestedData.getHash()),
BLOCK_NUMBER,
metricsSystem);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,18 @@ public void shouldReplaceLowestPriorityBlockWhenCacheIsFull() {
}
assertThat(pendingBlocksManager.contains(reorgBlock.getHash())).isTrue();
}

@Test
public void shouldReturnLowestBlockByNumber() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();
final Block childBlock = gen.nextBlock(parentBlock);
final Block childBlock2 = gen.nextBlock(parentBlock);

pendingBlocksManager.registerPendingBlock(parentBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1);

assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}
}