Skip to content

Commit

Permalink
Request open ended headers from sync target (PegaSysEng#1355)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored and notlesh committed May 4, 2019
1 parent 834d1a1 commit d91a01c
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,35 @@
import static java.lang.Math.toIntExact;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;

import java.util.Objects;
import java.util.Optional;

import com.google.common.base.MoreObjects;

public class CheckpointRange {

private final EthPeer syncTarget;
private final BlockHeader start;
private final Optional<BlockHeader> end;

public CheckpointRange(final BlockHeader start) {
public CheckpointRange(final EthPeer syncTarget, final BlockHeader start) {
this.syncTarget = syncTarget;
this.start = start;
this.end = Optional.empty();
}

public CheckpointRange(final BlockHeader start, final BlockHeader end) {
public CheckpointRange(final EthPeer syncTarget, final BlockHeader start, final BlockHeader end) {
this.syncTarget = syncTarget;
this.start = start;
this.end = Optional.of(end);
}

public EthPeer getSyncTarget() {
return syncTarget;
}

public BlockHeader getStart() {
return start;
}
Expand All @@ -60,16 +69,22 @@ public boolean equals(final Object o) {
return false;
}
final CheckpointRange that = (CheckpointRange) o;
return Objects.equals(start, that.start) && Objects.equals(end, that.end);
return Objects.equals(syncTarget, that.syncTarget)
&& Objects.equals(start, that.start)
&& Objects.equals(end, that.end);
}

@Override
public int hashCode() {
return Objects.hash(start, end);
return Objects.hash(syncTarget, start, end);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("start", start).add("end", end).toString();
return MoreObjects.toStringHelper(this)
.add("syncTarget", syncTarget)
.add("start", start)
.add("end", end)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public CheckpointRange next() {
}
if (checkpointFetcher.nextCheckpointEndsAtChainHead(peer, lastRangeEnd)) {
reachedEndOfCheckpoints = true;
return new CheckpointRange(lastRangeEnd);
return new CheckpointRange(peer, lastRangeEnd);
}
pendingCheckpointsRequest = Optional.of(getNextCheckpointHeaders());
return getCheckpointRangeFromPendingRequest();
Expand Down Expand Up @@ -147,7 +147,7 @@ private CheckpointRange getCheckpointRangeFromPendingRequest() {
requestFailureCount = 0;
}
for (final BlockHeader checkpointHeader : newCheckpointHeaders) {
retrievedRanges.add(new CheckpointRange(lastRangeEnd, checkpointHeader));
retrievedRanges.add(new CheckpointRange(peer, lastRangeEnd, checkpointHeader));
lastRangeEnd = checkpointHeader;
}
return retrievedRanges.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private CompletableFuture<List<BlockHeader>> downloadHeaders(
checkpointRange.getStart().getNumber(),
headerRequestSize,
metricsSystem)
.assignPeer(checkpointRange.getSyncTarget())
.run()
.thenApply(PeerTaskResult::getResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -44,14 +45,15 @@ public class CheckpointHeaderValidationStepTest {
@Mock private ProtocolContext<Void> protocolContext;
@Mock private BlockHeaderValidator<Void> headerValidator;
@Mock private ValidationPolicy validationPolicy;
@Mock private EthPeer syncTarget;
private final BlockDataGenerator gen = new BlockDataGenerator();
private CheckpointHeaderValidationStep<Void> validationStep;

private final BlockHeader checkpointStart = gen.header(10);
private final BlockHeader firstHeader = gen.header(11);
private final CheckpointRangeHeaders rangeHeaders =
new CheckpointRangeHeaders(
new CheckpointRange(checkpointStart, gen.header(13)),
new CheckpointRange(syncTarget, checkpointStart, gen.header(13)),
asList(firstHeader, gen.header(12), gen.header(13)));

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,18 @@ public void shouldRequestMoreHeadersWhenCurrentSetHasRunOut() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, header(20)))
.thenReturn(completedFuture(asList(header(25), header(30))));

assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, commonAncestor);

assertThat(source.next()).isEqualTo(new CheckpointRange(header(15), header(20)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(15), header(20)));
verifyNoMoreInteractions(checkpointFetcher);

assertThat(source.next()).isEqualTo(new CheckpointRange(header(20), header(25)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(20), header(25)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, header(20));
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, header(20));

assertThat(source.next()).isEqualTo(new CheckpointRange(header(25), header(30)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(25), header(30)));
verifyNoMoreInteractions(checkpointFetcher);
}

Expand All @@ -192,8 +192,8 @@ public void shouldReturnCheckpointsFromExistingBatch() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
.thenReturn(completedFuture(asList(header(15), header(20))));

assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(header(15), header(20)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(15), header(20)));
}

@Test
Expand Down Expand Up @@ -226,7 +226,7 @@ public void shouldReturnCheckpointsOnceHeadersRequestCompletes() {
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);

future.complete(asList(header(15), header(20)));
assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
}

@Test
Expand All @@ -240,7 +240,7 @@ public void shouldSendNewRequestIfRequestForHeadersFails() {
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);

// Then retries
assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
verify(checkpointFetcher, times(2)).getNextCheckpointHeaders(peer, commonAncestor);
}

Expand All @@ -251,7 +251,7 @@ public void shouldReturnUnboundedCheckpointRangeWhenNextCheckpointEndsAtChainHea
when(checkpointFetcher.nextCheckpointEndsAtChainHead(peer, commonAncestor)).thenReturn(true);

assertThat(source).hasNext();
assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor));

// Once we've sent an open-ended range we shouldn't have any more ranges.
assertThat(source).isExhausted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.blockchainResponder;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
Expand All @@ -42,6 +44,7 @@ public class DownloadHeadersStepTest {
private static ProtocolContext<Void> protocolContext;
private static MutableBlockchain blockchain;

private final EthPeer syncTarget = mock(EthPeer.class);
private EthProtocolManager ethProtocolManager;
private DownloadHeadersStep<Void> downloader;
private CheckpointRange checkpointRange;
Expand Down Expand Up @@ -70,7 +73,7 @@ public void setUp() {

checkpointRange =
new CheckpointRange(
blockchain.getBlockHeader(1).get(), blockchain.getBlockHeader(10).get());
syncTarget, blockchain.getBlockHeader(1).get(), blockchain.getBlockHeader(10).get());
}

@Test
Expand Down Expand Up @@ -103,7 +106,8 @@ public void shouldCancelRequestToPeerWhenReturnedFutureIsCancelled() {
@Test
public void shouldReturnOnlyEndHeaderWhenCheckpointRangeHasLengthOfOne() {
final CheckpointRange checkpointRange =
new CheckpointRange(blockchain.getBlockHeader(3).get(), blockchain.getBlockHeader(4).get());
new CheckpointRange(
syncTarget, blockchain.getBlockHeader(3).get(), blockchain.getBlockHeader(4).get());

final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);

Expand All @@ -113,8 +117,9 @@ public void shouldReturnOnlyEndHeaderWhenCheckpointRangeHasLengthOfOne() {

@Test
public void shouldGetRemainingHeadersWhenRangeHasNoEnd() {
final CheckpointRange checkpointRange = new CheckpointRange(blockchain.getBlockHeader(3).get());
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CheckpointRange checkpointRange =
new CheckpointRange(peer.getEthPeer(), blockchain.getBlockHeader(3).get());

final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);

Expand Down

0 comments on commit d91a01c

Please sign in to comment.