Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Queue pending requests when all peers are busy #1331

Merged
merged 35 commits into from
May 2, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f05fb53
Queue pending requests when all peers are busy.
ajsutton Apr 25, 2019
f42d09f
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 25, 2019
23c482c
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 26, 2019
9a2f147
Only start the timer after the request is actually sent so it doesn't…
ajsutton Apr 26, 2019
1fa9053
Encapsulate the future to provide a nicer, more explicit API for work…
ajsutton Apr 26, 2019
c4cca49
Split PendingPeerRequest and PeerRequest into separate files.
ajsutton Apr 26, 2019
d110373
Fix CheckpointHeaderManagerTest.
ajsutton Apr 26, 2019
a0143a9
JsonRpcError decoding to include message (#1336)
CjHare Apr 26, 2019
aefb23c
[PRIV-55] Allow private contract invocations in multiple privacy grou…
iikirilov Apr 26, 2019
a701e8c
removed unstable fast sync note in documentation (#1341)
NicolasMassart Apr 26, 2019
b17c2ea
Cache current chain head info (#1335)
ajsutton Apr 26, 2019
7d69eac
Added net_services (#1306)
MadelineMurray Apr 27, 2019
48a8875
Updated for returning false (#1309)
MadelineMurray Apr 27, 2019
2f2829b
Added sending private transactions initial content (#1301)
MadelineMurray Apr 27, 2019
df1ea79
Upated Docker image to indicate it doesn't run on Windows (#1346)
MadelineMurray Apr 28, 2019
088c589
[PAN-2573] include static nodes in permissioning logic (#1339)
macfarla Apr 28, 2019
46626eb
Add test for hasAvailableRequestCapacity.
ajsutton Apr 28, 2019
abab023
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 28, 2019
adb03fb
Add more tests.
ajsutton Apr 28, 2019
46edd1b
Prefer least recently used peers to more recently ones when they have…
ajsutton Apr 28, 2019
6842b3f
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 29, 2019
e60b7be
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 30, 2019
d97ec38
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton Apr 30, 2019
99c1374
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton May 1, 2019
66cd167
Merge branch 'master' into claim-peers
ajsutton May 1, 2019
76436ff
Removing smart quotes (#1381)
jmcnevin May 2, 2019
77ed3ab
Update Log message in IBFT Controller (#1387)
rain-on May 2, 2019
0ba9122
Fixed typo (#1388)
MadelineMurray May 2, 2019
d0124e4
update python mkdocs requirements versions (#1374)
NicolasMassart May 2, 2019
d8e3a22
Handle case where peers advertise a listening port of 0 (#1391)
mbaxter May 2, 2019
ca960e8
Add explanatory comment about default port (#1392)
mbaxter May 2, 2019
fa78200
Rename thenEither to then. Include a hash in the getNodeData request…
ajsutton May 2, 2019
e89f28c
Merge branch 'master' of github.com:PegaSysEng/pantheon into claim-peers
ajsutton May 2, 2019
6fd5b17
Spotless.
ajsutton May 2, 2019
b410044
Merge branch 'master' into claim-peers
ajsutton May 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.time.Clock;
import java.util.List;

/** This allows for interoperability with Quorum, but shouldn't be used otherwise. */
Expand All @@ -34,6 +35,7 @@ public Istanbul64ProtocolManager(
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem,
final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) {
super(
Expand All @@ -44,6 +46,7 @@ public Istanbul64ProtocolManager(
syncWorkers,
txWorkers,
computationWorkers,
clock,
metricsSystem,
ethereumWireProtocolConfiguration);
}
Expand All @@ -56,6 +59,7 @@ public Istanbul64ProtocolManager(
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem) {
super(
blockchain,
Expand All @@ -65,6 +69,7 @@ public Istanbul64ProtocolManager(
syncWorkers,
txWorkers,
computationWorkers,
clock,
metricsSystem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Clock;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -45,16 +46,21 @@

public class EthPeer {
private static final Logger LOG = LogManager.getLogger();

private static final int MAX_OUTSTANDING_REQUESTS = 5;

private final PeerConnection connection;

private final int maxTrackedSeenBlocks = 300;

private final Set<Hash> knownBlocks;
private final String protocolName;
private final Clock clock;
private final ChainState chainHeadState;
private final AtomicBoolean statusHasBeenSentToPeer = new AtomicBoolean(false);
private final AtomicBoolean statusHasBeenReceivedFromPeer = new AtomicBoolean(false);

private volatile long lastRequestTimestamp = 0;
private final RequestManager headersRequestManager = new RequestManager(this);
private final RequestManager bodiesRequestManager = new RequestManager(this);
private final RequestManager receiptsRequestManager = new RequestManager(this);
Expand All @@ -66,9 +72,11 @@ public class EthPeer {
EthPeer(
final PeerConnection connection,
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged) {
final Consumer<EthPeer> onStatusesExchanged,
final Clock clock) {
this.connection = connection;
this.protocolName = protocolName;
this.clock = clock;
knownBlocks =
Collections.newSetFromMap(
Collections.synchronizedMap(
Expand Down Expand Up @@ -111,13 +119,13 @@ public void disconnect(final DisconnectReason reason) {
public ResponseStream send(final MessageData messageData) throws PeerNotConnected {
switch (messageData.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
return sendHeadersRequest(messageData);
return sendRequest(headersRequestManager, messageData);
case EthPV62.GET_BLOCK_BODIES:
return sendBodiesRequest(messageData);
return sendRequest(bodiesRequestManager, messageData);
case EthPV63.GET_RECEIPTS:
return sendReceiptsRequest(messageData);
return sendRequest(receiptsRequestManager, messageData);
case EthPV63.GET_NODE_DATA:
return sendNodeDataRequest(messageData);
return sendRequest(nodeDataRequestManager, messageData);
default:
connection.sendForProtocol(protocolName, messageData);
return null;
Expand All @@ -129,52 +137,37 @@ public ResponseStream getHeadersByHash(
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse);
return sendHeadersRequest(message);
return sendRequest(headersRequestManager, message);
}

public ResponseStream getHeadersByNumber(
final long blockNumber, final int maxHeaders, final int skip, final boolean reverse)
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(blockNumber, maxHeaders, skip, reverse);
return sendHeadersRequest(message);
return sendRequest(headersRequestManager, message);
}

private ResponseStream sendHeadersRequest(final MessageData messageData) throws PeerNotConnected {
return headersRequestManager.dispatchRequest(
private ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
return requestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
}

public ResponseStream getBodies(final List<Hash> blockHashes) throws PeerNotConnected {
final GetBlockBodiesMessage message = GetBlockBodiesMessage.create(blockHashes);
return sendBodiesRequest(message);
}

private ResponseStream sendBodiesRequest(final MessageData messageData) throws PeerNotConnected {
return bodiesRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
return sendRequest(bodiesRequestManager, message);
}

public ResponseStream getReceipts(final List<Hash> blockHashes) throws PeerNotConnected {
final GetReceiptsMessage message = GetReceiptsMessage.create(blockHashes);
return sendReceiptsRequest(message);
}

private ResponseStream sendReceiptsRequest(final MessageData messageData)
throws PeerNotConnected {
return receiptsRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
return sendRequest(receiptsRequestManager, message);
}

public ResponseStream getNodeData(final Iterable<Hash> nodeHashes) throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendNodeDataRequest(message);
}

private ResponseStream sendNodeDataRequest(final MessageData messageData)
throws PeerNotConnected {
return nodeDataRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
return sendRequest(nodeDataRequestManager, message);
}

boolean validateReceivedMessage(final EthMessage message) {
Expand Down Expand Up @@ -317,6 +310,14 @@ public int outstandingRequests() {
+ nodeDataRequestManager.outstandingRequests();
}

public long getLastRequestTimestamp() {
return lastRequestTimestamp;
}

public boolean hasAvailableRequestCapacity() {
return outstandingRequests() < MAX_OUTSTANDING_REQUESTS;
}

public BytesValue nodeId() {
return connection.getPeerInfo().getNodeId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer.DisconnectCallback;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.Subscribers;

import java.util.Collections;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,20 +39,29 @@ public class EthPeers {
public static final Comparator<EthPeer> BEST_CHAIN = TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);

public static final Comparator<EthPeer> LEAST_TO_MOST_BUSY =
Comparator.comparing(EthPeer::outstandingRequests);
Comparator.comparing(EthPeer::outstandingRequests)
.thenComparing(EthPeer::getLastRequestTimestamp);

private final int maxOutstandingRequests = 5;
private final Map<PeerConnection, EthPeer> connections = new ConcurrentHashMap<>();
private final String protocolName;
private final Clock clock;
private final Subscribers<ConnectCallback> connectCallbacks = new Subscribers<>();
private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();
private final Collection<PendingPeerRequest> pendingRequests = new ArrayList<>();

public EthPeers(final String protocolName) {
public EthPeers(final String protocolName, final Clock clock, final MetricsSystem metricsSystem) {
this.protocolName = protocolName;
this.clock = clock;
metricsSystem.createIntegerGauge(
MetricCategory.PEERS,
"pending_peer_requests_current",
"Number of peer requests currently pending because peers are busy",
pendingRequests::size);
}

void registerConnection(final PeerConnection peerConnection) {
final EthPeer peer = new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks);
final EthPeer peer =
new EthPeer(peerConnection, protocolName, this::invokeConnectionCallbacks, clock);
connections.putIfAbsent(peerConnection, peer);
}

Expand All @@ -59,12 +71,38 @@ void registerDisconnect(final PeerConnection connection) {
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
}
checkPendingConnections();
}

public EthPeer peer(final PeerConnection peerConnection) {
return connections.get(peerConnection);
}

public PendingPeerRequest executePeerRequest(
final PeerRequest request, final long minimumBlockNumber, final Optional<EthPeer> peer) {
final PendingPeerRequest pendingPeerRequest =
new PendingPeerRequest(this, request, minimumBlockNumber, peer);
synchronized (this) {
if (!pendingPeerRequest.attemptExecution()) {
pendingRequests.add(pendingPeerRequest);
}
}
return pendingPeerRequest;
}

public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) {
peer.dispatch(ethMessage);
if (peer.hasAvailableRequestCapacity()) {
checkPendingConnections();
}
}

private void checkPendingConnections() {
synchronized (this) {
pendingRequests.removeIf(PendingPeerRequest::attemptExecution);
}
}

public long subscribeConnect(final ConnectCallback callback) {
return connectCallbacks.subscribe(callback);
}
Expand All @@ -89,23 +127,6 @@ public Optional<EthPeer> bestPeer() {
return availablePeers().max(BEST_CHAIN);
}

public Optional<EthPeer> idlePeer() {
return idlePeers().min(LEAST_TO_MOST_BUSY);
}

private Stream<EthPeer> idlePeers() {
final List<EthPeer> peers =
availablePeers()
.filter(p -> p.outstandingRequests() < maxOutstandingRequests)
.collect(Collectors.toList());
Collections.shuffle(peers);
return peers.stream();
}

public Optional<EthPeer> idlePeer(final long withBlocksUpTo) {
return idlePeers().filter(p -> p.chainState().getEstimatedHeight() >= withBlocksUpTo).findAny();
}

@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -71,7 +72,9 @@ public EthProtocolManager(
final int networkId,
final boolean fastSyncEnabled,
final EthScheduler scheduler,
final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) {
final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration,
final Clock clock,
final MetricsSystem metricsSystem) {
this.networkId = networkId;
this.scheduler = scheduler;
this.blockchain = blockchain;
Expand All @@ -80,7 +83,7 @@ public EthProtocolManager(
this.shutdown = new CountDownLatch(1);
genesisHash = blockchain.getBlockHashByNumber(0L).get();

ethPeers = new EthPeers(getSupportedProtocol());
ethPeers = new EthPeers(getSupportedProtocol(), clock, metricsSystem);
ethMessages = new EthMessages();
ethContext = new EthContext(ethPeers, ethMessages, scheduler);

Expand All @@ -98,14 +101,17 @@ public EthProtocolManager(
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
EthereumWireProtocolConfiguration.defaultConfig());
EthereumWireProtocolConfiguration.defaultConfig(),
clock,
metricsSystem);
}

public EthProtocolManager(
Expand All @@ -116,6 +122,7 @@ public EthProtocolManager(
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final Clock clock,
final MetricsSystem metricsSystem,
final EthereumWireProtocolConfiguration ethereumWireProtocolConfiguration) {
this(
Expand All @@ -124,7 +131,9 @@ public EthProtocolManager(
networkId,
fastSyncEnabled,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
ethereumWireProtocolConfiguration);
ethereumWireProtocolConfiguration,
clock,
metricsSystem);
}

public EthContext ethContext() {
Expand Down Expand Up @@ -192,7 +201,7 @@ public void processMessage(final Capability cap, final Message message) {
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return;
}
peer.dispatch(ethMessage);
ethPeers.dispatchMessage(peer, ethMessage);
ethMessages.dispatch(ethMessage);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;

public interface PeerRequest {
ResponseStream sendRequest(EthPeer peer) throws PeerNotConnected;
}
Loading