diff --git a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java index d7ed51ca45..262c23c062 100644 --- a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java +++ b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java @@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.metrics.MetricsSystem; +import java.time.Clock; import java.util.List; /** This allows for interoperability with Quorum, but shouldn't be used otherwise. */ @@ -34,6 +35,7 @@ public Istanbul64ProtocolManager( final int syncWorkers, final int txWorkers, final int computationWorkers, + final Clock clock, final MetricsSystem metricsSystem, final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) { super( @@ -44,6 +46,7 @@ public Istanbul64ProtocolManager( syncWorkers, txWorkers, computationWorkers, + clock, metricsSystem, ethereumWireProtocolConfiguration); } @@ -56,6 +59,7 @@ public Istanbul64ProtocolManager( final int syncWorkers, final int txWorkers, final int computationWorkers, + final Clock clock, final MetricsSystem metricsSystem) { super( blockchain, @@ -65,6 +69,7 @@ public Istanbul64ProtocolManager( syncWorkers, txWorkers, computationWorkers, + clock, metricsSystem); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java index 670469dfd9..08f1d3726b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java @@ -30,6 +30,7 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; +import java.time.Clock; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -45,16 +46,21 @@ public class EthPeer { private static final Logger LOG = LogManager.getLogger(); + + private static final int MAX_OUTSTANDING_REQUESTS = 5; + private final PeerConnection connection; private final int maxTrackedSeenBlocks = 300; private final Set knownBlocks; private final String protocolName; + private final Clock clock; private final ChainState chainHeadState; private final AtomicBoolean statusHasBeenSentToPeer = new AtomicBoolean(false); private final AtomicBoolean statusHasBeenReceivedFromPeer = new AtomicBoolean(false); + private volatile long lastRequestTimestamp = 0; private final RequestManager headersRequestManager = new RequestManager(this); private final RequestManager bodiesRequestManager = new RequestManager(this); private final RequestManager receiptsRequestManager = new RequestManager(this); @@ -66,9 +72,11 @@ public class EthPeer { EthPeer( final PeerConnection connection, final String protocolName, - final Consumer onStatusesExchanged) { + final Consumer onStatusesExchanged, + final Clock clock) { this.connection = connection; this.protocolName = protocolName; + this.clock = clock; knownBlocks = Collections.newSetFromMap( Collections.synchronizedMap( @@ -111,13 +119,13 @@ public void disconnect(final DisconnectReason reason) { public ResponseStream send(final MessageData messageData) throws PeerNotConnected { switch (messageData.getCode()) { case EthPV62.GET_BLOCK_HEADERS: - return sendHeadersRequest(messageData); + return sendRequest(headersRequestManager, messageData); case EthPV62.GET_BLOCK_BODIES: - return sendBodiesRequest(messageData); + return sendRequest(bodiesRequestManager, messageData); case EthPV63.GET_RECEIPTS: - return sendReceiptsRequest(messageData); + return sendRequest(receiptsRequestManager, messageData); case EthPV63.GET_NODE_DATA: - return sendNodeDataRequest(messageData); + return sendRequest(nodeDataRequestManager, messageData); default: connection.sendForProtocol(protocolName, messageData); return null; @@ -129,7 +137,7 @@ public ResponseStream getHeadersByHash( throws PeerNotConnected { final GetBlockHeadersMessage message = GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse); - return sendHeadersRequest(message); + return sendRequest(headersRequestManager, message); } public ResponseStream getHeadersByNumber( @@ -137,44 +145,29 @@ public ResponseStream getHeadersByNumber( throws PeerNotConnected { final GetBlockHeadersMessage message = GetBlockHeadersMessage.create(blockNumber, maxHeaders, skip, reverse); - return sendHeadersRequest(message); + return sendRequest(headersRequestManager, message); } - private ResponseStream sendHeadersRequest(final MessageData messageData) throws PeerNotConnected { - return headersRequestManager.dispatchRequest( + private ResponseStream sendRequest( + final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected { + lastRequestTimestamp = clock.millis(); + return requestManager.dispatchRequest( () -> connection.sendForProtocol(protocolName, messageData)); } public ResponseStream getBodies(final List blockHashes) throws PeerNotConnected { final GetBlockBodiesMessage message = GetBlockBodiesMessage.create(blockHashes); - return sendBodiesRequest(message); - } - - private ResponseStream sendBodiesRequest(final MessageData messageData) throws PeerNotConnected { - return bodiesRequestManager.dispatchRequest( - () -> connection.sendForProtocol(protocolName, messageData)); + return sendRequest(bodiesRequestManager, message); } public ResponseStream getReceipts(final List blockHashes) throws PeerNotConnected { final GetReceiptsMessage message = GetReceiptsMessage.create(blockHashes); - return sendReceiptsRequest(message); - } - - private ResponseStream sendReceiptsRequest(final MessageData messageData) - throws PeerNotConnected { - return receiptsRequestManager.dispatchRequest( - () -> connection.sendForProtocol(protocolName, messageData)); + return sendRequest(receiptsRequestManager, message); } public ResponseStream getNodeData(final Iterable nodeHashes) throws PeerNotConnected { final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes); - return sendNodeDataRequest(message); - } - - private ResponseStream sendNodeDataRequest(final MessageData messageData) - throws PeerNotConnected { - return nodeDataRequestManager.dispatchRequest( - () -> connection.sendForProtocol(protocolName, messageData)); + return sendRequest(nodeDataRequestManager, message); } boolean validateReceivedMessage(final EthMessage message) { @@ -317,6 +310,14 @@ public int outstandingRequests() { + nodeDataRequestManager.outstandingRequests(); } + public long getLastRequestTimestamp() { + return lastRequestTimestamp; + } + + public boolean hasAvailableRequestCapacity() { + return outstandingRequests() < MAX_OUTSTANDING_REQUESTS; + } + public BytesValue nodeId() { return connection.getPeerInfo().getNodeId(); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java index b33bec9ab2..75ea1a7795 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeers.java @@ -14,11 +14,14 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer.DisconnectCallback; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.metrics.MetricCategory; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.Subscribers; -import java.util.Collections; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -36,20 +39,29 @@ public class EthPeers { public static final Comparator BEST_CHAIN = TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT); public static final Comparator LEAST_TO_MOST_BUSY = - Comparator.comparing(EthPeer::outstandingRequests); + Comparator.comparing(EthPeer::outstandingRequests) + .thenComparing(EthPeer::getLastRequestTimestamp); - private final int maxOutstandingRequests = 5; private final Map connections = new ConcurrentHashMap<>(); private final String protocolName; + private final Clock clock; private final Subscribers connectCallbacks = new Subscribers<>(); private final Subscribers disconnectCallbacks = new Subscribers<>(); + private final Collection pendingRequests = new ArrayList<>(); - public EthPeers(final String protocolName) { + public EthPeers(final String protocolName, final Clock clock, final MetricsSystem metricsSystem) { this.protocolName = protocolName; + this.clock = clock; + metricsSystem.createIntegerGauge( + MetricCategory.PEERS, + "pending_peer_requests_current", + "Number of peer requests currently pending because peers are busy", + pendingRequests::size); } void registerConnection(final PeerConnection peerConnection) { - final EthPeer peer = new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks); + final EthPeer peer = + new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock); connections.putIfAbsent(peerConnection, peer); } @@ -59,12 +71,38 @@ void registerDisconnect(final PeerConnection connection) { disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer)); peer.handleDisconnect(); } + checkPendingConnections(); } public EthPeer peer(final PeerConnection peerConnection) { return connections.get(peerConnection); } + public PendingPeerRequest executePeerRequest( + final PeerRequest request, final long minimumBlockNumber, final Optional peer) { + final PendingPeerRequest pendingPeerRequest = + new PendingPeerRequest(this, request, minimumBlockNumber, peer); + synchronized (this) { + if (!pendingPeerRequest.attemptExecution()) { + pendingRequests.add(pendingPeerRequest); + } + } + return pendingPeerRequest; + } + + public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) { + peer.dispatch(ethMessage); + if (peer.hasAvailableRequestCapacity()) { + checkPendingConnections(); + } + } + + private void checkPendingConnections() { + synchronized (this) { + pendingRequests.removeIf(PendingPeerRequest::attemptExecution); + } + } + public long subscribeConnect(final ConnectCallback callback) { return connectCallbacks.subscribe(callback); } @@ -89,23 +127,6 @@ public Optional bestPeer() { return availablePeers().max(BEST_CHAIN); } - public Optional idlePeer() { - return idlePeers().min(LEAST_TO_MOST_BUSY); - } - - private Stream idlePeers() { - final List peers = - availablePeers() - .filter(p -> p.outstandingRequests() < maxOutstandingRequests) - .collect(Collectors.toList()); - Collections.shuffle(peers); - return peers.stream(); - } - - public Optional idlePeer(final long withBlocksUpTo) { - return idlePeers().filter(p -> p.chainState().getEstimatedHeight() >= withBlocksUpTo).findAny(); - } - @FunctionalInterface public interface ConnectCallback { void onPeerConnected(EthPeer newPeer); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java index 201a606cfb..0f96453934 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java @@ -35,6 +35,7 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -71,7 +72,9 @@ public EthProtocolManager( final int networkId, final boolean fastSyncEnabled, final EthScheduler scheduler, - final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) { + final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration, + final Clock clock, + final MetricsSystem metricsSystem) { this.networkId = networkId; this.scheduler = scheduler; this.blockchain = blockchain; @@ -80,7 +83,7 @@ public EthProtocolManager( this.shutdown = new CountDownLatch(1); genesisHash = blockchain.getBlockHashByNumber(0L).get(); - ethPeers = new EthPeers(getSupportedProtocol()); + ethPeers = new EthPeers(getSupportedProtocol(), clock, metricsSystem); ethMessages = new EthMessages(); ethContext = new EthContext(ethPeers, ethMessages, scheduler); @@ -98,6 +101,7 @@ public EthProtocolManager( final int syncWorkers, final int txWorkers, final int computationWorkers, + final Clock clock, final MetricsSystem metricsSystem) { this( blockchain, @@ -105,7 +109,9 @@ public EthProtocolManager( networkId, fastSyncEnabled, new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem), - EthereumWireProtocolConfiguration.defaultConfig()); + EthereumWireProtocolConfiguration.defaultConfig(), + clock, + metricsSystem); } public EthProtocolManager( @@ -116,6 +122,7 @@ public EthProtocolManager( final int syncWorkers, final int txWorkers, final int computationWorkers, + final Clock clock, final MetricsSystem metricsSystem, final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) { this( @@ -124,7 +131,9 @@ public EthProtocolManager( networkId, fastSyncEnabled, new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem), - ethereumWireProtocolConfiguration); + ethereumWireProtocolConfiguration, + clock, + metricsSystem); } public EthContext ethContext() { @@ -192,7 +201,7 @@ public void processMessage(final Capability cap, final Message message) { peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL); return; } - peer.dispatch(ethMessage); + ethPeers.dispatchMessage(peer, ethMessage); ethMessages.dispatch(ethMessage); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PeerRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PeerRequest.java new file mode 100644 index 0000000000..3d98d3178c --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PeerRequest.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager; + +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; + +public interface PeerRequest { + ResponseStream sendRequest(EthPeer peer) throws PeerNotConnected; +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PendingPeerRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PendingPeerRequest.java new file mode 100644 index 0000000000..3cd1d1d964 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/PendingPeerRequest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager; + +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; + +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +public class PendingPeerRequest { + private final EthPeers ethPeers; + private final PeerRequest request; + private final CompletableFuture result = new CompletableFuture<>(); + private final long minimumBlockNumber; + private final Optional peer; + + PendingPeerRequest( + final EthPeers ethPeers, + final PeerRequest request, + final long minimumBlockNumber, + final Optional peer) { + this.ethPeers = ethPeers; + this.request = request; + this.minimumBlockNumber = minimumBlockNumber; + this.peer = peer; + } + + /** + * Attempts to find an available peer and execute the peer request. + * + * @return true if the request should be removed from the pending list, otherwise false. + */ + public boolean attemptExecution() { + if (result.isDone()) { + return true; + } + final Optional leastBusySuitablePeer = getLeastBusySuitablePeer(); + if (!leastBusySuitablePeer.isPresent()) { + // No peers have the required height. + result.completeExceptionally(new NoAvailablePeersException()); + return true; + } else { + // At least one peer has the required height, but we not be able to use it if it's busy + final Optional selectedPeer = + leastBusySuitablePeer.filter(EthPeer::hasAvailableRequestCapacity); + + selectedPeer.ifPresent(this::sendRequest); + return selectedPeer.isPresent(); + } + } + + private synchronized void sendRequest(final EthPeer peer) { + // Recheck if we should send the request now we're inside the synchronized block + if (!result.isDone()) { + try { + final ResponseStream responseStream = request.sendRequest(peer); + result.complete(responseStream); + } catch (final PeerNotConnected e) { + result.completeExceptionally(new PeerDisconnectedException(peer)); + } + } + } + + private Optional getLeastBusySuitablePeer() { + return peer.isPresent() + ? peer + : ethPeers + .availablePeers() + .filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber) + .min(EthPeers.LEAST_TO_MOST_BUSY); + } + + /** + * Register callbacks for when the request is made or + * + * @param onSuccess handler for when a peer becomes available and the request is sent + * @param onError handler for when there is no peer with sufficient height or the request fails to + * send + */ + public void then(final Consumer onSuccess, final Consumer onError) { + result.whenComplete( + (result, error) -> { + if (error != null) { + onError.accept(error); + } else { + onSuccess.accept(result); + } + }); + } + + /** + * Abort this request. + * + * @return the response stream if the request has already been sent, otherwise empty. + */ + public synchronized Optional abort() { + try { + result.cancel(false); + return Optional.ofNullable(result.getNow(null)); + } catch (final CancellationException | CompletionException e) { + return Optional.empty(); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java index 625f67686a..ae49c56ffc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java @@ -148,6 +148,10 @@ public void close() { dispatchBufferedResponses(); } + public EthPeer getPeer() { + return peer; + } + private void processMessage(final MessageData message) { if (closed) { return; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java index ec64bd3f32..5e279cefad 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java @@ -41,12 +41,10 @@ public abstract class AbstractGetHeadersFromPeerTask protected final int count; protected final int skip; protected final boolean reverse; - private final long minimumRequiredBlockNumber; protected AbstractGetHeadersFromPeerTask( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final long minimumRequiredBlockNumber, final int count, final int skip, final boolean reverse, @@ -57,7 +55,6 @@ protected AbstractGetHeadersFromPeerTask( this.count = count; this.skip = skip; this.reverse = reverse; - this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; } @Override @@ -106,10 +103,5 @@ protected Optional> processResponse( return Optional.of(headersList); } - @Override - protected Optional findSuitablePeer() { - return ethContext.getEthPeers().idlePeer(minimumRequiredBlockNumber); - } - protected abstract boolean matchesFirstHeader(BlockHeader firstHeader); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java index 7e71391db2..0574f6ccf2 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerRequestTask.java @@ -14,10 +14,11 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.PeerRequest; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; import tech.pegasys.pantheon.metrics.MetricsSystem; @@ -33,7 +34,7 @@ public abstract class AbstractPeerRequestTask extends AbstractPeerTask { private Duration timeout = DEFAULT_TIMEOUT; private final int requestCode; - private volatile ResponseStream responseStream; + private volatile PendingPeerRequest responseStream; protected AbstractPeerRequestTask( final EthContext ethContext, final int requestCode, final MetricsSystem metricsSystem) { @@ -47,28 +48,39 @@ public AbstractPeerRequestTask setTimeout(final Duration timeout) { } @Override - protected final void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { + protected final void executeTask() { final CompletableFuture promise = new CompletableFuture<>(); - responseStream = - sendRequest(peer) - .then( - (streamClosed, message, peer1) -> - handleMessage(promise, streamClosed, message, peer1)); + responseStream = sendRequest(); + responseStream.then( + stream -> { + // Start the timeout now that the request has actually been sent + ethContext.getScheduler().failAfterTimeout(promise, timeout); + + stream.then( + (streamClosed, message, peer1) -> + handleMessage(promise, streamClosed, message, peer1)); + }, + promise::completeExceptionally); promise.whenComplete( (r, t) -> { + final Optional responseStream = this.responseStream.abort(); if (t != null) { t = ExceptionUtils.rootCause(t); - if (t instanceof TimeoutException) { - peer.recordRequestTimeout(requestCode); + if (t instanceof TimeoutException && responseStream.isPresent()) { + responseStream.get().getPeer().recordRequestTimeout(requestCode); } result.get().completeExceptionally(t); } else if (r != null) { - result.get().complete(new PeerTaskResult<>(peer, r)); + // If we got a response we must have had a response stream... + result.get().complete(new PeerTaskResult<>(responseStream.get().getPeer(), r)); } }); + } - ethContext.getScheduler().failAfterTimeout(promise, timeout); + public PendingPeerRequest sendRequestToPeer( + final PeerRequest request, final long minimumBlockNumber) { + return ethContext.getEthPeers().executePeerRequest(request, minimumBlockNumber, assignedPeer); } private void handleMessage( @@ -93,13 +105,10 @@ private void handleMessage( @Override protected void cleanup() { super.cleanup(); - final ResponseStream stream = responseStream; - if (stream != null) { - stream.close(); - } + responseStream.abort().ifPresent(ResponseStream::close); } - protected abstract ResponseStream sendRequest(EthPeer peer) throws PeerNotConnected; + protected abstract PendingPeerRequest sendRequest(); protected abstract Optional processResponse( boolean streamClosed, MessageData message, EthPeer peer); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerTask.java index 14d413c2b1..88e25b313e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPeerTask.java @@ -14,10 +14,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.Optional; @@ -31,32 +28,6 @@ protected AbstractPeerTask(final EthContext ethContext, final MetricsSystem metr this.ethContext = ethContext; } - @Override - protected void executeTask() { - final EthPeer peer; - if (assignedPeer.isPresent()) { - peer = assignedPeer.get(); - } else { - // Try to find a peer - final Optional maybePeer = findSuitablePeer(); - if (!maybePeer.isPresent()) { - result.get().completeExceptionally(new NoAvailablePeersException()); - return; - } - peer = maybePeer.get(); - } - - try { - executeTaskWithPeer(peer); - } catch (final PeerNotConnected e) { - result.get().completeExceptionally(new PeerDisconnectedException(peer)); - } - } - - protected abstract Optional findSuitablePeer(); - - protected abstract void executeTaskWithPeer(EthPeer peer) throws PeerNotConnected; - public AbstractPeerTask assignPeer(final EthPeer peer) { assignedPeer = Optional.of(peer); return this; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBlockFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBlockFromPeerTask.java index a34066ceac..09822853bc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBlockFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBlockFromPeerTask.java @@ -21,11 +21,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.IncompleteResultsException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; @@ -63,37 +61,40 @@ public static GetBlockFromPeerTask create( } @Override - protected Optional findSuitablePeer() { - return ethContext.getEthPeers().idlePeer(blockNumber); - } - - @Override - protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { - LOG.debug("Downloading block {} from peer {}.", hash, peer); - downloadHeader(peer) + protected void executeTask() { + LOG.debug( + "Downloading block {} from peer {}.", + hash, + assignedPeer.map(EthPeer::toString).orElse("")); + downloadHeader() .thenCompose(this::completeBlock) .whenComplete( (r, t) -> { if (t != null) { - LOG.info("Failed to download block {} from peer {}.", hash, peer); + LOG.info( + "Failed to download block {} from peer {}.", + hash, + assignedPeer.map(EthPeer::toString).orElse("")); result.get().completeExceptionally(t); } else if (r.getResult().isEmpty()) { - LOG.info("Failed to download block {} from peer {}.", hash, peer); + LOG.info("Failed to download block {} from peer {}.", hash, r.getPeer()); result.get().completeExceptionally(new IncompleteResultsException()); } else { - LOG.debug("Successfully downloaded block {} from peer {}.", hash, peer); + LOG.debug("Successfully downloaded block {} from peer {}.", hash, r.getPeer()); result.get().complete(new PeerTaskResult<>(r.getPeer(), r.getResult().get(0))); } }); } - private CompletableFuture>> downloadHeader(final EthPeer peer) { + private CompletableFuture>> downloadHeader() { return executeSubTask( - () -> - GetHeadersFromPeerByHashTask.forSingleHash( - protocolSchedule, ethContext, hash, metricsSystem) - .assignPeer(peer) - .run()); + () -> { + final AbstractGetHeadersFromPeerTask task = + GetHeadersFromPeerByHashTask.forSingleHash( + protocolSchedule, ethContext, hash, blockNumber, metricsSystem); + assignedPeer.ifPresent(task::assignPeer); + return task.run(); + }); } private CompletableFuture>> completeBlock( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java index ed10df2609..505db56ec3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java @@ -21,13 +21,12 @@ import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.mainnet.BodyValidation; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.bytes.Bytes32; @@ -82,11 +81,17 @@ public static GetBodiesFromPeerTask forHeaders( } @Override - protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { + protected PendingPeerRequest sendRequest() { final List blockHashes = headers.stream().map(BlockHeader::getHash).collect(Collectors.toList()); - LOG.debug("Requesting {} bodies from peer {}.", blockHashes.size(), peer); - return peer.getBodies(blockHashes); + final long minimumRequiredBlockNumber = headers.get(headers.size() - 1).getNumber(); + + return sendRequestToPeer( + peer -> { + LOG.debug("Requesting {} bodies from peer {}.", blockHashes.size(), peer); + return peer.getBodies(blockHashes); + }, + minimumRequiredBlockNumber); } @Override @@ -123,11 +128,6 @@ protected Optional> processResponse( return Optional.of(blocks); } - @Override - protected Optional findSuitablePeer() { - return this.ethContext.getEthPeers().idlePeer(headers.get(headers.size() - 1).getNumber()); - } - private static class BodyIdentifier { private final Bytes32 transactionsRoot; private final Bytes32 ommersHash; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java index 3c48e6a9b4..34b1a31fd9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTask.java @@ -17,10 +17,8 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import com.google.common.annotations.VisibleForTesting; @@ -32,6 +30,7 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask private static final Logger LOG = LogManager.getLogger(); private final Hash referenceHash; + private final long minimumRequiredBlockNumber; @VisibleForTesting GetHeadersFromPeerByHashTask( @@ -43,14 +42,8 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask final int skip, final boolean reverse, final MetricsSystem metricsSystem) { - super( - protocolSchedule, - ethContext, - minimumRequiredBlockNumber, - count, - skip, - reverse, - metricsSystem); + super(protocolSchedule, ethContext, count, skip, reverse, metricsSystem); + this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; checkNotNull(referenceHash); this.referenceHash = referenceHash; } @@ -114,15 +107,20 @@ public static AbstractGetHeadersFromPeerTask forSingleHash( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final Hash hash, + final long minimumRequiredBlockNumber, final MetricsSystem metricsSystem) { return new GetHeadersFromPeerByHashTask( - protocolSchedule, ethContext, hash, 0, 1, 0, false, metricsSystem); + protocolSchedule, ethContext, hash, minimumRequiredBlockNumber, 1, 0, false, metricsSystem); } @Override - protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { - LOG.debug("Requesting {} headers from peer {}.", count, peer); - return peer.getHeadersByHash(referenceHash, count, skip, reverse); + protected PendingPeerRequest sendRequest() { + return sendRequestToPeer( + peer -> { + LOG.debug("Requesting {} headers from peer {}.", count, peer); + return peer.getHeadersByHash(referenceHash, count, skip, reverse); + }, + minimumRequiredBlockNumber); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java index 1d4f5b4df3..c936c17354 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTask.java @@ -14,10 +14,8 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import com.google.common.annotations.VisibleForTesting; @@ -39,7 +37,7 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa final int skip, final boolean reverse, final MetricsSystem metricsSystem) { - super(protocolSchedule, ethContext, blockNumber, count, skip, reverse, metricsSystem); + super(protocolSchedule, ethContext, count, skip, reverse, metricsSystem); this.blockNumber = blockNumber; } @@ -74,9 +72,13 @@ public static AbstractGetHeadersFromPeerTask forSingleNumber( } @Override - protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { - LOG.debug("Requesting {} headers from peer {}.", count, peer); - return peer.getHeadersByNumber(blockNumber, count, skip, reverse); + protected PendingPeerRequest sendRequest() { + return sendRequestToPeer( + peer -> { + LOG.debug("Requesting {} headers from peer {}.", count, peer); + return peer.getHeadersByNumber(blockNumber, count, skip, reverse); + }, + blockNumber); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java index 41ed67581b..8c4a7aff8a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java @@ -17,11 +17,10 @@ import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -62,9 +61,13 @@ public static GetNodeDataFromPeerTask forHashes( } @Override - protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { - LOG.debug("Requesting {} node data entries from peer {}.", hashes.size(), peer); - return peer.getNodeData(hashes); + protected PendingPeerRequest sendRequest() { + return sendRequestToPeer( + peer -> { + LOG.debug("Requesting {} node data entries from peer {}.", hashes.size(), peer); + return peer.getNodeData(hashes); + }, + pivotBlockNumber); } @Override @@ -86,7 +89,7 @@ protected Optional> processResponse( private Optional> mapNodeDataByHash(final List nodeData) { final Map nodeDataByHash = new HashMap<>(); - for (BytesValue data : nodeData) { + for (final BytesValue data : nodeData) { final Hash hash = Hash.hash(data); if (!hashes.contains(hash)) { return Optional.empty(); @@ -95,9 +98,4 @@ private Optional> mapNodeDataByHash(final List } return Optional.of(nodeDataByHash); } - - @Override - protected Optional findSuitablePeer() { - return ethContext.getEthPeers().idlePeer(pivotBlockNumber); - } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java index 76165df0bf..ffffe474dd 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java @@ -21,11 +21,10 @@ import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.PendingPeerRequest; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.ArrayList; @@ -67,15 +66,25 @@ public static GetReceiptsFromPeerTask forHeaders( } @Override - protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected { - LOG.debug("Requesting {} receipts from peer {}.", blockHeaders.size(), peer); + protected PendingPeerRequest sendRequest() { + final long maximumRequiredBlockNumber = + blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); + // Since we have to match up the data by receipt root, we only need to request receipts // for one of the headers with each unique receipt root. final List blockHashes = headersByReceiptsRoot.values().stream() .map(headers -> headers.get(0).getHash()) .collect(toList()); - return peer.getReceipts(blockHashes); + return sendRequestToPeer( + peer -> { + LOG.debug("Requesting {} receipts from peer {}.", blockHeaders.size(), peer); + return peer.getReceipts(blockHashes); + }, + maximumRequiredBlockNumber); } @Override @@ -108,14 +117,4 @@ protected Optional>> processResponse( } return Optional.of(receiptsByHeader); } - - @Override - protected Optional findSuitablePeer() { - final long maximumRequiredBlockNumber = - blockHeaders.stream() - .mapToLong(BlockHeader::getNumber) - .max() - .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); - return this.ethContext.getEthPeers().idlePeer(maximumRequiredBlockNumber); - } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/WaitForPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/WaitForPeerTask.java index ee1c9770ec..b2ddd8b723 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/WaitForPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/WaitForPeerTask.java @@ -40,9 +40,7 @@ public static WaitForPeerTask create( protected void executeTask() { final EthPeers ethPeers = ethContext.getEthPeers(); LOG.debug( - "Waiting for new peer connection. {} peers currently connected, {} idle.", - ethPeers.peerCount(), - ethPeers.idlePeer().isPresent() ? "Some peers" : "No peers"); + "Waiting for new peer connection. {} peers currently connected.", ethPeers.peerCount()); // Listen for peer connections and complete task when we hit our target peerListenerId = ethPeers.subscribeConnect( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java index 843056878a..70fa790fbc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java @@ -70,6 +70,7 @@ public void onPeerConnected(final EthPeer peer) { protocolSchedule, ethContext, Hash.wrap(peer.chainState().getBestBlock().getHash()), + 0, metricsSystem) .assignPeer(peer) .run() diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java index 26b34615ca..6c9ff9fde0 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthContextTestUtil.java @@ -13,6 +13,8 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.testutil.TestClock; public class EthContextTestUtil { @@ -20,7 +22,7 @@ public class EthContextTestUtil { public static EthContext createTestEthContext(final TimeoutPolicy timeoutPolicy) { return new EthContext( - new EthPeers(PROTOCOL_NAME), + new EthPeers(PROTOCOL_NAME, TestClock.fixed(), new NoOpMetricsSystem()), new EthMessages(), new DeterministicEthScheduler(timeoutPolicy)); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java index 0792bd8759..87349ee747 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeerTest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; @@ -28,6 +29,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; +import tech.pegasys.pantheon.testutil.TestClock; import java.util.HashSet; import java.util.Set; @@ -38,6 +40,7 @@ public class EthPeerTest { private static final BlockDataGenerator gen = new BlockDataGenerator(); + private final TestClock clock = new TestClock(); @Test public void getHeadersStream() throws PeerNotConnected { @@ -81,6 +84,63 @@ public void getNodeDataStream() throws PeerNotConnected { messageStream(getStream, targetMessage, otherMessage); } + @Test + public void shouldHaveAvailableCapacityUntilOutstandingRequestLimitIsReached() + throws PeerNotConnected { + final EthPeer peer = createPeer(); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(0); + + peer.getBodies(asList(gen.hash(), gen.hash())); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(1); + + peer.getReceipts(asList(gen.hash(), gen.hash())); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(2); + + peer.getNodeData(asList(gen.hash(), gen.hash())); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(3); + + peer.getHeadersByHash(gen.hash(), 4, 1, false); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(4); + + peer.getHeadersByNumber(1, 1, 1, false); + assertThat(peer.hasAvailableRequestCapacity()).isFalse(); + assertThat(peer.outstandingRequests()).isEqualTo(5); + + peer.dispatch(new EthMessage(peer, BlockBodiesMessage.create(emptyList()))); + assertThat(peer.hasAvailableRequestCapacity()).isTrue(); + assertThat(peer.outstandingRequests()).isEqualTo(4); + } + + @Test + public void shouldTrackLastRequestTime() throws PeerNotConnected { + final EthPeer peer = createPeer(); + + clock.stepMillis(10_000); + peer.getBodies(asList(gen.hash(), gen.hash())); + assertThat(peer.getLastRequestTimestamp()).isEqualTo(clock.millis()); + + clock.stepMillis(10_000); + peer.getReceipts(asList(gen.hash(), gen.hash())); + assertThat(peer.getLastRequestTimestamp()).isEqualTo(clock.millis()); + + clock.stepMillis(10_000); + peer.getNodeData(asList(gen.hash(), gen.hash())); + assertThat(peer.getLastRequestTimestamp()).isEqualTo(clock.millis()); + + clock.stepMillis(10_000); + peer.getHeadersByHash(gen.hash(), 4, 1, false); + assertThat(peer.getLastRequestTimestamp()).isEqualTo(clock.millis()); + + clock.stepMillis(10_000); + peer.getHeadersByNumber(1, 1, 1, false); + assertThat(peer.getLastRequestTimestamp()).isEqualTo(clock.millis()); + } + @Test public void closeStreamsOnPeerDisconnect() throws PeerNotConnected { final EthPeer peer = createPeer(); @@ -283,7 +343,7 @@ private EthPeer createPeer() { final Set caps = new HashSet<>(singletonList(EthProtocol.ETH63)); final PeerConnection peerConnection = new MockPeerConnection(caps); final Consumer onPeerReady = (peer) -> {}; - return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady); + return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, clock); } @FunctionalInterface diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeersTest.java index 9e889da142..f1139dcf80 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeersTest.java @@ -12,20 +12,44 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; +import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.util.uint.UInt256; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.function.Consumer; + import org.junit.Before; import org.junit.Test; public class EthPeersTest { private EthProtocolManager ethProtocolManager; + private EthPeers ethPeers; + private final PeerRequest peerRequest = mock(PeerRequest.class); + private final ResponseStream responseStream = mock(ResponseStream.class); @Before - public void setup() { + public void setup() throws Exception { + when(peerRequest.sendRequest(any())).thenReturn(responseStream); ethProtocolManager = EthProtocolManagerTestUtil.create(); + ethPeers = ethProtocolManager.ethContext().getEthPeers(); } @Test @@ -64,4 +88,186 @@ public void comparesPeersWithTdAndNoHeight() { assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0); assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0); } + + @Test + public void shouldExecutePeerRequestImmediatelyWhenPeerIsAvailable() throws Exception { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); + + verify(peerRequest).sendRequest(peer.getEthPeer()); + assertRequestSuccessful(pendingRequest); + } + + @Test + public void shouldUseLeastBusyPeerForRequest() throws Exception { + final RespondingEthPeer idlePeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer workingPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useRequestSlot(workingPeer.getEthPeer()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); + + verify(peerRequest).sendRequest(idlePeer.getEthPeer()); + assertRequestSuccessful(pendingRequest); + } + + @Test + public void shouldUseLeastRecentlyUsedPeerWhenBothHaveSameNumberOfOutstandingRequests() + throws Exception { + final RespondingEthPeer mostRecentlyUsedPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer leastRecentlyUsedPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useRequestSlot(mostRecentlyUsedPeer.getEthPeer()); + freeUpCapacity(mostRecentlyUsedPeer.getEthPeer()); + + assertThat(leastRecentlyUsedPeer.getEthPeer().outstandingRequests()) + .isEqualTo(mostRecentlyUsedPeer.getEthPeer().outstandingRequests()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); + + verify(peerRequest).sendRequest(leastRecentlyUsedPeer.getEthPeer()); + assertRequestSuccessful(pendingRequest); + } + + @Test + public void shouldFailWithNoAvailablePeersWhenNoPeersConnected() { + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); + + verifyZeroInteractions(peerRequest); + assertRequestFailure(pendingRequest, NoAvailablePeersException.class); + } + + @Test + public void shouldFailWhenNoPeerWithSufficientHeight() { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 100); + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 200, Optional.empty()); + + verifyZeroInteractions(peerRequest); + assertRequestFailure(pendingRequest, NoAvailablePeersException.class); + } + + @Test + public void shouldFailWhenAllPeersWithSufficientHeightHaveDisconnected() throws Exception { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 100); + final RespondingEthPeer suitablePeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useAllAvailableCapacity(suitablePeer.getEthPeer()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 200, Optional.empty()); + + verifyZeroInteractions(peerRequest); + assertNotDone(pendingRequest); + + suitablePeer.disconnect(DisconnectReason.TOO_MANY_PEERS); + assertRequestFailure(pendingRequest, NoAvailablePeersException.class); + } + + @Test + public void shouldFailWithPeerNotConnectedIfPeerRequestThrows() throws Exception { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + when(peerRequest.sendRequest(peer.getEthPeer())).thenThrow(new PeerNotConnected("Oh dear")); + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); + + assertRequestFailure(pendingRequest, PeerDisconnectedException.class); + } + + @Test + public void shouldDelayExecutionUntilPeerHasCapacity() throws Exception { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useAllAvailableCapacity(peer.getEthPeer()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); + verifyZeroInteractions(peerRequest); + + freeUpCapacity(peer.getEthPeer()); + + verify(peerRequest).sendRequest(peer.getEthPeer()); + assertRequestSuccessful(pendingRequest); + } + + @Test + public void shouldDelayExecutionUntilPeerWithSufficientHeightHasCapacity() throws Exception { + // Create a peer that has available capacity but not the required height + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useAllAvailableCapacity(peer.getEthPeer()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); + verifyZeroInteractions(peerRequest); + + freeUpCapacity(peer.getEthPeer()); + + verify(peerRequest).sendRequest(peer.getEthPeer()); + assertRequestSuccessful(pendingRequest); + } + + @Test + public void shouldNotExecuteAbortedRequest() throws Exception { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + useAllAvailableCapacity(peer.getEthPeer()); + + final PendingPeerRequest pendingRequest = + ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); + verifyZeroInteractions(peerRequest); + + pendingRequest.abort(); + + freeUpCapacity(peer.getEthPeer()); + + verifyZeroInteractions(peerRequest); + assertRequestFailure(pendingRequest, CancellationException.class); + } + + private void freeUpCapacity(final EthPeer ethPeer) { + ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList()))); + } + + private void useAllAvailableCapacity(final EthPeer peer) throws PeerNotConnected { + while (peer.hasAvailableRequestCapacity()) { + useRequestSlot(peer); + } + assertThat(peer.hasAvailableRequestCapacity()).isFalse(); + } + + private void useRequestSlot(final EthPeer peer) throws PeerNotConnected { + peer.getNodeData(singletonList(Hash.ZERO)); + } + + @SuppressWarnings("unchecked") + private void assertRequestSuccessful(final PendingPeerRequest pendingRequest) { + final Consumer onSuccess = mock(Consumer.class); + pendingRequest.then(onSuccess, error -> fail("Request should have executed", error)); + verify(onSuccess).accept(any()); + } + + @SuppressWarnings("unchecked") + private void assertRequestFailure( + final PendingPeerRequest pendingRequest, final Class reason) { + final Consumer errorHandler = mock(Consumer.class); + pendingRequest.then(responseStream -> fail("Should not have performed request"), errorHandler); + + verify(errorHandler).accept(any(reason)); + } + + @SuppressWarnings("unchecked") + private void assertNotDone(final PendingPeerRequest pendingRequest) { + final Consumer onSuccess = mock(Consumer.class); + final Consumer onError = mock(Consumer.class); + pendingRequest.then(onSuccess, onError); + + verifyZeroInteractions(onSuccess); + verifyZeroInteractions(onError); + } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index 1b7fc7aa71..b020d9d345 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -118,6 +118,7 @@ public void disconnectOnUnsolicitedMessage() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final MessageData messageData = @@ -139,6 +140,7 @@ public void disconnectOnFailureToSendStatusMessage() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final MessageData messageData = @@ -161,6 +163,7 @@ public void disconnectOnWrongChainId() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final MessageData messageData = @@ -194,6 +197,7 @@ public void disconnectOnWrongGenesisHash() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final MessageData messageData = @@ -227,6 +231,7 @@ public void doNotDisconnectOnValidMessage() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final MessageData messageData = @@ -252,6 +257,7 @@ public void respondToGetHeaders() throws ExecutionException, InterruptedExceptio 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long startBlock = 5L; @@ -293,6 +299,7 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), new EthereumWireProtocolConfiguration(limit, limit, limit, limit))) { final long startBlock = 5L; @@ -333,6 +340,7 @@ public void respondToGetHeadersReversed() throws ExecutionException, Interrupted 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long endBlock = 10L; @@ -372,6 +380,7 @@ public void respondToGetHeadersWithSkip() throws ExecutionException, Interrupted 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long startBlock = 5L; @@ -414,6 +423,7 @@ public void respondToGetHeadersReversedWithSkip() 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long endBlock = 10L; @@ -477,6 +487,7 @@ public void respondToGetHeadersPartial() throws ExecutionException, InterruptedE 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long startBlock = blockchain.getChainHeadBlockNumber() - 1L; @@ -517,6 +528,7 @@ public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedExc 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long startBlock = blockchain.getChainHeadBlockNumber() + 1; @@ -554,6 +566,7 @@ public void respondToGetBodies() throws ExecutionException, InterruptedException 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { // Setup blocks query @@ -607,6 +620,7 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), new EthereumWireProtocolConfiguration(limit, limit, limit, limit))) { // Setup blocks query @@ -659,6 +673,7 @@ public void respondToGetBodiesPartial() throws ExecutionException, InterruptedEx 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { // Setup blocks query @@ -705,6 +720,7 @@ public void respondToGetReceipts() throws ExecutionException, InterruptedExcepti 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { // Setup blocks query @@ -757,6 +773,7 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), new EthereumWireProtocolConfiguration(limit, limit, limit, limit))) { // Setup blocks query @@ -808,6 +825,7 @@ public void respondToGetReceiptsPartial() throws ExecutionException, Interrupted 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { // Setup blocks query @@ -856,6 +874,7 @@ public void respondToGetNodeData() throws Exception { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { // Setup node data query @@ -907,6 +926,7 @@ public void newBlockMinedSendsNewBlockMessageToAllPeers() { 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig()); @@ -979,6 +999,7 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig())) { final long startBlock = 1L; @@ -1045,7 +1066,9 @@ public void transactionMessagesGoToTheCorrectExecutor() { 1, true, ethScheduler, - EthereumWireProtocolConfiguration.defaultConfig())) { + EthereumWireProtocolConfiguration.defaultConfig(), + TestClock.fixed(), + metricsSystem)) { // Create a transaction pool. This has a side effect of registring a listener for the // transactions message. diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 346dab6145..fd419fe95d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -28,6 +28,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.uint.UInt256; public class EthProtocolManagerTestUtil { @@ -50,7 +52,9 @@ public static EthProtocolManager create( networkId, false, ethScheduler, - EthereumWireProtocolConfiguration.defaultConfig()); + EthereumWireProtocolConfiguration.defaultConfig(), + TestClock.fixed(), + new NoOpMetricsSystem()); } public static EthProtocolManager create( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java index 934c230ee7..57efcae3ab 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java @@ -22,6 +22,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; @@ -219,6 +220,6 @@ private EthPeer createPeer() { final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); final PeerConnection peerConnection = new MockPeerConnection(caps); final Consumer onPeerReady = (peer) -> {}; - return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady); + return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady, TestClock.fixed()); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index 9f501fd787..f12f465ca8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -48,6 +48,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.Collections; @@ -580,7 +581,8 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class))) .thenReturn(new CompletableFuture<>()); final EthContext ethContext = - new EthContext(new EthPeers("eth"), new EthMessages(), ethScheduler); + new EthContext( + new EthPeers("eth", TestClock.fixed(), metricsSystem), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = new BlockPropagationManager<>( syncConfig, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index 8c85ca80a4..b4949c63ba 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -112,6 +112,7 @@ public TestNode( 1, 1, 1, + TestClock.fixed(), new NoOpMetricsSystem(), EthereumWireProtocolConfiguration.defaultConfig()); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonControllerBuilder.java index ded1abfb9c..0fd426c913 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonControllerBuilder.java @@ -92,6 +92,7 @@ protected EthProtocolManager createEthProtocolManager( syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), syncConfig.computationParallelism(), + clock, metricsSystem, ethereumWireProtocolConfiguration); } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java index 6dc3366710..06e9ed5b29 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java @@ -310,6 +310,7 @@ protected EthProtocolManager createEthProtocolManager( syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), syncConfig.computationParallelism(), + clock, metricsSystem, ethereumWireProtocolConfiguration); }