Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
EthScheduler additions (#767)
Browse files Browse the repository at this point in the history
Add the services thread pool and a computation thread pool to the
EthScheduler.

* Services are long running, sequential, and infrequently start tasks
  such as Full Sync and Fast Sync.
* Computations are short and high CPU intensity tasks such as ECDSA
  signature extractions and POW validation.  The intent is that each
  runnable represents one such extraction and the extractions from a
  block are saturated across available processing power.  These
  computations should have zero dependencies outside their object and
  thread.
  • Loading branch information
shemnon authored Feb 5, 2019
1 parent 9fd6e3b commit cd07193
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@ public Istanbul64ProtocolManager(
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
super(blockchain, worldStateArchive, networkId, fastSyncEnabled, syncWorkers, txWorkers);
final int txWorkers,
final int computationWorkers) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,15 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final int requestLimit) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers));
new EthScheduler(syncWorkers, txWorkers, computationWorkers));
}

public EthProtocolManager(
Expand All @@ -111,14 +112,16 @@ public EthProtocolManager(
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
final int txWorkers,
final int computationWorkers) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
DEFAULT_REQUEST_LIMIT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ public class EthScheduler {
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;

EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this(
Executors.newFixedThreadPool(
syncWorkerCount,
Expand All @@ -62,16 +65,29 @@ public class EthScheduler {
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d")
.build()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Services-%d")
.build()),
Executors.newFixedThreadPool(
computationWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Computation-%d")
.build()));
}

protected EthScheduler(
final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) {
final ExecutorService txWorkerExecutor,
final ExecutorService servicesExecutor,
final ExecutorService computationExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor;
this.servicesExecutor = servicesExecutor;
this.computationExecutor = computationExecutor;
}

public <T> CompletableFuture<T> scheduleSyncWorkerTask(
Expand Down Expand Up @@ -101,12 +117,20 @@ public <T> CompletableFuture<T> scheduleSyncWorkerTask(
return promise;
}

public Future<?> scheduleSyncWorkerTask(final Runnable command) {
return syncWorkerExecutor.submit(command);
public void scheduleSyncWorkerTask(final Runnable command) {
syncWorkerExecutor.submit(command);
}

public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.submit(command);
}

CompletableFuture<Void> scheduleServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
}

public Future<?> scheduleTxWorkerTask(final Runnable command) {
return txWorkerExecutor.submit(command);
<T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return CompletableFuture.supplyAsync(computation, computationExecutor);
}

public CompletableFuture<Void> scheduleFutureTask(
Expand Down Expand Up @@ -194,25 +218,46 @@ public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName());
syncWorkerExecutor.shutdown();
txWorkerExecutor.shutdown();
scheduler.shutdown();
servicesExecutor.shutdown();
computationExecutor.shutdown();
shutdown.countDown();
} else {
LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName());
}
}

public void awaitStop() throws InterruptedException {
void awaitStop() throws InterruptedException {
shutdown.await();
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} transaction worker executor did not shutdown cleanly.",
this.getClass().getSimpleName());
txWorkerExecutor.shutdownNow();
txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
scheduler.shutdownNow();
scheduler.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName());
servicesExecutor.shutdownNow();
servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!computationExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} computation executor did not shutdown cleanly.", this.getClass().getSimpleName());
computationExecutor.shutdownNow();
computationExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
LOG.trace("{} stopped.", this.getClass().getSimpleName());
}

Expand All @@ -222,7 +267,7 @@ private <T> CompletableFuture<T> failAfterTimeout(final Duration timeout) {
return promise;
}

public <T> void failAfterTimeout(final CompletableFuture<T> promise) {
<T> void failAfterTimeout(final CompletableFuture<T> promise) {
failAfterTimeout(promise, defaultTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public class SynchronizerConfiguration {
private static final Logger LOG = LogManager.getLogger();

// TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
public static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static final float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 200;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
Expand Down Expand Up @@ -62,6 +62,7 @@ public class SynchronizerConfiguration {
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;
private final int computationParallelism;

private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
Expand All @@ -82,7 +83,8 @@ private SynchronizerConfiguration(
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism,
final int transactionsParallelism) {
final int transactionsParallelism,
final int computationParallelism) {
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
Expand All @@ -102,6 +104,7 @@ private SynchronizerConfiguration(
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
}

/**
Expand Down Expand Up @@ -147,7 +150,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism);
transactionsParallelism,
computationParallelism);
}

public static Builder builder() {
Expand Down Expand Up @@ -232,6 +236,10 @@ public int transactionsParallelism() {
return transactionsParallelism;
}

public int computationParallelism() {
return computationParallelism;
}

/**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
Expand Down Expand Up @@ -274,6 +282,7 @@ public static class Builder {
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down Expand Up @@ -350,6 +359,11 @@ public Builder transactionsParallelism(final int transactionsParallelism) {
return this;
}

public Builder computationParallelism(final int computationParallelism) {
this.computationParallelism = computationParallelism;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncMode,
Expand All @@ -370,7 +384,8 @@ public SynchronizerConfiguration build() {
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism);
transactionsParallelism,
computationParallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public DeterministicEthScheduler() {
}

public DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService());
super(
new MockExecutorService(),
new MockScheduledExecutor(),
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService());
this.timeoutPolicy = timeoutPolicy;
}

Expand Down
Loading

0 comments on commit cd07193

Please sign in to comment.