Skip to content

Commit

Permalink
ugly work in progress of parallel block downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
shemnon committed Feb 3, 2019
1 parent 1a823ae commit 3775fe4
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.List;
import java.util.Objects;

public class Block {
public class Block implements NumberedBlock {

private final BlockHeader header;
private final BlockBody body;
Expand Down Expand Up @@ -95,4 +95,9 @@ public String toString() {
sb.append("body=").append(body);
return sb.append("}").toString();
}

@Override
public long getBlockNumber() {
return getHeader().getNumber();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package tech.pegasys.pantheon.ethereum.core;

public interface NumberedBlock {

long getBlockNumber();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class EthScheduler {
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;
private final ExecutorService servicesExecutor;

EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
this(
Expand All @@ -62,16 +63,21 @@ public class EthScheduler {
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions")
.build()));
.build()),
Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Services")
.build()));
}

protected EthScheduler(
final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) {
final ExecutorService txWorkerExecutor,
final ExecutorService servicesExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor;
this.servicesExecutor = servicesExecutor;
}

public <T> CompletableFuture<T> scheduleSyncWorkerTask(
Expand Down Expand Up @@ -109,6 +115,10 @@ public Future<?> scheduleTxWorkerTask(final Runnable command) {
return txWorkerExecutor.submit(command);
}

public CompletableFuture<?> startServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
}

public CompletableFuture<Void> scheduleFutureTask(
final Runnable command, final Duration duration) {
final CompletableFuture<Void> promise = new CompletableFuture<>();
Expand Down Expand Up @@ -194,7 +204,9 @@ public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName());
syncWorkerExecutor.shutdown();
txWorkerExecutor.shutdown();
scheduler.shutdown();
servicesExecutor.shutdown();
shutdown.countDown();
} else {
LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName());
Expand All @@ -208,11 +220,21 @@ public void awaitStop() throws InterruptedException {
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} transaction worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
txWorkerExecutor.shutdownNow();
txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
scheduler.shutdownNow();
scheduler.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName());
servicesExecutor.shutdownNow();
servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
LOG.trace("{} stopped.", this.getClass().getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);

CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarg
return getAdditionalCheckpointHeaders(syncTarget, lastHeader)
.thenApply(
additionalCheckpoints -> {
if (additionalCheckpoints.isEmpty()) {
if (additionalCheckpoints == null || additionalCheckpoints.isEmpty()) {

This comment has been minimized.

Copy link
@ajsutton

ajsutton Feb 3, 2019

We shouldn't need this null check (at least not now that PegaSysEng#743 is merged...). If we do there's a bug somewhere else that we probably should fix.

checkpointTimeouts++;
} else {
checkpointTimeouts = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ public static class Builder {
private int downloaderHeaderRequestSize = 10;
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 20;
private int downloaderChainSegmentSize = 2000;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;
private int downloaderParallelism = 12;
private int transactionsParallelism = 12;

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.NumberedBlock;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;

import java.util.List;

class BlockWithReceipts {
class BlockWithReceipts implements NumberedBlock {
private final Block block;
private final List<TransactionReceipt> receipts;

Expand All @@ -38,4 +39,9 @@ public Block getBlock() {
public List<TransactionReceipt> getReceipts() {
return receipts;
}

@Override
public long getBlockNumber() {
return getHeader().getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -93,8 +94,10 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
return CompletableFuture.completedFuture(emptyList());
}
}
final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
// final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
// PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, BlockWithReceipts> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand All @@ -108,6 +111,7 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
.run()
.thenApply(
results ->
results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
// results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
results.getResult().stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
return completedFuture(Optional.empty());
} else {
final EthPeer bestPeer = maybeBestPeer.get();
if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
LOG.info("No sync target with sufficient chain height, wait for peers.");
return completedFuture(Optional.empty());
} else {
// if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
// LOG.info("No sync target with sufficient chain height, wait for peers.");
// return completedFuture(Optional.empty());
// } else {
return confirmPivotBlockHeader(bestPeer);
}
// }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ParallelImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -30,6 +31,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -93,8 +95,10 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C, Block> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
// final PipelinedImportChainSegmentTask<C, Block> importTask =
// PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
Expand All @@ -104,7 +108,8 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
protocolSchedule, protocolContext, ethContext, ethTasksTimer),
HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
importedBlocks = importTask.run();
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
// importedBlocks = importTask.run();
}
return importedBlocks;
}
Expand Down
Loading

0 comments on commit 3775fe4

Please sign in to comment.