Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Replace filtering headers after the fact with calculating number to request up-front #1216

Merged
merged 3 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
Expand All @@ -21,48 +25,72 @@
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

public class CheckpointHeaderFetcher {

private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule;
private final EthContext ethContext;
private final UnaryOperator<List<BlockHeader>> checkpointFilter;
private final Optional<BlockHeader> lastCheckpointHeader;
private final MetricsSystem metricsSystem;

public CheckpointHeaderFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final UnaryOperator<List<BlockHeader>> checkpointFilter,
final Optional<BlockHeader> lastCheckpointHeader,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.checkpointFilter = checkpointFilter;
this.lastCheckpointHeader = lastCheckpointHeader;
this.metricsSystem = metricsSystem;
}

public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader lastHeader) {
final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final int additionalHeaderCount = syncConfig.downloaderHeaderRequestSize();
final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize();

final int additionalHeaderCount;
if (lastCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = lastCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber();
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
return completedFuture(singletonList(targetHeader));
}
} else {
additionalHeaderCount = maximumHeaderRequestSize;
}

return requestHeaders(peer, lastHeader, additionalHeaderCount, skip);
}

private CompletableFuture<List<BlockHeader>> requestHeaders(
final EthPeer peer,
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
lastHeader.getHash(),
lastHeader.getNumber(),
referenceHeader.getHash(),
referenceHeader.getNumber(),
// + 1 because lastHeader will be returned as well.
additionalHeaderCount + 1,
headerCount + 1,
skip,
metricsSystem)
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(
headers -> checkpointFilter.apply(stripExistingCheckpointHeader(lastHeader, headers)));
.thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers));
}

private List<BlockHeader> stripExistingCheckpointHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;

import java.time.Duration;
import java.util.Optional;

public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
Expand Down Expand Up @@ -60,7 +61,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
syncConfig,
protocolSchedule,
ethContext,
new FastSyncCheckpointFilter(pivotBlockHeader),
Optional.of(pivotBlockHeader),
metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
Expand All @@ -31,14 +31,13 @@
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -47,9 +46,9 @@ public class CheckpointHeaderFetcherTest {
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Mock private UnaryOperator<List<BlockHeader>> filter;
private EthProtocolManager ethProtocolManager;
private CheckpointHeaderFetcher checkpointHeaderFetcher;
private Responder responder;
private RespondingEthPeer respondingPeer;

@BeforeClass
public static void setUpClass() {
Expand All @@ -65,26 +64,15 @@ public void setUpTest() {
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain, protocolContext.getWorldStateArchive(), () -> false);
final EthContext ethContext = ethProtocolManager.ethContext();
checkpointHeaderFetcher =
new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
filter,
metricsSystem);
responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
}

@Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
when(filter.apply(any())).thenAnswer(invocation -> invocation.getArgument(0));
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.empty());

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
Expand All @@ -97,23 +85,74 @@ public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
}

@Test
public void shouldApplyFilterToDownloadedCheckpoints() {
final List<BlockHeader> filteredResult = asList(header(7), header(9));
final List<BlockHeader> unfilteredResult = asList(header(6), header(11), header(16));
when(filter.apply(unfilteredResult)).thenReturn(filteredResult);
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(11)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

assertThat(result).isNotDone();
respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(filteredResult);
assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11));

assertThat(result).isCompletedWithValue(singletonList(header(15)));
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15));
assertThat(result).isCompletedWithValue(emptyList());
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16));
assertThat(result).isCompletedWithValue(emptyList());
}

private CheckpointHeaderFetcher createCheckpointHeaderFetcher(
final Optional<BlockHeader> targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
targetHeader,
metricsSystem);
}

private BlockHeader header(final long blockNumber) {
Expand Down

This file was deleted.