diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index be59863aa1..e7786c3140 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -160,67 +160,75 @@ public void cancel() { } private void requestNodeData(final BlockHeader header) { - if (sendingRequests.compareAndSet(false, true)) { - while (shouldRequestNodeData()) { + while (shouldRequestNodeData()) { + if (sendingRequests.compareAndSet(false, true)) { final Optional maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); - if (!maybePeer.isPresent()) { // If no peer is available, wait and try again + sendingRequests.set(false); waitForNewPeer().whenComplete((r, t) -> requestNodeData(header)); break; } else { - final EthPeer peer = maybePeer.get(); + requestDataFromPeer(header, maybePeer.get()); + } + sendingRequests.set(false); + } else { + break; + } + } + } - // Collect data to be requested - final List> toRequest = new ArrayList<>(); - while (toRequest.size() < hashCountPerRequest) { - final Task pendingRequestTask = pendingRequests.dequeue(); - if (pendingRequestTask == null) { - break; - } - final NodeDataRequest pendingRequest = pendingRequestTask.getData(); - final Optional existingData = - pendingRequest.getExistingData(worldStateStorage); - if (existingData.isPresent()) { - pendingRequest.setData(existingData.get()); - queueChildRequests(pendingRequest); - completedRequestsCounter.inc(); - pendingRequestTask.markCompleted(); - continue; - } - toRequest.add(pendingRequestTask); - } + private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) { + // Collect data to be requested + final List> toRequest = getTasksForNextRequest(); - // Request and process node data - sendAndProcessRequests(peer, toRequest, header) - .whenComplete( - (task, error) -> { - final boolean done; - synchronized (this) { - outstandingRequests.remove(task); - done = - status == Status.RUNNING - && outstandingRequests.size() == 0 - && pendingRequests.allTasksCompleted(); - } - if (done) { - // We're done - final Updater updater = worldStateStorage.updater(); - updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); - updater.commit(); - markDone(); - } else { - // Send out additional requests - requestNodeData(header); - } - }); - } + // Request and process node data + sendAndProcessRequests(peer, toRequest, header) + .whenComplete( + (task, error) -> { + final boolean done; + synchronized (this) { + outstandingRequests.remove(task); + done = + status == Status.RUNNING + && outstandingRequests.size() == 0 + && pendingRequests.allTasksCompleted(); + } + if (done) { + // We're done + final Updater updater = worldStateStorage.updater(); + updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); + updater.commit(); + markDone(); + } else { + // Send out additional requests + requestNodeData(header); + } + }); + } + + private List> getTasksForNextRequest() { + final List> toRequest = new ArrayList<>(); + while (toRequest.size() < hashCountPerRequest) { + final Task pendingRequestTask = pendingRequests.dequeue(); + if (pendingRequestTask == null) { + break; + } + final NodeDataRequest pendingRequest = pendingRequestTask.getData(); + final Optional existingData = pendingRequest.getExistingData(worldStateStorage); + if (existingData.isPresent()) { + pendingRequest.setData(existingData.get()); + queueChildRequests(pendingRequest); + completedRequestsCounter.inc(); + pendingRequestTask.markCompleted(); + continue; } - sendingRequests.set(false); + toRequest.add(pendingRequestTask); } + return toRequest; } - private boolean shouldRequestNodeData() { + private synchronized boolean shouldRequestNodeData() { return !future.isDone() && outstandingRequests.size() < maxOutstandingRequests && !pendingRequests.isEmpty();