From c4d77df33572baf5ec12c34e70ea1c21c728874f Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 4 Apr 2019 13:44:32 +1000 Subject: [PATCH] Replace filtering headers after the fact with calculating number to request up-front. --- .../eth/sync/CheckpointHeaderFetcher.java | 48 ++++++-- .../FastSyncDownloadPipelineFactory.java | 3 +- .../eth/sync/CheckpointHeaderFetcherTest.java | 103 ++++++++++++------ .../FastSyncCheckpointFilterTest.java | 63 ----------- 4 files changed, 111 insertions(+), 106 deletions(-) delete mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncCheckpointFilterTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java index 6b43f80249..93faccc5f1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java @@ -12,6 +12,10 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.completedFuture; + import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; @@ -21,48 +25,72 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.function.UnaryOperator; public class CheckpointHeaderFetcher { private final SynchronizerConfiguration syncConfig; private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; - private final UnaryOperator> checkpointFilter; + private final Optional lastCheckpointHeader; private final MetricsSystem metricsSystem; public CheckpointHeaderFetcher( final SynchronizerConfiguration syncConfig, final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final UnaryOperator> checkpointFilter, + final Optional lastCheckpointHeader, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; - this.checkpointFilter = checkpointFilter; + this.lastCheckpointHeader = lastCheckpointHeader; this.metricsSystem = metricsSystem; } public CompletableFuture> getNextCheckpointHeaders( final EthPeer peer, final BlockHeader lastHeader) { final int skip = syncConfig.downloaderChainSegmentSize() - 1; - final int additionalHeaderCount = syncConfig.downloaderHeaderRequestSize(); + final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize(); + + final int additionalHeaderCount; + if (lastCheckpointHeader.isPresent()) { + final BlockHeader targetHeader = lastCheckpointHeader.get(); + final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber(); + if (blocksUntilTarget <= 0) { + return completedFuture(emptyList()); + } + final long maxHeadersToRequest = blocksUntilTarget / (skip + 1); + additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize); + if (additionalHeaderCount == 0) { + return completedFuture(singletonList(targetHeader)); + } + } else { + additionalHeaderCount = maximumHeaderRequestSize; + } + + return requestHeaders(peer, lastHeader, additionalHeaderCount, skip); + } + + private CompletableFuture> requestHeaders( + final EthPeer peer, + final BlockHeader referenceHeader, + final int headerCount, + final int skip) { return GetHeadersFromPeerByHashTask.startingAtHash( protocolSchedule, ethContext, - lastHeader.getHash(), - lastHeader.getNumber(), + referenceHeader.getHash(), + referenceHeader.getNumber(), // + 1 because lastHeader will be returned as well. - additionalHeaderCount + 1, + headerCount + 1, skip, metricsSystem) .assignPeer(peer) .run() .thenApply(PeerTaskResult::getResult) - .thenApply( - headers -> checkpointFilter.apply(stripExistingCheckpointHeader(lastHeader, headers))); + .thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers)); } private List stripExistingCheckpointHeader( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 823f65e1e4..18c3dc5ca1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -27,6 +27,7 @@ import tech.pegasys.pantheon.services.pipeline.PipelineBuilder; import java.time.Duration; +import java.util.Optional; public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory { private final SynchronizerConfiguration syncConfig; @@ -60,7 +61,7 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncTarget target) syncConfig, protocolSchedule, ethContext, - new FastSyncCheckpointFilter(pivotBlockHeader), + Optional.of(pivotBlockHeader), metricsSystem), this::shouldContinueDownloadingFromPeer, ethContext.getScheduler(), diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java index 5276e14c59..d51bfd384f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java @@ -13,9 +13,9 @@ package tech.pegasys.pantheon.ethereum.eth.sync; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; @@ -31,14 +31,13 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.function.UnaryOperator; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -47,9 +46,9 @@ public class CheckpointHeaderFetcherTest { private static ProtocolSchedule protocolSchedule; private static ProtocolContext protocolContext; private static final MetricsSystem metricsSystem = new NoOpMetricsSystem(); - @Mock private UnaryOperator> filter; private EthProtocolManager ethProtocolManager; - private CheckpointHeaderFetcher checkpointHeaderFetcher; + private Responder responder; + private RespondingEthPeer respondingPeer; @BeforeClass public static void setUpClass() { @@ -65,26 +64,15 @@ public void setUpTest() { ethProtocolManager = EthProtocolManagerTestUtil.create( blockchain, protocolContext.getWorldStateArchive(), () -> false); - final EthContext ethContext = ethProtocolManager.ethContext(); - checkpointHeaderFetcher = - new CheckpointHeaderFetcher( - SynchronizerConfiguration.builder() - .downloaderChainSegmentSize(5) - .downloaderHeadersRequestSize(3) - .build(), - protocolSchedule, - ethContext, - filter, - metricsSystem); + responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); } @Test public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() { - when(filter.apply(any())).thenAnswer(invocation -> invocation.getArgument(0)); - final Responder responder = - RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.empty()); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); @@ -97,23 +85,74 @@ public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() { } @Test - public void shouldApplyFilterToDownloadedCheckpoints() { - final List filteredResult = asList(header(7), header(9)); - final List unfilteredResult = asList(header(6), header(11), header(16)); - when(filter.apply(unfilteredResult)).thenReturn(filteredResult); - final Responder responder = - RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); - final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() { + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.of(header(11))); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); - assertThat(result).isNotDone(); + respondingPeer.respond(responder); + + assertThat(result).isCompletedWithValue(asList(header(6), header(11))); + } + + @Test + public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() { + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.of(header(15))); + + final CompletableFuture> result = + checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); respondingPeer.respond(responder); - assertThat(result).isCompletedWithValue(filteredResult); + assertThat(result).isCompletedWithValue(asList(header(6), header(11))); + } + + @Test + public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() { + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.of(header(15))); + + final CompletableFuture> result = + checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11)); + + assertThat(result).isCompletedWithValue(singletonList(header(15))); + } + + @Test + public void shouldReturnEmptyListWhenLastHeaderIsTarget() { + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.of(header(15))); + + final CompletableFuture> result = + checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15)); + assertThat(result).isCompletedWithValue(emptyList()); + } + + @Test + public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() { + final CheckpointHeaderFetcher checkpointHeaderFetcher = + createCheckpointHeaderFetcher(Optional.of(header(15))); + + final CompletableFuture> result = + checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16)); + assertThat(result).isCompletedWithValue(emptyList()); + } + + private CheckpointHeaderFetcher createCheckpointHeaderFetcher( + final Optional targetHeader) { + final EthContext ethContext = ethProtocolManager.ethContext(); + return new CheckpointHeaderFetcher( + SynchronizerConfiguration.builder() + .downloaderChainSegmentSize(5) + .downloaderHeadersRequestSize(3) + .build(), + protocolSchedule, + ethContext, + targetHeader, + metricsSystem); } private BlockHeader header(final long blockNumber) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncCheckpointFilterTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncCheckpointFilterTest.java deleted file mode 100644 index 7eeb51dc4c..0000000000 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncCheckpointFilterTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; - -import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static org.assertj.core.api.Assertions.assertThat; - -import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; - -import java.util.List; - -import org.junit.Test; - -public class FastSyncCheckpointFilterTest { - private final BlockHeader pivotBlockHeader = header(50); - private final FastSyncCheckpointFilter filter = new FastSyncCheckpointFilter(pivotBlockHeader); - - @Test - public void shouldNotChangeCheckpointsPriorToThePivotBlock() { - final List input = - asList(header(10), header(20), header(30), header(40), header(49)); - - assertThat(filter.apply(input)).isEqualTo(input); - } - - @Test - public void shouldRemoveCheckpointsBeyondPivotBlock() { - final List input = asList(header(40), header(50), header(60), header(70)); - assertThat(filter.apply(input)).containsExactly(header(40), header(50)); - } - - @Test - public void shouldAppendPivotBlockHeaderWhenRemovingCheckpointsIfNotAlreadyPresent() { - final List input = asList(header(45), header(55), header(65)); - assertThat(filter.apply(input)).containsExactly(header(45), header(50)); - } - - @Test - public void shouldReturnOnlyPivotBlockHeaderIfAllBlocksAreAfterPivotBlock() { - assertThat(filter.apply(asList(header(55), header(60)))).containsExactly(pivotBlockHeader); - } - - @Test - public void shouldNotChangeEmptyHeaders() { - assertThat(filter.apply(emptyList())).isEmpty(); - } - - private BlockHeader header(final int number) { - return new BlockHeaderTestFixture().number(number).buildHeader(); - } -}