diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/BlockMetadata.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/BlockMetadata.java
deleted file mode 100644
index 026ad5cf79..0000000000
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/BlockMetadata.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2018 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.core;
-
-import tech.pegasys.pantheon.ethereum.rlp.RLP;
-import tech.pegasys.pantheon.ethereum.rlp.RLPException;
-import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
-import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
-import tech.pegasys.pantheon.util.bytes.BytesValue;
-import tech.pegasys.pantheon.util.uint.UInt256;
-
-public class BlockMetadata {
- private static final BlockMetadata EMPTY = new BlockMetadata(null);
- private final UInt256 totalDifficulty;
-
- public BlockMetadata(final UInt256 totalDifficulty) {
- this.totalDifficulty = totalDifficulty;
- }
-
- public static BlockMetadata empty() {
- return EMPTY;
- }
-
- public static BlockMetadata fromRlp(final BytesValue bytes) {
- return readFrom(RLP.input(bytes));
- }
-
- public static BlockMetadata readFrom(final RLPInput in) throws RLPException {
- in.enterList();
-
- final UInt256 totalDifficulty = in.readUInt256Scalar();
-
- in.leaveList();
-
- return new BlockMetadata(totalDifficulty);
- }
-
- public UInt256 getTotalDifficulty() {
- return totalDifficulty;
- }
-
- public BytesValue toRlp() {
- return RLP.encode(this::writeTo);
- }
-
- public void writeTo(final RLPOutput out) {
- out.startList();
-
- out.writeUInt256Scalar(totalDifficulty);
-
- out.endList();
- }
-}
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionBuilder.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionBuilder.java
deleted file mode 100644
index 888327e7cc..0000000000
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionBuilder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2018 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.core;
-
-import tech.pegasys.pantheon.crypto.SECP256K1;
-import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
-import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
-import tech.pegasys.pantheon.util.bytes.BytesValue;
-
-/** Convenience object for building {@link Transaction}s. */
-public interface TransactionBuilder {
-
- /** @return A {@link Transaction} populated with the accumulated state. */
- Transaction build();
-
- /**
- * Constructs a {@link SECP256K1.Signature} based on the accumulated state and then builds a
- * corresponding {@link Transaction}.
- *
- * @param keys The keys to construct the transaction signature with.
- * @return A {@link Transaction} populated with the accumulated state.
- */
- Transaction signAndBuild(SECP256K1.KeyPair keys);
-
- /**
- * Populates the {@link TransactionBuilder} based on the RLP-encoded transaction and builds a
- * {@link Transaction}.
- *
- *
Note: the fields from the RLP-transaction will be extracted and replace any previously
- * populated fields.
- *
- * @param in The RLP-encoded transaction.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder populateFrom(RLPInput in);
-
- /**
- * Sets the chain id for the {@link Transaction}.
- *
- * @param chainId The chain id.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder chainId(int chainId);
-
- /**
- * Sets the gas limit for the {@link Transaction}.
- *
- * @param gasLimit The gas limit.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder gasLimit(long gasLimit);
-
- /**
- * Sets the gas price for the {@link Transaction}.
- *
- * @param gasPrice The gas price.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder gasPrice(Wei gasPrice);
-
- /**
- * Sets the nonce for the {@link Transaction}.
- *
- * @param nonce The nonce.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder nonce(long nonce);
-
- /**
- * Sets the payload for the {@link Transaction}.
- *
- * @param payload The payload.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder payload(BytesValue payload);
-
- /**
- * Sets the sender of the {@link Transaction}.
- *
- * @param sender The sender.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder sender(Address sender);
-
- /**
- * Sets the signature of the {@link Transaction}.
- *
- * @param signature The signature.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder signature(Signature signature);
-
- /**
- * Sets the recipient of the {@link Transaction}.
- *
- * @param to The recipent (can be null).
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder to(Address to);
-
- /**
- * Sets the {@link Wei} transfer value of the {@link Transaction}.
- *
- * @param value The transfer value.
- * @return The updated {@link TransactionBuilder}.
- */
- TransactionBuilder value(Wei value);
-}
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Wei.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Wei.java
index 3df4f9e164..c2b5e6b392 100644
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Wei.java
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Wei.java
@@ -64,10 +64,6 @@ public static Wei fromEth(final long eth) {
return Wei.of(BigInteger.valueOf(eth).multiply(BigInteger.TEN.pow(18)));
}
- public static Counter newCounter() {
- return new WeiCounter();
- }
-
private static class WeiCounter extends Counter {
private WeiCounter() {
super(Wei::new);
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetContractCreationProcessor.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetContractCreationProcessor.java
index 517fef1ad6..12987dadd8 100644
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetContractCreationProcessor.java
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetContractCreationProcessor.java
@@ -75,10 +75,6 @@ private static boolean accountExists(final Account account) {
return account.getNonce() > 0 || !account.getCode().isEmpty();
}
- protected GasCalculator gasCalculator() {
- return gasCalculator;
- }
-
@Override
public void start(final MessageFrame frame) {
if (LOG.isTraceEnabled()) {
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetTransactionValidator.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetTransactionValidator.java
index 4c6f608937..d982562e61 100644
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetTransactionValidator.java
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetTransactionValidator.java
@@ -38,10 +38,6 @@
*/
public class MainnetTransactionValidator implements TransactionValidator {
- public static MainnetTransactionValidator create() {
- return new MainnetTransactionValidator(new FrontierGasCalculator(), false, Optional.empty());
- }
-
private final GasCalculator gasCalculator;
private final boolean disallowSignatureMalleability;
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/EthereumWireProtocolConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/EthereumWireProtocolConfiguration.java
index 822cbea274..dd6aa1ef5f 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/EthereumWireProtocolConfiguration.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/EthereumWireProtocolConfiguration.java
@@ -14,6 +14,9 @@
import tech.pegasys.pantheon.util.number.PositiveNumber;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
import picocli.CommandLine;
public class EthereumWireProtocolConfiguration {
@@ -68,27 +71,33 @@ public int getMaxGetNodeData() {
}
@Override
- public boolean equals(final Object obj) {
- if (!(obj instanceof EthereumWireProtocolConfiguration)) {
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- EthereumWireProtocolConfiguration other = ((EthereumWireProtocolConfiguration) obj);
- return maxGetBlockHeaders == other.maxGetBlockHeaders
- && maxGetBlockBodies == other.maxGetBlockBodies
- && maxGetReceipts == other.maxGetReceipts
- && maxGetNodeData == other.maxGetNodeData;
+ final EthereumWireProtocolConfiguration that = (EthereumWireProtocolConfiguration) o;
+ return maxGetBlockHeaders == that.maxGetBlockHeaders
+ && maxGetBlockBodies == that.maxGetBlockBodies
+ && maxGetReceipts == that.maxGetReceipts
+ && maxGetNodeData == that.maxGetNodeData;
}
@Override
public int hashCode() {
- return super.hashCode();
+ return Objects.hash(maxGetBlockHeaders, maxGetBlockBodies, maxGetReceipts, maxGetNodeData);
}
@Override
public String toString() {
- return String.format(
- "maxGetBlockHeaders=%s\tmaxGetBlockBodies=%s\tmaxGetReceipts=%s\tmaxGetReceipts=%s",
- maxGetBlockHeaders, maxGetBlockBodies, maxGetReceipts, maxGetNodeData);
+ return MoreObjects.toStringHelper(this)
+ .add("maxGetBlockHeaders", maxGetBlockHeaders)
+ .add("maxGetBlockBodies", maxGetBlockBodies)
+ .add("maxGetReceipts", maxGetReceipts)
+ .add("maxGetNodeData", maxGetNodeData)
+ .toString();
}
public static class Builder {
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContext.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContext.java
index fb67ae6e42..644ea94001 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContext.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContext.java
@@ -14,26 +14,17 @@
public class EthContext {
- private final String protocolName;
private final EthPeers ethPeers;
private final EthMessages ethMessages;
private final EthScheduler scheduler;
public EthContext(
- final String protocolName,
- final EthPeers ethPeers,
- final EthMessages ethMessages,
- final EthScheduler scheduler) {
- this.protocolName = protocolName;
+ final EthPeers ethPeers, final EthMessages ethMessages, final EthScheduler scheduler) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.scheduler = scheduler;
}
- public String getProtocolName() {
- return protocolName;
- }
-
public EthPeers getEthPeers() {
return ethPeers;
}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthMessages.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthMessages.java
index f0583eb120..be5d6bf04f 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthMessages.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthMessages.java
@@ -15,7 +15,6 @@
import tech.pegasys.pantheon.util.Subscribers;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
public class EthMessages {
@@ -31,18 +30,8 @@ void dispatch(final EthMessage message) {
listeners.forEach(callback -> callback.exec(message));
}
- public long subscribe(final int messageCode, final MessageCallback callback) {
- return listenersByCode
- .computeIfAbsent(messageCode, key -> new Subscribers<>())
- .subscribe(callback);
- }
-
- public void unsubscribe(final long listenerId) {
- for (final Entry> entry : listenersByCode.entrySet()) {
- if (entry.getValue().unsubscribe(listenerId)) {
- break;
- }
- }
+ public void subscribe(final int messageCode, final MessageCallback callback) {
+ listenersByCode.computeIfAbsent(messageCode, key -> new Subscribers<>()).subscribe(callback);
}
@FunctionalInterface
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
index 7691f94ef7..670469dfd9 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java
@@ -27,7 +27,6 @@
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
-import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@@ -63,7 +62,6 @@ public class EthPeer {
private final AtomicReference> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
- private final Subscribers disconnectCallbacks = new Subscribers<>();
EthPeer(
final PeerConnection connection,
@@ -110,14 +108,6 @@ public void disconnect(final DisconnectReason reason) {
connection.disconnect(reason);
}
- public long subscribeDisconnect(final DisconnectCallback callback) {
- return disconnectCallbacks.subscribe(callback);
- }
-
- public void unsubscribeDisconnect(final long id) {
- disconnectCallbacks.unsubscribe(id);
- }
-
public ResponseStream send(final MessageData messageData) throws PeerNotConnected {
switch (messageData.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
@@ -258,7 +248,6 @@ void handleDisconnect() {
bodiesRequestManager.close();
receiptsRequestManager.close();
nodeDataRequestManager.close();
- disconnectCallbacks.forEach(callback -> callback.onDisconnect(this));
}
public void registerKnownBlock(final Hash hash) {
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java
index 3b48745013..b33bec9ab2 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java
@@ -73,18 +73,14 @@ public void unsubscribeConnect(final long id) {
connectCallbacks.unsubscribe(id);
}
- public long subscribeDisconnect(final DisconnectCallback callback) {
- return disconnectCallbacks.subscribe(callback);
+ public void subscribeDisconnect(final DisconnectCallback callback) {
+ disconnectCallbacks.subscribe(callback);
}
public int peerCount() {
return connections.size();
}
- public int availablePeerCount() {
- return (int) availablePeers().count();
- }
-
public Stream availablePeers() {
return connections.values().stream().filter(EthPeer::readyForRequests);
}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
index da04086647..201a606cfb 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
@@ -82,7 +82,7 @@ public EthProtocolManager(
ethPeers = new EthPeers(getSupportedProtocol());
ethMessages = new EthMessages();
- ethContext = new EthContext(getSupportedProtocol(), ethPeers, ethMessages, scheduler);
+ ethContext = new EthContext(ethPeers, ethMessages, scheduler);
this.blockBroadcaster = new BlockBroadcaster(ethContext);
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
index 49354289c7..c36f5404a1 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
@@ -242,10 +242,6 @@ private CompletableFuture failAfterTimeout(final Duration timeout) {
return promise;
}
- public void failAfterTimeout(final CompletableFuture promise) {
- failAfterTimeout(promise, defaultTimeout);
- }
-
public void failAfterTimeout(final CompletableFuture promise, final Duration timeout) {
final long delay = timeout.toMillis();
final TimeUnit unit = TimeUnit.MILLISECONDS;
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java
index 22a6c7dfe0..7967521ea2 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java
@@ -65,17 +65,6 @@ class EthServer {
this.setupListeners();
}
- EthServer(
- final Blockchain blockchain,
- final WorldStateArchive worldStateArchive,
- final EthMessages ethMessages) {
- this(
- blockchain,
- worldStateArchive,
- ethMessages,
- EthereumWireProtocolConfiguration.defaultConfig());
- }
-
private void setupListeners() {
ethMessages.subscribe(EthPV62.GET_BLOCK_HEADERS, this::handleGetBlockHeaders);
ethMessages.subscribe(EthPV62.GET_BLOCK_BODIES, this::handleGetBlockBodies);
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java
index aa626db960..625f67686a 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java
@@ -148,10 +148,6 @@ public void close() {
dispatchBufferedResponses();
}
- public EthPeer peer() {
- return peer;
- }
-
private void processMessage(final MessageData message) {
if (closed) {
return;
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java
index adc476c502..27ceb21d55 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java
@@ -25,7 +25,6 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -130,21 +129,6 @@ protected final CompletableFuture executeSubTask(
}
}
- /**
- * Utility for registering completable futures for cleanup if this EthTask is cancelled.
- *
- * @param the type of data returned from the CompletableFuture
- * @param subTaskFuture the future to be registered.
- */
- protected final void registerSubTask(final CompletableFuture subTaskFuture) {
- synchronized (result) {
- if (!isCancelled()) {
- subTaskFutures.add(subTaskFuture);
- subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture));
- }
- }
- }
-
/**
* Helper method for sending subTask to worker that will clean up if this EthTask is cancelled.
*
@@ -158,17 +142,6 @@ protected final CompletableFuture executeWorkerSubTask(
return executeSubTask(() -> scheduler.scheduleSyncWorkerTask(subTask));
}
- public final T result() {
- if (!isSucceeded()) {
- return null;
- }
- try {
- return result.get().get();
- } catch (final InterruptedException | ExecutionException e) {
- return null;
- }
- }
-
/** Execute core task logic. */
protected abstract void executeTask();
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java
deleted file mode 100644
index 4f41f2bc8b..0000000000
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.eth.manager.task;
-
-import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
-import tech.pegasys.pantheon.metrics.Counter;
-import tech.pegasys.pantheon.metrics.MetricCategory;
-import tech.pegasys.pantheon.metrics.MetricsSystem;
-import tech.pegasys.pantheon.util.ExceptionUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public abstract class AbstractPipelinedTask extends AbstractEthTask> {
- private static final Logger LOG = LogManager.getLogger();
-
- static final int TIMEOUT_MS = 1000;
-
- private final BlockingQueue inboundQueue;
- private final BlockingQueue outboundQueue;
- private final List results;
-
- private boolean shuttingDown = false;
- private final AtomicReference processingException = new AtomicReference<>(null);
-
- private final Counter inboundQueueCounter;
- private final Counter outboundQueueCounter;
-
- protected AbstractPipelinedTask(
- final BlockingQueue inboundQueue,
- final int outboundBacklogSize,
- final MetricsSystem metricsSystem) {
- super(metricsSystem);
- this.inboundQueue = inboundQueue;
- outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize);
- results = new ArrayList<>();
- this.inboundQueueCounter =
- metricsSystem
- .createLabelledCounter(
- MetricCategory.SYNCHRONIZER,
- "inboundQueueCounter",
- "count of queue items that started processing",
- "taskName")
- .labels(getClass().getSimpleName());
- this.outboundQueueCounter =
- metricsSystem
- .createLabelledCounter(
- MetricCategory.SYNCHRONIZER,
- "outboundQueueCounter",
- "count of queue items that finished processing",
- "taskName")
- .labels(getClass().getSimpleName());
- }
-
- @Override
- protected void executeTask() {
- Optional previousInput = Optional.empty();
- try {
- while (!isDone() && processingException.get() == null) {
- if (shuttingDown && inboundQueue.isEmpty()) {
- break;
- }
- final I input;
- try {
- input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- if (input == null) {
- // timed out waiting for a result
- continue;
- }
- inboundQueueCounter.inc();
- } catch (final InterruptedException e) {
- // this is expected
- continue;
- }
- final Optional output = processStep(input, previousInput);
- output.ifPresent(
- o -> {
- while (!isDone()) {
- try {
- if (outboundQueue.offer(o, 1, TimeUnit.SECONDS)) {
- outboundQueueCounter.inc();
- results.add(o);
- break;
- }
- } catch (final InterruptedException e) {
- processingException.compareAndSet(null, e);
- break;
- }
- }
- });
- previousInput = Optional.of(input);
- }
- } catch (final RuntimeException e) {
- processingException.compareAndSet(null, e);
- }
- if (processingException.get() == null) {
- result.get().complete(results);
- } else {
- result.get().completeExceptionally(processingException.get());
- }
- }
-
- public BlockingQueue getOutboundQueue() {
- return outboundQueue;
- }
-
- public void shutdown() {
- this.shuttingDown = true;
- }
-
- protected void failExceptionally(final Throwable t) {
- Throwable rootCause = ExceptionUtils.rootCause(t);
- if (rootCause instanceof InterruptedException || rootCause instanceof EthTaskException) {
- LOG.debug("Task Failure: {}", t.toString());
- } else {
- LOG.error("Task Failure", t);
- }
-
- processingException.compareAndSet(null, t);
- result.get().completeExceptionally(t);
- cancel();
- }
-
- protected abstract Optional processStep(I input, Optional previousInput);
-}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java
index 76f7f46ad3..3c48e6a9b4 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java
@@ -110,25 +110,6 @@ public static AbstractGetHeadersFromPeerTask endingAtHash(
metricsSystem);
}
- public static AbstractGetHeadersFromPeerTask endingAtHash(
- final ProtocolSchedule> protocolSchedule,
- final EthContext ethContext,
- final Hash lastHash,
- final long lastBlockNumber,
- final int segmentLength,
- final int skip,
- final MetricsSystem metricsSystem) {
- return new GetHeadersFromPeerByHashTask(
- protocolSchedule,
- ethContext,
- lastHash,
- lastBlockNumber,
- segmentLength,
- skip,
- true,
- metricsSystem);
- }
-
public static AbstractGetHeadersFromPeerTask forSingleHash(
final ProtocolSchedule> protocolSchedule,
final EthContext ethContext,
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java
index 5d096b7231..1d4f5b4df3 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java
@@ -53,16 +53,6 @@ public static AbstractGetHeadersFromPeerTask startingAtNumber(
protocolSchedule, ethContext, firstBlockNumber, segmentLength, 0, false, metricsSystem);
}
- public static AbstractGetHeadersFromPeerTask endingAtNumber(
- final ProtocolSchedule> protocolSchedule,
- final EthContext ethContext,
- final long lastlockNumber,
- final int segmentLength,
- final MetricsSystem metricsSystem) {
- return new GetHeadersFromPeerByNumberTask(
- protocolSchedule, ethContext, lastlockNumber, segmentLength, 0, true, metricsSystem);
- }
-
public static AbstractGetHeadersFromPeerTask endingAtNumber(
final ProtocolSchedule> protocolSchedule,
final EthContext ethContext,
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java
deleted file mode 100644
index 52433b5e07..0000000000
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.eth.sync;
-
-import tech.pegasys.pantheon.ethereum.core.BlockHeader;
-import tech.pegasys.pantheon.ethereum.core.Hash;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-public interface BlockHandler {
- CompletableFuture> downloadBlocks(List headers);
-
- CompletableFuture> validateAndImportBlocks(List blocks);
-
- long extractBlockNumber(B block);
-
- Hash extractBlockHash(B block);
-
- CompletableFuture executeParallelCalculations(List blocks);
-}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java
deleted file mode 100644
index 9f74daf6a4..0000000000
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderManager.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.eth.sync;
-
-import static java.util.Collections.emptyList;
-
-import tech.pegasys.pantheon.ethereum.ProtocolContext;
-import tech.pegasys.pantheon.ethereum.chain.Blockchain;
-import tech.pegasys.pantheon.ethereum.core.BlockHeader;
-import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
-import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
-import tech.pegasys.pantheon.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
-import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
-import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
-import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
-import tech.pegasys.pantheon.metrics.MetricsSystem;
-import tech.pegasys.pantheon.util.ExceptionUtils;
-
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CheckpointHeaderManager {
-
- private static final Logger LOG = LogManager.getLogger();
-
- private final Deque checkpointHeaders = new ConcurrentLinkedDeque<>();
- private final SynchronizerConfiguration config;
- private final ProtocolContext protocolContext;
- private final EthContext ethContext;
- private final SyncState syncState;
- private final ProtocolSchedule protocolSchedule;
- private final MetricsSystem metricsSystem;
-
- private int checkpointTimeouts = 0;
-
- public CheckpointHeaderManager(
- final SynchronizerConfiguration config,
- final ProtocolContext protocolContext,
- final EthContext ethContext,
- final SyncState syncState,
- final ProtocolSchedule protocolSchedule,
- final MetricsSystem metricsSystem) {
- this.config = config;
- this.protocolContext = protocolContext;
- this.ethContext = ethContext;
- this.syncState = syncState;
- this.protocolSchedule = protocolSchedule;
- this.metricsSystem = metricsSystem;
- }
-
- public CompletableFuture> pullCheckpointHeaders(final SyncTarget syncTarget) {
- if (!shouldDownloadMoreCheckpoints()) {
- return CompletableFuture.completedFuture(getCheckpointsAwaitingImport());
- }
-
- final BlockHeader lastHeader =
- checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
- // Try to pull more checkpoint headers
- return getAdditionalCheckpointHeaders(syncTarget, lastHeader)
- .thenApply(
- additionalCheckpoints -> {
- if (!additionalCheckpoints.isEmpty()) {
- checkpointTimeouts = 0;
- checkpointHeaders.addAll(additionalCheckpoints);
- LOG.debug("Tracking {} checkpoint headers", checkpointHeaders.size());
- }
- return getCheckpointsAwaitingImport();
- });
- }
-
- protected CompletableFuture> getAdditionalCheckpointHeaders(
- final SyncTarget syncTarget, final BlockHeader lastHeader) {
- return requestAdditionalCheckpointHeaders(lastHeader, syncTarget)
- .handle(
- (headers, t) -> {
- t = ExceptionUtils.rootCause(t);
- if (t instanceof TimeoutException) {
- checkpointTimeouts++;
- return emptyList();
- } else if (t != null) {
- // An error occurred, so no new checkpoints to add.
- return emptyList();
- }
- if (headers.size() > 0
- && checkpointHeaders.size() > 0
- && checkpointHeaders.getLast().equals(headers.get(0))) {
- // Don't push header that is already tracked
- headers.remove(0);
- }
- if (headers.isEmpty()) {
- checkpointTimeouts++;
- }
- return headers;
- });
- }
-
- private CompletableFuture> requestAdditionalCheckpointHeaders(
- final BlockHeader lastHeader, final SyncTarget syncTarget) {
- LOG.debug("Requesting checkpoint headers from {}", lastHeader.getNumber());
- final int skip = config.downloaderChainSegmentSize() - 1;
- final int additionalHeaderCount =
- calculateAdditionalCheckpointHeadersToRequest(lastHeader, skip);
- if (additionalHeaderCount <= 0) {
- return CompletableFuture.completedFuture(emptyList());
- }
- return GetHeadersFromPeerByHashTask.startingAtHash(
- protocolSchedule,
- ethContext,
- lastHeader.getHash(),
- lastHeader.getNumber(),
- // + 1 because lastHeader will be returned as well.
- additionalHeaderCount + 1,
- skip,
- metricsSystem)
- .assignPeer(syncTarget.peer())
- .run()
- .thenApply(PeerTaskResult::getResult);
- }
-
- protected int calculateAdditionalCheckpointHeadersToRequest(
- final BlockHeader lastHeader, final int skip) {
- return config.downloaderHeaderRequestSize();
- }
-
- protected boolean shouldDownloadMoreCheckpoints() {
- return checkpointHeaders.size() < config.downloaderHeaderRequestSize()
- && checkpointTimeouts < config.downloaderCheckpointTimeoutsPermitted();
- }
-
- public boolean checkpointsHaveTimedOut() {
- // We have no more checkpoints, and have been unable to pull any new checkpoints for
- // several cycles.
- return checkpointHeaders.size() == 0
- && checkpointTimeouts >= config.downloaderCheckpointTimeoutsPermitted();
- }
-
- public void clearSyncTarget() {
- checkpointTimeouts = 0;
- checkpointHeaders.clear();
- }
-
- public boolean clearImportedCheckpointHeaders() {
- final Blockchain blockchain = protocolContext.getBlockchain();
- // Update checkpoint headers to reflect if any checkpoints were imported.
- final List imported = new ArrayList<>();
- while (!checkpointHeaders.isEmpty()
- && blockchain.contains(checkpointHeaders.peekFirst().getHash())) {
- imported.add(checkpointHeaders.removeFirst());
- }
- final BlockHeader lastImportedCheckpointHeader = imported.get(imported.size() - 1);
- // The first checkpoint header is always present in the blockchain.
- checkpointHeaders.addFirst(lastImportedCheckpointHeader);
- syncState.setCommonAncestor(lastImportedCheckpointHeader);
- return imported.size() > 1;
- }
-
- public BlockHeader allCheckpointsImported() {
- final BlockHeader lastImportedCheckpoint = checkpointHeaders.getLast();
- checkpointHeaders.clear();
- return lastImportedCheckpoint;
- }
-
- private List getCheckpointsAwaitingImport() {
- return Lists.newArrayList(checkpointHeaders);
- }
-}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java
deleted file mode 100644
index 2b0c5638ed..0000000000
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.eth.sync;
-
-import static java.util.Collections.emptyList;
-
-import tech.pegasys.pantheon.ethereum.core.BlockHeader;
-import tech.pegasys.pantheon.ethereum.core.Hash;
-import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
-import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
-import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
-import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeersTask;
-import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
-import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
-import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
-import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
-import tech.pegasys.pantheon.metrics.MetricsSystem;
-import tech.pegasys.pantheon.util.ExceptionUtils;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class EthTaskChainDownloader implements ChainDownloader {
- private static final Logger LOG = LogManager.getLogger();
-
- private final SynchronizerConfiguration config;
- private final EthContext ethContext;
- private final SyncState syncState;
- private final SyncTargetManager syncTargetManager;
- private final CheckpointHeaderManager checkpointHeaderManager;
- private final BlockImportTaskFactory blockImportTaskFactory;
- private final MetricsSystem metricsSystem;
- private final CompletableFuture downloadFuture = new CompletableFuture<>();
-
- private int chainSegmentTimeouts = 0;
-
- private final AtomicBoolean started = new AtomicBoolean(false);
- private CompletableFuture> currentTask;
-
- public EthTaskChainDownloader(
- final SynchronizerConfiguration config,
- final EthContext ethContext,
- final SyncState syncState,
- final SyncTargetManager syncTargetManager,
- final CheckpointHeaderManager checkpointHeaderManager,
- final BlockImportTaskFactory blockImportTaskFactory,
- final MetricsSystem metricsSystem) {
- this.metricsSystem = metricsSystem;
- this.config = config;
- this.ethContext = ethContext;
-
- this.syncState = syncState;
- this.syncTargetManager = syncTargetManager;
- this.checkpointHeaderManager = checkpointHeaderManager;
- this.blockImportTaskFactory = blockImportTaskFactory;
- }
-
- @Override
- public CompletableFuture start() {
- if (started.compareAndSet(false, true)) {
- executeDownload();
- return downloadFuture;
- } else {
- throw new IllegalStateException(
- "Attempt to start an already started " + this.getClass().getSimpleName() + ".");
- }
- }
-
- @Override
- public void cancel() {
- downloadFuture.cancel(true);
- }
-
- @VisibleForTesting
- public CompletableFuture> getCurrentTask() {
- return currentTask;
- }
-
- private void executeDownload() {
- if (downloadFuture.isDone()) {
- return;
- }
- // Find target, pull checkpoint headers, import, repeat
- currentTask =
- waitForPeers()
- .thenCompose(r -> syncTargetManager.findSyncTarget(syncState.syncTarget()))
- .thenApply(this::updateSyncState)
- .thenCompose(this::pullCheckpointHeaders)
- .thenCompose(this::importBlocks)
- .thenCompose(r -> checkSyncTarget())
- .whenComplete(
- (r, t) -> {
- if (t != null) {
- final Throwable rootCause = ExceptionUtils.rootCause(t);
- if (rootCause instanceof CancellationException) {
- LOG.trace("Download cancelled", t);
- } else if (rootCause instanceof InvalidBlockException) {
- LOG.debug("Invalid block downloaded", t);
- } else if (rootCause instanceof EthTaskException) {
- LOG.debug(rootCause.toString());
- } else if (rootCause instanceof InterruptedException) {
- LOG.trace("Interrupted while downloading chain", rootCause);
- } else {
- LOG.error("Error encountered while downloading", t);
- }
- // On error, wait a bit before retrying
- ethContext
- .getScheduler()
- .scheduleFutureTask(this::executeDownload, Duration.ofSeconds(2));
- } else if (syncTargetManager.shouldContinueDownloading()) {
- executeDownload();
- } else {
- LOG.info("Chain download complete");
- downloadFuture.complete(null);
- }
- });
- }
-
- private SyncTarget updateSyncState(final SyncTarget newTarget) {
- if (isSameAsCurrentTarget(newTarget)) {
- return syncState.syncTarget().get();
- }
- return syncState.setSyncTarget(newTarget.peer(), newTarget.commonAncestor());
- }
-
- private Boolean isSameAsCurrentTarget(final SyncTarget newTarget) {
- return syncState
- .syncTarget()
- .map(currentTarget -> currentTarget.equals(newTarget))
- .orElse(false);
- }
-
- private CompletableFuture> pullCheckpointHeaders(final SyncTarget syncTarget) {
- return syncTarget.peer().isDisconnected()
- ? CompletableFuture.completedFuture(emptyList())
- : checkpointHeaderManager.pullCheckpointHeaders(syncTarget);
- }
-
- private CompletableFuture> waitForPeers() {
- return WaitForPeersTask.create(ethContext, 1, metricsSystem).run();
- }
-
- private CompletableFuture checkSyncTarget() {
- final Optional maybeSyncTarget = syncState.syncTarget();
- if (!maybeSyncTarget.isPresent()) {
- // No sync target, so nothing to check.
- return CompletableFuture.completedFuture(null);
- }
-
- final SyncTarget syncTarget = maybeSyncTarget.get();
- if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) {
- LOG.info("Better sync target found, clear current sync target: {}.", syncTarget);
- clearSyncTarget(syncTarget);
- return CompletableFuture.completedFuture(null);
- }
- if (finishedSyncingToCurrentTarget(syncTarget)) {
- LOG.info("Finished syncing to target: {}.", syncTarget);
- clearSyncTarget(syncTarget);
- // Wait a bit before checking for a new sync target
- final CompletableFuture future = new CompletableFuture<>();
- ethContext
- .getScheduler()
- .scheduleFutureTask(() -> future.complete(null), Duration.ofSeconds(10));
- return future;
- }
- return CompletableFuture.completedFuture(null);
- }
-
- private boolean finishedSyncingToCurrentTarget(final SyncTarget syncTarget) {
- return !syncTargetManager.syncTargetCanProvideMoreBlocks(syncTarget)
- || checkpointHeaderManager.checkpointsHaveTimedOut()
- || chainSegmentsHaveTimedOut();
- }
-
- private boolean chainSegmentsHaveTimedOut() {
- return chainSegmentTimeouts >= config.downloaderChainSegmentTimeoutsPermitted();
- }
-
- private void clearSyncTarget() {
- syncState.syncTarget().ifPresent(this::clearSyncTarget);
- }
-
- private void clearSyncTarget(final SyncTarget syncTarget) {
- chainSegmentTimeouts = 0;
- checkpointHeaderManager.clearSyncTarget();
- syncState.clearSyncTarget();
- }
-
- private CompletableFuture> importBlocks(final List checkpointHeaders) {
- if (checkpointHeaders.isEmpty()) {
- // No checkpoints to download
- return CompletableFuture.completedFuture(emptyList());
- }
-
- final CompletableFuture> importedBlocks =
- blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);
-
- return importedBlocks.whenComplete(
- (r, t) -> {
- t = ExceptionUtils.rootCause(t);
- if (t instanceof InvalidBlockException) {
- // Blocks were invalid, meaning our checkpoints are wrong
- // Reset sync target
- final Optional maybeSyncTarget = syncState.syncTarget();
- maybeSyncTarget.ifPresent(
- target -> target.peer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL));
- final String peerDescriptor =
- maybeSyncTarget
- .map(SyncTarget::peer)
- .map(EthPeer::toString)
- .orElse("(unknown - already disconnected)");
- LOG.warn(
- "Invalid block discovered while downloading from peer {}. Disconnect.",
- peerDescriptor);
- clearSyncTarget();
- } else if (t != null || r.isEmpty()) {
- if (t != null) {
- final Throwable rootCause = ExceptionUtils.rootCause(t);
- if (rootCause instanceof EthTaskException) {
- LOG.debug(rootCause.toString());
- } else if (rootCause instanceof InterruptedException) {
- LOG.trace("Interrupted while importing blocks", rootCause);
- } else {
- LOG.error("Encountered error importing blocks", t);
- }
- }
- if (checkpointHeaderManager.clearImportedCheckpointHeaders()) {
- chainSegmentTimeouts = 0;
- }
- if (t instanceof TimeoutException || r != null) {
- // Download timed out, or returned no new blocks
- chainSegmentTimeouts++;
- }
- } else {
- chainSegmentTimeouts = 0;
-
- final BlockHeader lastImportedCheckpoint =
- checkpointHeaderManager.allCheckpointsImported();
- syncState.setCommonAncestor(lastImportedCheckpoint);
- }
- });
- }
-
- public interface BlockImportTaskFactory {
- CompletableFuture> importBlocksForCheckpoints(
- final List checkpointHeaders);
- }
-}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
index 6b9c67b445..1c56906f86 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
@@ -120,14 +120,5 @@ private CompletableFuture> waitForNewPeer() {
.timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5));
}
- public abstract boolean shouldSwitchSyncTarget(final SyncTarget currentTarget);
-
public abstract boolean shouldContinueDownloading();
-
- public abstract boolean isSyncTargetReached(final EthPeer peer);
-
- public boolean syncTargetCanProvideMoreBlocks(final SyncTarget syncTarget) {
- final EthPeer currentSyncingPeer = syncTarget.peer();
- return !currentSyncingPeer.isDisconnected() && !isSyncTargetReached(currentSyncingPeer);
- }
}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
index c7eec2d0d7..539c673a87 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
@@ -54,13 +54,11 @@ public class SynchronizerConfiguration {
private final UInt256 downloaderChangeTargetThresholdByTd;
private final int downloaderHeaderRequestSize;
private final int downloaderCheckpointTimeoutsPermitted;
- private final int downloaderChainSegmentTimeoutsPermitted;
private final int downloaderChainSegmentSize;
private final int downloaderParallelism;
private final int transactionsParallelism;
private final int computationParallelism;
private final int maxTrailingPeers;
- private final boolean piplineDownloaderForFullSyncEnabled;
private final long worldStateMinMillisBeforeStalling;
private SynchronizerConfiguration(
@@ -77,13 +75,11 @@ private SynchronizerConfiguration(
final UInt256 downloaderChangeTargetThresholdByTd,
final int downloaderHeaderRequestSize,
final int downloaderCheckpointTimeoutsPermitted,
- final int downloaderChainSegmentTimeoutsPermitted,
final int downloaderChainSegmentSize,
final int downloaderParallelism,
final int transactionsParallelism,
final int computationParallelism,
- final int maxTrailingPeers,
- final boolean piplineDownloaderForFullSyncEnabled) {
+ final int maxTrailingPeers) {
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
@@ -97,13 +93,11 @@ private SynchronizerConfiguration(
this.downloaderChangeTargetThresholdByTd = downloaderChangeTargetThresholdByTd;
this.downloaderHeaderRequestSize = downloaderHeaderRequestSize;
this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted;
- this.downloaderChainSegmentTimeoutsPermitted = downloaderChainSegmentTimeoutsPermitted;
this.downloaderChainSegmentSize = downloaderChainSegmentSize;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
this.maxTrailingPeers = maxTrailingPeers;
- this.piplineDownloaderForFullSyncEnabled = piplineDownloaderForFullSyncEnabled;
}
public static Builder builder() {
@@ -155,10 +149,6 @@ public int downloaderCheckpointTimeoutsPermitted() {
return downloaderCheckpointTimeoutsPermitted;
}
- public int downloaderChainSegmentTimeoutsPermitted() {
- return downloaderChainSegmentTimeoutsPermitted;
- }
-
public int downloaderChainSegmentSize() {
return downloaderChainSegmentSize;
}
@@ -210,10 +200,6 @@ public int getMaxTrailingPeers() {
return maxTrailingPeers;
}
- public boolean isPiplineDownloaderForFullSyncEnabled() {
- return piplineDownloaderForFullSyncEnabled;
- }
-
public static class Builder {
private SyncMode syncMode = SyncMode.FULL;
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
@@ -273,15 +259,6 @@ public void parseBlockPropagationRange(final String arg) {
"Number of tries to attempt to download checkpoints before stopping (default: ${DEFAULT-VALUE})")
private int downloaderCheckpointTimeoutsPermitted = 5;
- @CommandLine.Option(
- names = "--Xsynchronizer-downloader-chain-segment-timeouts-permitted",
- hidden = true,
- defaultValue = "5",
- paramLabel = "",
- description =
- "Number of times to attempt to download chain segments before stopping (default: ${DEFAULT-VALUE})")
- private int downloaderChainSegmentTimeoutsPermitted = 5;
-
@CommandLine.Option(
names = "--Xsynchronizer-downloader-chain-segment-size",
hidden = true,
@@ -371,14 +348,6 @@ public void parseBlockPropagationRange(final String arg) {
"Minimum time in ms without progress before considering a world state download as stalled (default: ${DEFAULT-VALUE})")
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
- @CommandLine.Option(
- names = "--Xsynchronizer-pipeline-full-sync-enabled",
- hidden = true,
- defaultValue = "true",
- paramLabel = "",
- description = "Enable the pipeline based chain downloader during full synchronization")
- private Boolean piplineDownloaderForFullSyncEnabled = true;
-
public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
return this;
@@ -417,12 +386,6 @@ public Builder downloaderCheckpointTimeoutsPermitted(
return this;
}
- public Builder downloaderChainSegmentTimeoutsPermitted(
- final int downloaderChainSegmentTimeoutsPermitted) {
- this.downloaderChainSegmentTimeoutsPermitted = downloaderChainSegmentTimeoutsPermitted;
- return this;
- }
-
public Builder downloaderChainSegmentSize(final int downloaderChainSegmentSize) {
this.downloaderChainSegmentSize = downloaderChainSegmentSize;
return this;
@@ -480,12 +443,6 @@ public Builder maxTrailingPeers(final int maxTailingPeers) {
return this;
}
- public Builder piplineDownloaderForFullSyncEnabled(
- final Boolean piplineDownloaderForFullSyncEnabled) {
- this.piplineDownloaderForFullSyncEnabled = piplineDownloaderForFullSyncEnabled;
- return this;
- }
-
public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
fastSyncPivotDistance,
@@ -501,13 +458,11 @@ public SynchronizerConfiguration build() {
downloaderChangeTargetThresholdByTd,
downloaderHeaderRequestSize,
downloaderCheckpointTimeoutsPermitted,
- downloaderChainSegmentTimeoutsPermitted,
downloaderChainSegmentSize,
downloaderParallelism,
transactionsParallelism,
computationParallelism,
- maxTrailingPeers,
- piplineDownloaderForFullSyncEnabled);
+ maxTrailingPeers);
}
}
}
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java
deleted file mode 100644
index aa24d5d9b3..0000000000
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
-
-import static java.util.Collections.emptyList;
-import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
-
-import tech.pegasys.pantheon.ethereum.ProtocolContext;
-import tech.pegasys.pantheon.ethereum.core.Block;
-import tech.pegasys.pantheon.ethereum.core.BlockHeader;
-import tech.pegasys.pantheon.ethereum.core.BlockImporter;
-import tech.pegasys.pantheon.ethereum.core.Hash;
-import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
-import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
-import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
-import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
-import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
-import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
-import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
-import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
-import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
-import tech.pegasys.pantheon.metrics.MetricsSystem;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class FastSyncBlockHandler implements BlockHandler {
- private static final Logger LOG = LogManager.getLogger();
-
- private final ProtocolSchedule protocolSchedule;
- private final ProtocolContext protocolContext;
- private final EthContext ethContext;
- private final MetricsSystem metricsSystem;
- private final ValidationPolicy headerValidationPolicy;
- private final ValidationPolicy ommerValidationPolicy;
-
- public FastSyncBlockHandler(
- final ProtocolSchedule protocolSchedule,
- final ProtocolContext protocolContext,
- final EthContext ethContext,
- final MetricsSystem metricsSystem,
- final ValidationPolicy headerValidationPolicy,
- final ValidationPolicy ommerValidationPolicy) {
- this.protocolSchedule = protocolSchedule;
- this.protocolContext = protocolContext;
- this.ethContext = ethContext;
- this.metricsSystem = metricsSystem;
- this.headerValidationPolicy = headerValidationPolicy;
- this.ommerValidationPolicy = ommerValidationPolicy;
- }
-
- @Override
- public CompletableFuture> downloadBlocks(
- final List headers) {
- return downloadBodies(headers)
- .thenCombine(downloadReceipts(headers), this::combineBlocksAndReceipts);
- }
-
- private CompletableFuture> downloadBodies(final List headers) {
- return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, metricsSystem)
- .run();
- }
-
- private CompletableFuture