diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java index e77bbc822f..8dec1e100b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java @@ -12,6 +12,9 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -22,14 +25,21 @@ public abstract class AbstractEthTask implements EthTask { + protected double taskTimeInSec = -1.0D; + protected OperationTimer taskTimer; protected final AtomicReference> result = new AtomicReference<>(); protected volatile Collection> subTaskFutures = new ConcurrentLinkedDeque<>(); + /** @param ethTasksTimer The metrics timer to use to time the duration of the task. */ + protected AbstractEthTask(final LabelledMetric ethTasksTimer) { + taskTimer = ethTasksTimer.labels(getClass().getSimpleName()); + } + @Override public final CompletableFuture run() { if (result.compareAndSet(null, new CompletableFuture<>())) { - executeTask(); + executeTaskTimed(); result .get() .whenComplete( @@ -117,6 +127,20 @@ public final T result() { /** Execute core task logic. */ protected abstract void executeTask(); + /** Executes the task while timed by a timer. */ + public void executeTaskTimed() { + final OperationTimer.TimingContext timingContext = taskTimer.startTimer(); + try { + executeTask(); + } finally { + taskTimeInSec = timingContext.stopTimer(); + } + } + + public double getTaskTimeInSec() { + return taskTimeInSec; + } + /** Cleanup any resources when task completes. */ protected void cleanup() { for (final CompletableFuture subTaskFuture : subTaskFutures) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerRequestTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerRequestTask.java index 19a1df56d9..2347dea444 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerRequestTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerRequestTask.java @@ -18,6 +18,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; import java.util.Optional; @@ -29,8 +31,11 @@ public abstract class AbstractPeerRequestTask extends AbstractPeerTask { private final int requestCode; private volatile ResponseStream responseStream; - protected AbstractPeerRequestTask(final EthContext ethContext, final int requestCode) { - super(ethContext); + protected AbstractPeerRequestTask( + final EthContext ethContext, + final int requestCode, + final LabelledMetric ethTasksTimer) { + super(ethContext, ethTasksTimer); this.requestCode = requestCode; } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerTask.java index 00525ae56e..173156b147 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerTask.java @@ -16,6 +16,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Optional; @@ -23,7 +25,9 @@ public abstract class AbstractPeerTask extends AbstractEthTask assignedPeer = Optional.empty(); protected final EthContext ethContext; - protected AbstractPeerTask(final EthContext ethContext) { + protected AbstractPeerTask( + final EthContext ethContext, final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.ethContext = ethContext; } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index d86dc22e8a..e5406f23d3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -15,6 +15,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; @@ -37,13 +39,20 @@ public abstract class AbstractRetryingPeerTask> extends private final EthContext ethContext; private final int maxRetries; private int retryCount = 0; + private final LabelledMetric ethTasksTimer; /** * @param ethContext The context of the current Eth network we are attached to. * @param maxRetries Maximum number of retries to accept before completing exceptionally. + * @param ethTasksTimer The metrics timer to use to time the duration of the task. */ - public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) { + public AbstractRetryingPeerTask( + final EthContext ethContext, + final int maxRetries, + final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; this.maxRetries = maxRetries; } @@ -69,7 +78,7 @@ protected void executeTask() { if (peerResult.size() > 0) { retryCount = 0; } - executeTask(); + executeTaskTimed(); } }); } @@ -87,13 +96,13 @@ private void handleTaskError(final Throwable error) { if (cause instanceof NoAvailablePeersException) { LOG.info("No peers available, wait for peer."); // Wait for new peer to connect - final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext); + final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, ethTasksTimer); executeSubTask( () -> ethContext .getScheduler() .timeout(waitTask, Duration.ofSeconds(5)) - .whenComplete((r, t) -> executeTask())); + .whenComplete((r, t) -> executeTaskTimed())); return; } @@ -104,7 +113,9 @@ private void handleTaskError(final Throwable error) { // Wait before retrying on failure executeSubTask( () -> - ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1))); + ethContext + .getScheduler() + .scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1))); } protected abstract boolean isRetryableError(Throwable error); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 82a1a6c515..0d482a69a1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -34,8 +34,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; -import tech.pegasys.pantheon.metrics.MetricCategory; -import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.uint.UInt256; @@ -62,12 +61,12 @@ public class BlockPropagationManager { private final ProtocolContext protocolContext; private final EthContext ethContext; private final SyncState syncState; + private final LabelledMetric ethTasksTimer; private final AtomicBoolean started = new AtomicBoolean(false); private final Set requestedBlocks = new ConcurrentSet<>(); private final PendingBlocks pendingBlocks; - private final OperationTimer announcedBlockIngestTimer; BlockPropagationManager( final SynchronizerConfiguration config, @@ -76,20 +75,15 @@ public class BlockPropagationManager { final EthContext ethContext, final SyncState syncState, final PendingBlocks pendingBlocks, - final MetricsSystem metricsSystem) { + final LabelledMetric ethTasksTimer) { this.config = config; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; this.syncState = syncState; this.pendingBlocks = pendingBlocks; - - this.announcedBlockIngestTimer = - metricsSystem.createTimer( - MetricCategory.BLOCKCHAIN, - "pantheon_blockchain_announcedBlock_ingest", - "Time to ingest a single announced block"); } public void start() { @@ -125,7 +119,11 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai if (!readyForImport.isEmpty()) { final Supplier>> importBlocksTask = PersistBlockTask.forUnorderedBlocks( - protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL); + protocolSchedule, + protocolContext, + readyForImport, + HeaderValidationMode.FULL, + ethTasksTimer); ethContext .getScheduler() .scheduleSyncWorkerTask(importBlocksTask) @@ -225,7 +223,8 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) { private CompletableFuture processAnnouncedBlock( final EthPeer peer, final NewBlockHash newBlock) { final AbstractPeerTask getBlockTask = - GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash()).assignPeer(peer); + GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash(), ethTasksTimer) + .assignPeer(peer); return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } @@ -251,8 +250,7 @@ CompletableFuture importOrSavePendingBlock(final Block block) { // Import block final PersistBlockTask importTask = PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.FULL); - final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer(); + protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, ethTasksTimer); return ethContext .getScheduler() .scheduleSyncWorkerTask(importTask::run) @@ -265,7 +263,7 @@ CompletableFuture importOrSavePendingBlock(final Block block) { block.getHeader().getNumber(), block.getHash()); } else { - final double timeInMs = blockTimer.stopTimer() * 1000; + final double timeInMs = importTask.getTaskTimeInSec() * 1000; LOG.info( String.format( "Successfully imported announced block %d (%s) in %01.3fms.", diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java index 96623cff6d..4d2a1041e7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java @@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import org.apache.logging.log4j.Logger; @@ -33,21 +35,25 @@ public class ChainHeadTracker implements ConnectCallback { private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; private final TrailingPeerLimiter trailingPeerLimiter; + private final LabelledMetric ethTasksTimer; public ChainHeadTracker( final EthContext ethContext, final ProtocolSchedule protocolSchedule, - final TrailingPeerLimiter trailingPeerLimiter) { + final TrailingPeerLimiter trailingPeerLimiter, + final LabelledMetric ethTasksTimer) { this.ethContext = ethContext; this.protocolSchedule = protocolSchedule; this.trailingPeerLimiter = trailingPeerLimiter; + this.ethTasksTimer = ethTasksTimer; } public static void trackChainHeadForPeers( final EthContext ethContext, final ProtocolSchedule protocolSchedule, final Blockchain blockchain, - final SynchronizerConfiguration syncConfiguration) { + final SynchronizerConfiguration syncConfiguration, + final LabelledMetric ethTasksTimer) { final TrailingPeerLimiter trailingPeerLimiter = new TrailingPeerLimiter( ethContext.getEthPeers(), @@ -55,7 +61,7 @@ public static void trackChainHeadForPeers( syncConfiguration.trailingPeerBlocksBehindThreshold(), syncConfiguration.maxTrailingPeers()); final ChainHeadTracker tracker = - new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter); + new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, ethTasksTimer); ethContext.getEthPeers().subscribeConnect(tracker); blockchain.observeBlockAdded(trailingPeerLimiter); } @@ -64,7 +70,10 @@ public static void trackChainHeadForPeers( public void onPeerConnected(final EthPeer peer) { LOG.debug("Requesting chain head info for {}", peer); GetHeadersFromPeerByHashTask.forSingleHash( - protocolSchedule, ethContext, Hash.wrap(peer.chainState().getBestBlock().getHash())) + protocolSchedule, + ethContext, + Hash.wrap(peer.chainState().getBestBlock().getHash()), + ethTasksTimer) .assignPeer(peer) .run() .whenComplete( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 20fcb3fafd..118a68ee75 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -19,7 +19,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,7 +43,7 @@ public DefaultSynchronizer( final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, - final MetricsSystem metricsSystem) { + final LabelledMetric ethTasksTimer) { this.syncState = syncState; this.blockPropagationManager = new BlockPropagationManager<>( @@ -52,12 +53,13 @@ public DefaultSynchronizer( ethContext, syncState, new PendingBlocks(), - metricsSystem); + ethTasksTimer); this.downloader = - new Downloader<>(syncConfig, protocolSchedule, protocolContext, ethContext, syncState); + new Downloader<>( + syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer); ChainHeadTracker.trackChainHeadForPeers( - ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig); + ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer); if (syncConfig.syncMode().equals(SyncMode.FAST)) { LOG.info("Fast sync enabled."); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java index 72620dbfe7..754f0e7902 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java @@ -33,6 +33,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.uint.UInt256; @@ -60,6 +62,7 @@ public class Downloader { private final ProtocolContext protocolContext; private final EthContext ethContext; private final SyncState syncState; + private final LabelledMetric ethTasksTimer; private final Deque checkpointHeaders = new ConcurrentLinkedDeque<>(); private int checkpointTimeouts = 0; @@ -75,7 +78,9 @@ public class Downloader { final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, - final SyncState syncState) { + final SyncState syncState, + final LabelledMetric ethTasksTimer) { + this.ethTasksTimer = ethTasksTimer; this.config = config; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; @@ -124,13 +129,13 @@ private CompletableFuture executeDownload() { } private CompletableFuture waitForPeers() { - return WaitForPeersTask.create(ethContext, 1).run(); + return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run(); } private CompletableFuture waitForNewPeer() { return ethContext .getScheduler() - .timeout(WaitForPeerTask.create(ethContext), Duration.ofSeconds(5)); + .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); } private CompletableFuture findSyncTarget() { @@ -159,7 +164,8 @@ private CompletableFuture findSyncTarget() { protocolContext, ethContext, bestPeer, - config.downloaderHeaderRequestSize()) + config.downloaderHeaderRequestSize(), + ethTasksTimer) .run() .handle((r, t) -> r) .thenCompose( @@ -325,7 +331,8 @@ private EthTask>> checkpointHeadersTask( lastHeader.getHash(), lastHeader.getNumber(), config.downloaderHeaderRequestSize() + 1, - config.downloaderChainSegmentSize() - 1) + config.downloaderChainSegmentSize() - 1, + ethTasksTimer) .assignPeer(syncTarget.peer()); } @@ -344,7 +351,8 @@ private CompletableFuture> importBlocks() { protocolContext, ethContext, checkpointHeaders.getFirst(), - config.downloaderChainSegmentSize()); + config.downloaderChainSegmentSize(), + ethTasksTimer); importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult); } else { final PipelinedImportChainSegmentTask importTask = @@ -353,6 +361,7 @@ private CompletableFuture> importBlocks() { protocolContext, ethContext, config.downloaderParallelism(), + ethTasksTimer, Lists.newArrayList(checkpointHeaders)); importedBlocks = importTask.run(); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java index 2c176cf254..821b59cc16 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java @@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.ArrayList; import java.util.Collections; @@ -50,8 +52,9 @@ protected AbstractGetHeadersFromPeerTask( final long minimumRequiredBlockNumber, final int count, final int skip, - final boolean reverse) { - super(ethContext, EthPV62.GET_BLOCK_HEADERS); + final boolean reverse, + final LabelledMetric ethTasksTimer) { + super(ethContext, EthPV62.GET_BLOCK_HEADERS, ethTasksTimer); checkArgument(count > 0); this.protocolSchedule = protocolSchedule; this.count = count; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index f67807fb0b..fe6b7443db 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -24,6 +24,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.HashMap; import java.util.List; @@ -48,6 +50,7 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; + private final LabelledMetric ethTasksTimer; private final List headers; private final Map blocks; @@ -57,11 +60,13 @@ private CompleteBlocksTask( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final List headers, - final int maxRetries) { - super(ethContext, maxRetries); + final int maxRetries, + final LabelledMetric ethTasksTimer) { + super(ethContext, maxRetries, ethTasksTimer); checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; this.headers = headers; this.blocks = new HashMap<>(); @@ -71,15 +76,19 @@ public static CompleteBlocksTask forHeaders( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final List headers, - final int maxRetries) { - return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, maxRetries); + final int maxRetries, + final LabelledMetric ethTasksTimer) { + return new CompleteBlocksTask<>( + protocolSchedule, ethContext, headers, maxRetries, ethTasksTimer); } public static CompleteBlocksTask forHeaders( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final List headers) { - return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, DEFAULT_RETRIES); + final List headers, + final LabelledMetric ethTasksTimer) { + return new CompleteBlocksTask<>( + protocolSchedule, ethContext, headers, DEFAULT_RETRIES, ethTasksTimer); } @Override @@ -111,7 +120,8 @@ private CompletableFuture>> requestBodies() { return executeSubTask( () -> { final GetBodiesFromPeerTask task = - GetBodiesFromPeerTask.forHeaders(protocolSchedule, ethContext, incompleteHeaders); + GetBodiesFromPeerTask.forHeaders( + protocolSchedule, ethContext, incompleteHeaders, ethTasksTimer); assignedPeer.ifPresent(task::assignPeer); return task.run(); }); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTask.java index 3bc5e46948..19e4a2d8b4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTask.java @@ -20,6 +20,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.util.BlockchainUtil; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.List; import java.util.OptionalInt; @@ -34,6 +36,7 @@ public class DetermineCommonAncestorTask extends AbstractEthTask private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; + private final LabelledMetric ethTasksTimer; private final EthPeer peer; private final int headerRequestSize; @@ -47,10 +50,13 @@ private DetermineCommonAncestorTask( final ProtocolContext protocolContext, final EthContext ethContext, final EthPeer peer, - final int headerRequestSize) { + final int headerRequestSize, + final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; this.protocolContext = protocolContext; + this.ethTasksTimer = ethTasksTimer; this.peer = peer; this.headerRequestSize = headerRequestSize; @@ -65,9 +71,10 @@ public static DetermineCommonAncestorTask create( final ProtocolContext protocolContext, final EthContext ethContext, final EthPeer peer, - final int headerRequestSize) { + final int headerRequestSize, + final LabelledMetric ethTasksTimer) { return new DetermineCommonAncestorTask<>( - protocolSchedule, protocolContext, ethContext, peer, headerRequestSize); + protocolSchedule, protocolContext, ethContext, peer, headerRequestSize, ethTasksTimer); } @Override @@ -89,7 +96,7 @@ protected void executeTask() { if (error != null) { result.get().completeExceptionally(error); } else if (!result.get().isDone()) { - executeTask(); + executeTaskTimed(); } }); } @@ -112,7 +119,8 @@ CompletableFuture>> requestHea ethContext, maximumPossibleCommonAncestorNumber, count, - skipInterval) + skipInterval, + ethTasksTimer) .assignPeer(peer) .run()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index ef4cd750f0..856be8712e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -29,6 +29,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Arrays; import java.util.List; @@ -52,6 +54,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask protocolContext; private final ProtocolSchedule protocolSchedule; + private final LabelledMetric ethTasksTimer; private final BlockHeader[] headers; private final BlockHeader referenceHeader; @@ -66,13 +69,15 @@ private DownloadHeaderSequenceTask( final EthContext ethContext, final BlockHeader referenceHeader, final int segmentLength, - final int maxRetries) { - super(ethContext, maxRetries); + final int maxRetries, + final LabelledMetric ethTasksTimer) { + super(ethContext, maxRetries, ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; this.referenceHeader = referenceHeader; this.segmentLength = segmentLength; + this.ethTasksTimer = ethTasksTimer; startingBlockNumber = referenceHeader.getNumber() - segmentLength; headers = new BlockHeader[segmentLength]; @@ -85,9 +90,16 @@ public static DownloadHeaderSequenceTask endingAtHeader( final EthContext ethContext, final BlockHeader referenceHeader, final int segmentLength, - final int maxRetries) { + final int maxRetries, + final LabelledMetric ethTasksTimer) { return new DownloadHeaderSequenceTask<>( - protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength, maxRetries); + protocolSchedule, + protocolContext, + ethContext, + referenceHeader, + segmentLength, + maxRetries, + ethTasksTimer); } public static DownloadHeaderSequenceTask endingAtHeader( @@ -95,14 +107,16 @@ public static DownloadHeaderSequenceTask endingAtHeader( final ProtocolContext protocolContext, final EthContext ethContext, final BlockHeader referenceHeader, - final int segmentLength) { + final int segmentLength, + final LabelledMetric ethTasksTimer) { return new DownloadHeaderSequenceTask<>( protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength, - DEFAULT_RETRIES); + DEFAULT_RETRIES, + ethTasksTimer); } @Override @@ -149,7 +163,8 @@ private CompletableFuture>> downloadHeaders() { ethContext, referenceHash, referenceHeaderForNextRequest.getNumber(), - count + 1); + count + 1, + ethTasksTimer); return headersTask.run(); }); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTask.java index 4b36c0cc21..99d29b3a6d 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTask.java @@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.IncompleteResultsException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -33,18 +35,26 @@ public class GetBlockFromPeerTask extends AbstractPeerTask { private static final Logger LOG = LogManager.getLogger(); private final ProtocolSchedule protocolSchedule; + private final LabelledMetric ethTasksTimer; private final Hash hash; protected GetBlockFromPeerTask( - final ProtocolSchedule protocolSchedule, final EthContext ethContext, final Hash hash) { - super(ethContext); + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash hash, + final LabelledMetric ethTasksTimer) { + super(ethContext, ethTasksTimer); this.protocolSchedule = protocolSchedule; + this.ethTasksTimer = ethTasksTimer; this.hash = hash; } public static GetBlockFromPeerTask create( - final ProtocolSchedule protocolSchedule, final EthContext ethContext, final Hash hash) { - return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash); + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash hash, + final LabelledMetric ethTasksTimer) { + return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, ethTasksTimer); } @Override @@ -70,7 +80,8 @@ protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { private CompletableFuture>> downloadHeader(final EthPeer peer) { return executeSubTask( () -> - GetHeadersFromPeerByHashTask.forSingleHash(protocolSchedule, ethContext, hash) + GetHeadersFromPeerByHashTask.forSingleHash( + protocolSchedule, ethContext, hash, ethTasksTimer) .assignPeer(peer) .run()); } @@ -87,7 +98,7 @@ private CompletableFuture>> completeBlock( () -> { final GetBodiesFromPeerTask task = GetBodiesFromPeerTask.forHeaders( - protocolSchedule, ethContext, headerResult.getResult()); + protocolSchedule, ethContext, headerResult.getResult(), ethTasksTimer); task.assignPeer(headerResult.getPeer()); return task.run(); }); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java index fdb90093c7..504e3ff5d8 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java @@ -29,6 +29,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.bytes.Bytes32; import java.util.ArrayList; @@ -59,8 +61,9 @@ public class GetBodiesFromPeerTask extends AbstractPeerRequestTask protocolSchedule, final EthContext ethContext, - final List headers) { - super(ethContext, EthPV62.GET_BLOCK_BODIES); + final List headers, + final LabelledMetric ethTasksTimer) { + super(ethContext, EthPV62.GET_BLOCK_BODIES, ethTasksTimer); checkArgument(headers.size() > 0); this.protocolSchedule = protocolSchedule; @@ -76,8 +79,9 @@ private GetBodiesFromPeerTask( public static GetBodiesFromPeerTask forHeaders( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final List headers) { - return new GetBodiesFromPeerTask<>(protocolSchedule, ethContext, headers); + final List headers, + final LabelledMetric ethTasksTimer) { + return new GetBodiesFromPeerTask<>(protocolSchedule, ethContext, headers, ethTasksTimer); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java index f4caed86a0..f45306fc50 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java @@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; @@ -40,8 +42,16 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask final long minimumRequiredBlockNumber, final int count, final int skip, - final boolean reverse) { - super(protocolSchedule, ethContext, minimumRequiredBlockNumber, count, skip, reverse); + final boolean reverse, + final LabelledMetric ethTasksTimer) { + super( + protocolSchedule, + ethContext, + minimumRequiredBlockNumber, + count, + skip, + reverse, + ethTasksTimer); checkNotNull(referenceHash); this.referenceHash = referenceHash; } @@ -51,9 +61,17 @@ public static AbstractGetHeadersFromPeerTask startingAtHash( final EthContext ethContext, final Hash firstHash, final long firstBlockNumber, - final int segmentLength) { + final int segmentLength, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByHashTask( - protocolSchedule, ethContext, firstHash, firstBlockNumber, segmentLength, 0, false); + protocolSchedule, + ethContext, + firstHash, + firstBlockNumber, + segmentLength, + 0, + false, + ethTasksTimer); } public static AbstractGetHeadersFromPeerTask startingAtHash( @@ -62,9 +80,17 @@ public static AbstractGetHeadersFromPeerTask startingAtHash( final Hash firstHash, final long firstBlockNumber, final int segmentLength, - final int skip) { + final int skip, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByHashTask( - protocolSchedule, ethContext, firstHash, firstBlockNumber, segmentLength, skip, false); + protocolSchedule, + ethContext, + firstHash, + firstBlockNumber, + segmentLength, + skip, + false, + ethTasksTimer); } public static AbstractGetHeadersFromPeerTask endingAtHash( @@ -72,14 +98,26 @@ public static AbstractGetHeadersFromPeerTask endingAtHash( final EthContext ethContext, final Hash lastHash, final long lastBlockNumber, - final int segmentLength) { + final int segmentLength, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByHashTask( - protocolSchedule, ethContext, lastHash, lastBlockNumber, segmentLength, 0, true); + protocolSchedule, + ethContext, + lastHash, + lastBlockNumber, + segmentLength, + 0, + true, + ethTasksTimer); } public static AbstractGetHeadersFromPeerTask forSingleHash( - final ProtocolSchedule protocolSchedule, final EthContext ethContext, final Hash hash) { - return new GetHeadersFromPeerByHashTask(protocolSchedule, ethContext, hash, 0, 1, 0, false); + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final Hash hash, + final LabelledMetric ethTasksTimer) { + return new GetHeadersFromPeerByHashTask( + protocolSchedule, ethContext, hash, 0, 1, 0, false, ethTasksTimer); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTask.java index a86fb55a14..bd622f2bdd 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTask.java @@ -18,6 +18,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; @@ -36,8 +38,9 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa final long blockNumber, final int count, final int skip, - final boolean reverse) { - super(protocolSchedule, ethContext, blockNumber, count, skip, reverse); + final boolean reverse, + final LabelledMetric ethTasksTimer) { + super(protocolSchedule, ethContext, blockNumber, count, skip, reverse, ethTasksTimer); this.blockNumber = blockNumber; } @@ -45,18 +48,20 @@ public static AbstractGetHeadersFromPeerTask startingAtNumber( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final long firstBlockNumber, - final int segmentLength) { + final int segmentLength, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByNumberTask( - protocolSchedule, ethContext, firstBlockNumber, segmentLength, 0, false); + protocolSchedule, ethContext, firstBlockNumber, segmentLength, 0, false, ethTasksTimer); } public static AbstractGetHeadersFromPeerTask endingAtNumber( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final long lastlockNumber, - final int segmentLength) { + final int segmentLength, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByNumberTask( - protocolSchedule, ethContext, lastlockNumber, segmentLength, 0, true); + protocolSchedule, ethContext, lastlockNumber, segmentLength, 0, true, ethTasksTimer); } public static AbstractGetHeadersFromPeerTask endingAtNumber( @@ -64,17 +69,19 @@ public static AbstractGetHeadersFromPeerTask endingAtNumber( final EthContext ethContext, final long lastlockNumber, final int segmentLength, - final int skip) { + final int skip, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByNumberTask( - protocolSchedule, ethContext, lastlockNumber, segmentLength, skip, true); + protocolSchedule, ethContext, lastlockNumber, segmentLength, skip, true, ethTasksTimer); } public static AbstractGetHeadersFromPeerTask forSingleNumber( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final long blockNumber) { + final long blockNumber, + final LabelledMetric ethTasksTimer) { return new GetHeadersFromPeerByNumberTask( - protocolSchedule, ethContext, blockNumber, 1, 0, false); + protocolSchedule, ethContext, blockNumber, 1, 0, false, ethTasksTimer); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java index 9b77756546..8a4413b721 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java @@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Collections; import java.util.List; @@ -40,6 +42,7 @@ public class ImportBlocksTask extends AbstractPeerTask> { private final ProtocolContext protocolContext; private final ProtocolSchedule protocolSchedule; + private final LabelledMetric ethTasksTimer; private final long startNumber; private final BlockHeader referenceHeader; @@ -51,12 +54,14 @@ protected ImportBlocksTask( final ProtocolContext protocolContext, final EthContext ethContext, final BlockHeader referenceHeader, - final int maxBlocks) { - super(ethContext); + final int maxBlocks, + final LabelledMetric ethTasksTimer) { + super(ethContext, ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.referenceHeader = referenceHeader; this.maxBlocks = maxBlocks; + this.ethTasksTimer = ethTasksTimer; this.startNumber = referenceHeader.getNumber(); } @@ -66,9 +71,10 @@ public static ImportBlocksTask fromHeader( final ProtocolContext protocolContext, final EthContext ethContext, final BlockHeader previousHeader, - final int maxBlocks) { + final int maxBlocks, + final LabelledMetric ethTasksTimer) { return new ImportBlocksTask<>( - protocolSchedule, protocolContext, ethContext, previousHeader, maxBlocks); + protocolSchedule, protocolContext, ethContext, previousHeader, maxBlocks, ethTasksTimer); } @Override @@ -97,7 +103,8 @@ private CompletableFuture>> downloadHeaders() { ethContext, referenceHeader.getHash(), referenceHeader.getNumber(), - maxBlocks) + maxBlocks, + ethTasksTimer) .assignPeer(peer); return executeSubTask(task::run); } @@ -108,7 +115,8 @@ private CompletableFuture> completeBlocks( return CompletableFuture.completedFuture(Collections.emptyList()); } final CompleteBlocksTask task = - CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers.getResult()) + CompleteBlocksTask.forHeaders( + protocolSchedule, ethContext, headers.getResult(), ethTasksTimer) .assignPeer(peer); return executeSubTask(() -> ethContext.getScheduler().timeout(task)); } @@ -123,7 +131,7 @@ private CompletableFuture> importBlocks(final List blocks) { } final Supplier>> task = PersistBlockTask.forSequentialBlocks( - protocolSchedule, protocolContext, blocks, HeaderValidationMode.FULL); + protocolSchedule, protocolContext, blocks, HeaderValidationMode.FULL, ethTasksTimer); return executeWorkerSubTask(ethContext.getScheduler(), task); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTask.java index a6321db110..a227c9de5a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTask.java @@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.ArrayList; import java.util.List; @@ -39,7 +41,9 @@ private PersistBlockTask( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final Block block, - final HeaderValidationMode headerValidationMode) { + final HeaderValidationMode headerValidationMode, + final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.block = block; @@ -50,15 +54,18 @@ public static PersistBlockTask create( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final Block block, - final HeaderValidationMode headerValidationMode) { - return new PersistBlockTask<>(protocolSchedule, protocolContext, block, headerValidationMode); + final HeaderValidationMode headerValidationMode, + final LabelledMetric ethTasksTimer) { + return new PersistBlockTask<>( + protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer); } public static Supplier>> forSequentialBlocks( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final List blocks, - final HeaderValidationMode headerValidationMode) { + final HeaderValidationMode headerValidationMode, + final LabelledMetric ethTasksTimer) { checkArgument(blocks.size() > 0); return () -> { final List successfulImports = new ArrayList<>(); @@ -71,7 +78,8 @@ public static Supplier>> forSequentialBlocks( protocolContext, block, successfulImports, - headerValidationMode); + headerValidationMode, + ethTasksTimer); continue; } future = @@ -82,7 +90,8 @@ public static Supplier>> forSequentialBlocks( protocolContext, block, successfulImports, - headerValidationMode)); + headerValidationMode, + ethTasksTimer)); } return future.thenApply(r -> successfulImports); }; @@ -93,8 +102,10 @@ private static CompletableFuture importBlockAndAddToList( final ProtocolContext protocolContext, final Block block, final List list, - final HeaderValidationMode headerValidationMode) { - return PersistBlockTask.create(protocolSchedule, protocolContext, block, headerValidationMode) + final HeaderValidationMode headerValidationMode, + final LabelledMetric ethTasksTimer) { + return PersistBlockTask.create( + protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer) .run() .whenComplete( (r, t) -> { @@ -108,7 +119,8 @@ public static Supplier>> forUnorderedBlocks( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final List blocks, - final HeaderValidationMode headerValidationMode) { + final HeaderValidationMode headerValidationMode, + final LabelledMetric ethTasksTimer) { checkArgument(blocks.size() > 0); return () -> { final CompletableFuture> finalResult = new CompletableFuture<>(); @@ -118,7 +130,7 @@ public static Supplier>> forUnorderedBlocks( if (future == null) { future = PersistBlockTask.create( - protocolSchedule, protocolContext, block, headerValidationMode) + protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer) .run(); continue; } @@ -131,7 +143,11 @@ public static Supplier>> forUnorderedBlocks( successfulImports.add(r); } return PersistBlockTask.create( - protocolSchedule, protocolContext, block, headerValidationMode) + protocolSchedule, + protocolContext, + block, + headerValidationMode, + ethTasksTimer) .run(); }); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java index 95387073ad..0f8fc725a1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java @@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.util.ExceptionUtils; import java.util.ArrayList; @@ -45,6 +47,7 @@ public class PipelinedImportChainSegmentTask extends AbstractEthTask protocolContext; private final ProtocolSchedule protocolSchedule; private final List importedBlocks = new ArrayList<>(); + private final LabelledMetric ethTasksTimer; // First header is assumed to already be imported private final List checkpointHeaders; @@ -67,10 +70,13 @@ protected PipelinedImportChainSegmentTask( final ProtocolContext protocolContext, final EthContext ethContext, final int maxActiveChunks, - final List checkpointHeaders) { + final List checkpointHeaders, + final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; this.checkpointHeaders = checkpointHeaders; this.chunksInTotal = checkpointHeaders.size() - 1; this.chunksIssued = 0; @@ -83,12 +89,14 @@ public static PipelinedImportChainSegmentTask forCheckpoints( final ProtocolContext protocolContext, final EthContext ethContext, final int maxActiveChunks, + final LabelledMetric ethTasksTimer, final BlockHeader... checkpointHeaders) { return forCheckpoints( protocolSchedule, protocolContext, ethContext, maxActiveChunks, + ethTasksTimer, Arrays.asList(checkpointHeaders)); } @@ -97,9 +105,15 @@ public static PipelinedImportChainSegmentTask forCheckpoints( final ProtocolContext protocolContext, final EthContext ethContext, final int maxActiveChunks, + final LabelledMetric ethTasksTimer, final List checkpointHeaders) { return new PipelinedImportChainSegmentTask<>( - protocolSchedule, protocolContext, ethContext, maxActiveChunks, checkpointHeaders); + protocolSchedule, + protocolContext, + ethContext, + maxActiveChunks, + checkpointHeaders, + ethTasksTimer); } @Override @@ -195,7 +209,12 @@ private CompletableFuture> downloadNextHeaders( } final DownloadHeaderSequenceTask task = DownloadHeaderSequenceTask.endingAtHeader( - protocolSchedule, protocolContext, ethContext, lastChunkHeader, segmentLength); + protocolSchedule, + protocolContext, + ethContext, + lastChunkHeader, + segmentLength, + ethTasksTimer); return executeSubTask(task::run) .thenApply( headers -> { @@ -239,7 +258,7 @@ private CompletableFuture> downloadBlocks(final List he headers.get(0).getNumber(), headers.get(headers.size() - 1).getNumber()); final CompleteBlocksTask task = - CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers); + CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer); return executeSubTask(task::run); } @@ -250,7 +269,11 @@ private CompletableFuture> validateAndImportBlocks(final List blocks.get(blocks.size() - 1).getHeader().getNumber()); final Supplier>> task = PersistBlockTask.forSequentialBlocks( - protocolSchedule, protocolContext, blocks, HeaderValidationMode.SKIP_DETACHED); + protocolSchedule, + protocolContext, + blocks, + HeaderValidationMode.SKIP_DETACHED, + ethTasksTimer); return executeWorkerSubTask(ethContext.getScheduler(), task); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTask.java index 6492172b13..f4f97b02dc 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTask.java @@ -15,6 +15,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,12 +28,15 @@ public class WaitForPeerTask extends AbstractEthTask { private final EthContext ethContext; private volatile Long peerListenerId; - private WaitForPeerTask(final EthContext ethContext) { + private WaitForPeerTask( + final EthContext ethContext, final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.ethContext = ethContext; } - public static WaitForPeerTask create(final EthContext ethContext) { - return new WaitForPeerTask(ethContext); + public static WaitForPeerTask create( + final EthContext ethContext, final LabelledMetric ethTasksTimer) { + return new WaitForPeerTask(ethContext, ethTasksTimer); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTask.java index 99ca896be7..d2359026ae 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTask.java @@ -15,6 +15,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,13 +29,20 @@ public class WaitForPeersTask extends AbstractEthTask { private final EthContext ethContext; private volatile Long peerListenerId; - private WaitForPeersTask(final EthContext ethContext, final int targetPeerCount) { + private WaitForPeersTask( + final EthContext ethContext, + final int targetPeerCount, + final LabelledMetric ethTasksTimer) { + super(ethTasksTimer); this.targetPeerCount = targetPeerCount; this.ethContext = ethContext; } - public static WaitForPeersTask create(final EthContext ethContext, final int targetPeerCount) { - return new WaitForPeersTask(ethContext, targetPeerCount); + public static WaitForPeersTask create( + final EthContext ethContext, + final int targetPeerCount, + final LabelledMetric ethTasksTimer) { + return new WaitForPeersTask(ethContext, targetPeerCount, ethTasksTimer); } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTaskTest.java index 5e9f5ad41a..7554810d67 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTaskTest.java @@ -14,6 +14,8 @@ import static org.assertj.core.api.Assertions.assertThat; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; + import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,6 +83,7 @@ private class EthTaskWithMultipleSubtasks extends AbstractEthTask { private final List> subtasks; private EthTaskWithMultipleSubtasks(final List> subtasks) { + super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER); this.subtasks = subtasks; } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java index 419977eed0..4f65be85b8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java @@ -12,10 +12,16 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; + public class MockEthTask extends AbstractEthTask { private boolean executed = false; + protected MockEthTask() { + super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER); + } + @Override protected void executeTask() { executed = true; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index e61a191705..b3671c3cb1 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -24,6 +24,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,6 +46,7 @@ public abstract class AbstractMessageTaskTest { protected static Blockchain blockchain; protected static ProtocolSchedule protocolSchedule; protected static ProtocolContext protocolContext; + protected static LabelledMetric ethTasksTimer; protected EthProtocolManager ethProtocolManager; protected EthContext ethContext; protected AtomicBoolean peersDoTimeout; @@ -55,6 +59,7 @@ public static void setup() { blockchain = blockchainSetupUtil.getBlockchain(); protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); protocolContext = blockchainSetupUtil.getProtocolContext(); + ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER; assert (blockchainSetupUtil.getMaxBlockNumber() >= 20L); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index da051d3c96..72ad7e1333 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -35,7 +35,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator.BlockOptions; -import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; @@ -58,7 +59,8 @@ public class BlockPropagationManagerTest { private SynchronizerConfiguration syncConfig; private final PendingBlocks pendingBlocks = new PendingBlocks(); private SyncState syncState; - private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + private final LabelledMetric ethTasksTimer = + NoOpMetricsSystem.NO_OP_LABELLED_TIMER; @BeforeClass public static void setupSuite() { @@ -91,7 +93,7 @@ public void setup() { ethProtocolManager.ethContext(), syncState, pendingBlocks, - metricsSystem); + ethTasksTimer); } @Test @@ -467,7 +469,7 @@ public void purgesOldBlocks() { ethProtocolManager.ethContext(), syncState, pendingBlocks, - metricsSystem); + ethTasksTimer); final BlockDataGenerator gen = new BlockDataGenerator(); // Import some blocks diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTrackerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTrackerTest.java index 4f2afe187b..9013a8bdbf 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTrackerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTrackerTest.java @@ -26,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; import org.junit.Test; @@ -46,7 +47,11 @@ public class ChainHeadTrackerTest { DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions()); private final TrailingPeerLimiter trailingPeerLimiter = mock(TrailingPeerLimiter.class); private final ChainHeadTracker chainHeadTracker = - new ChainHeadTracker(ethProtocolManager.ethContext(), protocolSchedule, trailingPeerLimiter); + new ChainHeadTracker( + ethProtocolManager.ethContext(), + protocolSchedule, + trailingPeerLimiter, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); @Test public void shouldRequestHeaderChainHeadWhenNewPeerConnects() { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java index 09be7545c4..ba7fbf63c8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java @@ -40,6 +40,9 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.ArrayList; @@ -65,6 +68,7 @@ public class DownloaderTest { protected MutableBlockchain localBlockchain; private BlockchainSetupUtil otherBlockchainSetup; protected Blockchain otherBlockchain; + private LabelledMetric ethTashsTimer; @Before public void setupTest() { @@ -79,10 +83,13 @@ public void setupTest() { ethProtocolManager = EthProtocolManagerTestUtil.create(localBlockchain); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); + + ethTashsTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER; } private Downloader downloader(final SynchronizerConfiguration syncConfig) { - return new Downloader<>(syncConfig, protocolSchedule, protocolContext, ethContext, syncState); + return new Downloader<>( + syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTashsTimer); } private Downloader downloader() { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java index c2ce413e0f..2266f2788e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java @@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.ArrayList; import java.util.List; @@ -41,6 +42,10 @@ protected EthTask> createTask(final List requestedData) { final List headersToComplete = requestedData.stream().map(Block::getHeader).collect(Collectors.toList()); return CompleteBlocksTask.forHeaders( - protocolSchedule, ethContext, headersToComplete, maxRetries); + protocolSchedule, + ethContext, + headersToComplete, + maxRetries, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskParameterizedTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskParameterizedTest.java index ee979016ba..37e873b778 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskParameterizedTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskParameterizedTest.java @@ -31,6 +31,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; import java.io.IOException; @@ -52,6 +55,8 @@ public class DetermineCommonAncestorTaskParameterizedTest { private final ProtocolSchedule protocolSchedule = MainnetProtocolSchedule.create(); private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator(); + private final LabelledMetric ethTasksTimer = + NoOpMetricsSystem.NO_OP_LABELLED_TIMER; private static Block genesisBlock; private static MutableBlockchain localBlockchain; @@ -154,7 +159,8 @@ public void searchesAgainstNetwork() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - headerRequestSize); + headerRequestSize, + ethTasksTimer); final CompletableFuture future = task.run(); respondingEthPeer.respondWhile(responder, () -> !future.isDone()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java index 9b8b5272fb..4088bd6f65 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java @@ -41,6 +41,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.uint.UInt256; @@ -56,6 +59,8 @@ public class DetermineCommonAncestorTaskTest { private final ProtocolSchedule protocolSchedule = MainnetProtocolSchedule.create(); private final BlockDataGenerator blockDataGenerator = new BlockDataGenerator(); + private final LabelledMetric ethTasksTimer = + NoOpMetricsSystem.NO_OP_LABELLED_TIMER; private MutableBlockchain localBlockchain; private final int defaultHeaderRequestSize = 10; Block genesisBlock; @@ -110,7 +115,8 @@ public void shouldThrowExceptionNoCommonBlock() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - defaultHeaderRequestSize); + defaultHeaderRequestSize, + ethTasksTimer); final CompletableFuture future = task.run(); respondingEthPeer.respondWhile(responder, () -> !future.isDone()); @@ -146,7 +152,8 @@ public void shouldFailIfPeerDisconnects() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - defaultHeaderRequestSize); + defaultHeaderRequestSize, + ethTasksTimer); // Execute task and wait for response final AtomicReference failure = new AtomicReference<>(); @@ -216,7 +223,8 @@ public void shouldGracefullyHandleExecutionsForNoCommonAncestor() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - defaultHeaderRequestSize); + defaultHeaderRequestSize, + ethTasksTimer); final DetermineCommonAncestorTask spy = spy(task); // Execute task @@ -272,7 +280,8 @@ public void shouldIssueConsistentNumberOfRequestsToPeer() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - defaultHeaderRequestSize); + defaultHeaderRequestSize, + ethTasksTimer); final DetermineCommonAncestorTask spy = spy(task); // Execute task @@ -343,7 +352,8 @@ public void shouldShortCircuitOnHeaderInInitialRequest() { protocolContext, ethContext, respondingEthPeer.getEthPeer(), - defaultHeaderRequestSize); + defaultHeaderRequestSize, + ethTasksTimer); final DetermineCommonAncestorTask spy = spy(task); // Execute task @@ -370,7 +380,12 @@ public void returnsImmediatelyWhenThereIsNoWorkToDo() throws Exception { final EthTask task = DetermineCommonAncestorTask.create( - protocolSchedule, protocolContext, ethContext, peer, defaultHeaderRequestSize); + protocolSchedule, + protocolContext, + ethContext, + peer, + defaultHeaderRequestSize, + ethTasksTimer); final CompletableFuture result = task.run(); assertThat(result).isCompletedWithValue(genesisBlock.getHeader()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java index 107f7b7d86..25e3a521b6 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java @@ -59,7 +59,8 @@ protected EthTask> createTask(final List requeste ethContext, referenceHeader, requestedData.size(), - maxRetries); + maxRetries, + ethTasksTimer); } @Test @@ -68,10 +69,16 @@ public void failsWhenPeerReturnsOnlyReferenceHeader() { EthProtocolManagerTestUtil.createPeer(ethProtocolManager); // Execute task and wait for response - BlockHeader referenceHeader = blockchain.getChainHeadHeader(); + final BlockHeader referenceHeader = blockchain.getChainHeadHeader(); final EthTask> task = DownloadHeaderSequenceTask.endingAtHeader( - protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries); + protocolSchedule, + protocolContext, + ethContext, + referenceHeader, + 10, + maxRetries, + ethTasksTimer); final CompletableFuture> future = task.run(); // Respond with only the reference header @@ -91,23 +98,30 @@ public void failsWhenPeerReturnsOnlySubsetOfHeaders() { EthProtocolManagerTestUtil.createPeer(ethProtocolManager); // Execute task and wait for response - BlockHeader referenceHeader = blockchain.getChainHeadHeader(); + final BlockHeader referenceHeader = blockchain.getChainHeadHeader(); final EthTask> task = DownloadHeaderSequenceTask.endingAtHeader( - protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries); + protocolSchedule, + protocolContext, + ethContext, + referenceHeader, + 10, + maxRetries, + ethTasksTimer); final CompletableFuture> future = task.run(); // Filter response to include only reference header and previous header final Responder fullResponder = RespondingEthPeer.blockchainResponder(blockchain); final Responder responder = (cap, message) -> { - Optional fullResponse = fullResponder.respond(cap, message); + final Optional fullResponse = fullResponder.respond(cap, message); if (!fullResponse.isPresent() || message.getCode() != EthPV62.GET_BLOCK_HEADERS) { return fullResponse; } - BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(fullResponse.get()); + final BlockHeadersMessage headersMessage = + BlockHeadersMessage.readFrom(fullResponse.get()); // Filter for a subset of headers - List headerSubset = + final List headerSubset = Streams.stream(headersMessage.getHeaders(protocolSchedule)) .filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L) .collect(Collectors.toList()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTaskTest.java index 596bf21a58..8b6122291c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTaskTest.java @@ -47,7 +47,8 @@ protected Block generateDataToBeRequested() { @Override protected EthTask> createTask(final Block requestedData) { - return GetBlockFromPeerTask.create(protocolSchedule, ethContext, requestedData.getHash()); + return GetBlockFromPeerTask.create( + protocolSchedule, ethContext, requestedData.getHash(), ethTasksTimer); } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTaskTest.java index bf6a68c229..2642190fc9 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTaskTest.java @@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.ArrayList; import java.util.List; @@ -42,7 +43,8 @@ protected List generateDataToBeRequested() { protected EthTask>> createTask(final List requestedData) { final List headersToComplete = requestedData.stream().map(Block::getHeader).collect(Collectors.toList()); - return GetBodiesFromPeerTask.forHeaders(protocolSchedule, ethContext, headersToComplete); + return GetBodiesFromPeerTask.forHeaders( + protocolSchedule, ethContext, headersToComplete, NoOpMetricsSystem.NO_OP_LABELLED_TIMER); } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTaskTest.java index 5253cd0dee..cd25c884c2 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTaskTest.java @@ -61,7 +61,8 @@ protected EthTask>> createTask( ethContext, firstHeader.getHash(), firstHeader.getNumber(), - requestedData.size()); + requestedData.size(), + ethTasksTimer); } @Test @@ -108,7 +109,8 @@ private void getHeadersFromHash(final int skip, final boolean reverse) { startNumber, count, skip, - reverse); + reverse, + ethTasksTimer); final AtomicReference>> actualResult = new AtomicReference<>(); final AtomicBoolean done = new AtomicBoolean(false); final CompletableFuture>> future = task.run(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTaskTest.java index b425489821..cce1a9851b 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTaskTest.java @@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.ArrayList; import java.util.List; @@ -56,7 +57,11 @@ protected EthTask>> createTask final List requestedData) { final BlockHeader firstHeader = requestedData.get(0); return GetHeadersFromPeerByNumberTask.startingAtNumber( - protocolSchedule, ethContext, firstHeader.getNumber(), requestedData.size()); + protocolSchedule, + ethContext, + firstHeader.getNumber(), + requestedData.size(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); } @Test @@ -97,7 +102,13 @@ private void getHeadersFromHash(final int skip, final boolean reverse) { // Execute task and wait for response final AbstractGetHeadersFromPeerTask task = new GetHeadersFromPeerByNumberTask( - protocolSchedule, ethContext, startNumber, count, skip, reverse); + protocolSchedule, + ethContext, + startNumber, + count, + skip, + reverse, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); final AtomicReference>> actualResult = new AtomicReference<>(); final AtomicBoolean done = new AtomicBoolean(false); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java index 17c6c02924..abdb841f11 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java @@ -32,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.ArrayList; import java.util.List; @@ -71,12 +72,13 @@ protected EthTask>> createTask(final List requ shortBlockchain, protocolContext.getWorldStateArchive(), protocolContext.getConsensusState()); - return ImportBlocksTask.fromHeader( + return ImportBlocksTask.fromHeader( protocolSchedule, modifiedContext, ethContext, firstBlock.getHeader(), - requestedData.size()); + requestedData.size(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTaskTest.java index a12235b329..f3598b8675 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTaskTest.java @@ -24,6 +24,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.Arrays; import java.util.Collections; @@ -41,6 +44,8 @@ public class PersistBlockTaskTest { private ProtocolSchedule protocolSchedule; private ProtocolContext protocolContext; private MutableBlockchain blockchain; + private final LabelledMetric ethTasksTimer = + NoOpMetricsSystem.NO_OP_LABELLED_TIMER; @Before public void setup() { @@ -61,7 +66,7 @@ public void importsValidBlock() throws Exception { // Create task final PersistBlockTask task = PersistBlockTask.create( - protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL); + protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer); final CompletableFuture result = task.run(); Awaitility.await().atMost(30, SECONDS).until(result::isDone); @@ -83,7 +88,7 @@ public void failsToImportInvalidBlock() { // Create task final PersistBlockTask task = PersistBlockTask.create( - protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL); + protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer); final CompletableFuture result = task.run(); Awaitility.await().atMost(30, SECONDS).until(result::isDone); @@ -107,7 +112,11 @@ public void importsValidBlockSequence() throws Exception { // Create task final CompletableFuture> task = PersistBlockTask.forSequentialBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -133,7 +142,11 @@ public void failsToImportInvalidBlockSequenceWhereSecondBlockFails() throws Exce // Create task final CompletableFuture> task = PersistBlockTask.forSequentialBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -158,7 +171,11 @@ public void failsToImportInvalidBlockSequenceWhereFirstBlockFails() throws Excep // Create task final CompletableFuture> task = PersistBlockTask.forSequentialBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -183,7 +200,11 @@ public void importsValidUnorderedBlocks() throws Exception { // Create task final CompletableFuture> task = PersistBlockTask.forUnorderedBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -211,7 +232,11 @@ public void importsInvalidUnorderedBlock() throws Exception { // Create task final CompletableFuture> task = PersistBlockTask.forUnorderedBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -236,7 +261,11 @@ public void importsInvalidUnorderedBlocks() throws Exception { // Create task final CompletableFuture> task = PersistBlockTask.forUnorderedBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -263,7 +292,11 @@ public void importsUnorderedBlocksWithMixOfValidAndInvalidBlocks() throws Except // Create task final CompletableFuture> task = PersistBlockTask.forUnorderedBlocks( - protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL) + protocolSchedule, + protocolContext, + nextBlocks, + HeaderValidationMode.FULL, + ethTasksTimer) .get(); Awaitility.await().atMost(30, SECONDS).until(task::isDone); @@ -286,7 +319,7 @@ public void cancelBeforeRunning() throws Exception { // Create task final PersistBlockTask task = PersistBlockTask.create( - protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL); + protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer); task.cancel(); final CompletableFuture result = task.run(); @@ -306,9 +339,9 @@ public void cancelAfterRunning() throws Exception { // Create task final PersistBlockTask task = PersistBlockTask.create( - protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL); + protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer); final PersistBlockTask taskSpy = Mockito.spy(task); - Mockito.doNothing().when(taskSpy).executeTask(); + Mockito.doNothing().when(taskSpy).executeTaskTimed(); final CompletableFuture result = taskSpy.run(); taskSpy.cancel(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java index c663f8636a..7559b14a9a 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java @@ -84,7 +84,9 @@ protected EthTask> createTask(final List requestedData) { modifiedContext, ethContext, 1, - new BlockHeader[] {previousBlock.getHeader(), lastBlock.getHeader()}); + ethTasksTimer, + previousBlock.getHeader(), + lastBlock.getHeader()); } @Override @@ -116,6 +118,7 @@ public void betweenContiguousHeadersSucceeds() { modifiedContext, ethContext, 1, + ethTasksTimer, firstBlock.getHeader(), secondBlock.getHeader()); @@ -165,6 +168,7 @@ public void betweenUnconnectedHeadersFails() { modifiedContext, ethContext, 1, + ethTasksTimer, fakeFirstBlock.getHeader(), thirdBlock.getHeader()); @@ -214,7 +218,7 @@ public void shouldSyncInSequencesOfChunksSequentially() { protocolContext.getConsensusState()); final EthTask> task = PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, modifiedContext, ethContext, 1, checkpointHeaders); + protocolSchedule, modifiedContext, ethContext, 1, ethTasksTimer, checkpointHeaders); // Execute task and wait for response final AtomicReference> actualResult = new AtomicReference<>(); @@ -270,7 +274,7 @@ public void shouldPipelineChainSegmentImportsUpToMaxActiveChunks() { protocolContext.getConsensusState()); final EthTask> task = PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, modifiedContext, ethContext, 2, checkpointHeaders); + protocolSchedule, modifiedContext, ethContext, 2, ethTasksTimer, checkpointHeaders); // Execute task and wait for response final AtomicReference> actualResult = new AtomicReference<>(); @@ -330,7 +334,7 @@ public void shouldPipelineChainSegmentImportsWithinMaxActiveChunks() { protocolContext.getConsensusState()); final EthTask> task = PipelinedImportChainSegmentTask.forCheckpoints( - protocolSchedule, modifiedContext, ethContext, 3, checkpointHeaders); + protocolSchedule, modifiedContext, ethContext, 3, ethTasksTimer, checkpointHeaders); // Execute task and wait for response final AtomicReference> actualResult = new AtomicReference<>(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTaskTest.java index 4af5190140..3b0d69fcb5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTaskTest.java @@ -18,6 +18,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -29,18 +32,20 @@ public class WaitForPeerTaskTest { private EthProtocolManager ethProtocolManager; private EthContext ethContext; + private LabelledMetric ethTasksTimer; @Before public void setupTest() { ethProtocolManager = EthProtocolManagerTestUtil.create(); ethContext = ethProtocolManager.ethContext(); + ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER; } @Test public void completesWhenPeerConnects() throws ExecutionException, InterruptedException { // Execute task and wait for response final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeerTask.create(ethContext); + final EthTask task = WaitForPeerTask.create(ethContext, ethTasksTimer); final CompletableFuture future = task.run(); future.whenComplete( (result, error) -> { @@ -55,7 +60,7 @@ public void completesWhenPeerConnects() throws ExecutionException, InterruptedEx @Test public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException { final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeerTask.create(ethContext); + final EthTask task = WaitForPeerTask.create(ethContext, ethTasksTimer); final CompletableFuture future = task.run(); future.whenComplete( (result, error) -> { @@ -70,7 +75,7 @@ public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, Inter @Test public void cancel() throws ExecutionException, InterruptedException { // Execute task - final EthTask task = WaitForPeerTask.create(ethContext); + final EthTask task = WaitForPeerTask.create(ethContext, ethTasksTimer); final CompletableFuture future = task.run(); assertThat(future.isDone()).isFalse(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTaskTest.java index fc6b96a69c..bff593a19f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTaskTest.java @@ -18,6 +18,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -29,18 +32,20 @@ public class WaitForPeersTaskTest { private EthProtocolManager ethProtocolManager; private EthContext ethContext; + private LabelledMetric ethTasksTimer; @Before public void setupTest() { ethProtocolManager = EthProtocolManagerTestUtil.create(); ethContext = ethProtocolManager.ethContext(); + ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER; } @Test public void completesWhenPeersConnects() throws ExecutionException, InterruptedException { // Execute task and wait for response final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2); + final EthTask task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer); final CompletableFuture future = task.run(); future.whenComplete( (result, error) -> { @@ -56,7 +61,7 @@ public void completesWhenPeersConnects() throws ExecutionException, InterruptedE @Test public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException { final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2); + final EthTask task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer); final CompletableFuture future = task.run(); future.whenComplete( (result, error) -> { @@ -72,7 +77,7 @@ public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, Inter public void doesNotCompleteWhenSomePeersConnects() throws ExecutionException, InterruptedException { final AtomicBoolean successful = new AtomicBoolean(false); - final EthTask task = WaitForPeersTask.create(ethContext, 2); + final EthTask task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer); final CompletableFuture future = task.run(); future.whenComplete( (result, error) -> { @@ -88,7 +93,7 @@ public void doesNotCompleteWhenSomePeersConnects() @Test public void cancel() throws ExecutionException, InterruptedException { // Execute task - final EthTask task = WaitForPeersTask.create(ethContext, 2); + final EthTask task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer); final CompletableFuture future = task.run(); assertThat(future.isDone()).isFalse(); diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java index d1b12e40f3..2b5d00e7e7 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java @@ -17,7 +17,8 @@ public enum MetricCategory { RPC("rpc"), JVM("jvm", false), PROCESS("process", false), - BLOCKCHAIN("blockchain"); + BLOCKCHAIN("blockchain"), + SYNCHRONIZER("synchronizer"); private final String name; private final boolean pantheonSpecific; diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java index 9b0f43d9a7..54e44f8f94 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java @@ -28,6 +28,7 @@ public class NoOpMetricsSystem implements MetricsSystem { private static final Counter NO_OP_COUNTER = new NoOpCounter(); private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0; private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT; + public static final LabelledMetric NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER; @Override public LabelledMetric createLabelledCounter( diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java index f97fd24a53..00ed5fd6f1 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java @@ -35,7 +35,7 @@ import io.prometheus.client.Collector.MetricFamilySamples.Sample; import io.prometheus.client.Collector.Type; import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; +import io.prometheus.client.Summary; import io.prometheus.client.hotspot.BufferPoolsExports; import io.prometheus.client.hotspot.ClassLoadingExports; import io.prometheus.client.hotspot.GarbageCollectorExports; @@ -84,9 +84,18 @@ public LabelledMetric createLabelledTimer( final String name, final String help, final String... labelNames) { - final Histogram histogram = Histogram.build(name, help).labelNames(labelNames).create(); - addCollector(category, histogram); - return new PrometheusTimer(histogram); + final Summary summary = + Summary.build(convertToPrometheusName(category, name), help) + .quantile(0.2, 0.02) + .quantile(0.5, 0.05) + .quantile(0.8, 0.02) + .quantile(0.95, 0.005) + .quantile(0.99, 0.001) + .quantile(1.0, 0) + .labelNames(labelNames) + .create(); + addCollector(category, summary); + return new PrometheusTimer(summary); } @Override @@ -127,6 +136,9 @@ private Observation createObservationFromSample( if (familySamples.type == Type.HISTOGRAM) { return convertHistogramSampleNamesToLabels(category, sample, familySamples); } + if (familySamples.type == Type.SUMMARY) { + return convertSummarySampleNamesToLabels(category, sample, familySamples); + } return new Observation( category, convertFromPrometheusName(category, sample.name), @@ -149,6 +161,23 @@ private Observation convertHistogramSampleNamesToLabels( labelValues); } + private Observation convertSummarySampleNamesToLabels( + final MetricCategory category, final Sample sample, final MetricFamilySamples familySamples) { + final List labelValues = new ArrayList<>(sample.labelValues); + if (sample.name.endsWith("_sum")) { + labelValues.add("sum"); + } else if (sample.name.endsWith("_count")) { + labelValues.add("count"); + } else { + labelValues.add(labelValues.size() - 1, "quantile"); + } + return new Observation( + category, + convertFromPrometheusName(category, familySamples.name), + sample.value, + labelValues); + } + private String convertToPrometheusName(final MetricCategory category, final String name) { return prometheusPrefix(category) + name; } diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusTimer.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusTimer.java index 55e75a4ff5..f60ef7fb6c 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusTimer.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusTimer.java @@ -15,20 +15,19 @@ import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; -import io.prometheus.client.Histogram; -import io.prometheus.client.Histogram.Child; +import io.prometheus.client.Summary; class PrometheusTimer implements LabelledMetric { - private final Histogram histogram; + private final Summary summary; - public PrometheusTimer(final Histogram histogram) { - this.histogram = histogram; + public PrometheusTimer(final Summary summary) { + this.summary = summary; } @Override public OperationTimer labels(final String... labels) { - final Child metric = histogram.labels(labels); + final Summary.Child metric = summary.labels(labels); return () -> metric.startTimer()::observeDuration; } } diff --git a/metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java b/metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java index 243695355a..eeadcdf7e6 100644 --- a/metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java +++ b/metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java @@ -91,23 +91,14 @@ public void shouldCreateObservationsFromTimer() { assertThat(metricsSystem.getMetrics()) .usingElementComparator(IGNORE_VALUES) .containsExactlyInAnyOrder( - new Observation(RPC, "request", null, asList("bucket", "0.005")), - new Observation(RPC, "request", null, asList("bucket", "0.01")), - new Observation(RPC, "request", null, asList("bucket", "0.025")), - new Observation(RPC, "request", null, asList("bucket", "0.05")), - new Observation(RPC, "request", null, asList("bucket", "0.075")), - new Observation(RPC, "request", null, asList("bucket", "0.1")), - new Observation(RPC, "request", null, asList("bucket", "0.25")), - new Observation(RPC, "request", null, asList("bucket", "0.5")), - new Observation(RPC, "request", null, asList("bucket", "0.75")), - new Observation(RPC, "request", null, asList("bucket", "1.0")), - new Observation(RPC, "request", null, asList("bucket", "2.5")), - new Observation(RPC, "request", null, asList("bucket", "5.0")), - new Observation(RPC, "request", null, asList("bucket", "7.5")), - new Observation(RPC, "request", null, asList("bucket", "10.0")), - new Observation(RPC, "request", null, asList("bucket", "+Inf")), + new Observation(RPC, "request", null, asList("quantile", "0.2")), + new Observation(RPC, "request", null, asList("quantile", "0.5")), + new Observation(RPC, "request", null, asList("quantile", "0.8")), + new Observation(RPC, "request", null, asList("quantile", "0.95")), + new Observation(RPC, "request", null, asList("quantile", "0.99")), + new Observation(RPC, "request", null, asList("quantile", "1.0")), new Observation(RPC, "request", null, singletonList("sum")), - new Observation(RPC, "request", 1d, singletonList("count"))); + new Observation(RPC, "request", null, singletonList("count"))); } @Test @@ -120,21 +111,12 @@ public void shouldCreateObservationsFromTimerWithLabels() { assertThat(metricsSystem.getMetrics()) .usingElementComparator(IGNORE_VALUES) // We don't know how long it will actually take. .containsExactlyInAnyOrder( - new Observation(RPC, "request", null, asList("method", "bucket", "0.005")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.01")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.025")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.05")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.075")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.1")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.25")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.5")), - new Observation(RPC, "request", null, asList("method", "bucket", "0.75")), - new Observation(RPC, "request", null, asList("method", "bucket", "1.0")), - new Observation(RPC, "request", null, asList("method", "bucket", "2.5")), - new Observation(RPC, "request", null, asList("method", "bucket", "5.0")), - new Observation(RPC, "request", null, asList("method", "bucket", "7.5")), - new Observation(RPC, "request", null, asList("method", "bucket", "10.0")), - new Observation(RPC, "request", null, asList("method", "bucket", "+Inf")), + new Observation(RPC, "request", null, asList("method", "quantile", "0.2")), + new Observation(RPC, "request", null, asList("method", "quantile", "0.5")), + new Observation(RPC, "request", null, asList("method", "quantile", "0.8")), + new Observation(RPC, "request", null, asList("method", "quantile", "0.95")), + new Observation(RPC, "request", null, asList("method", "quantile", "0.99")), + new Observation(RPC, "request", null, asList("method", "quantile", "1.0")), new Observation(RPC, "request", null, asList("method", "sum")), new Observation(RPC, "request", null, asList("method", "count"))); } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index dd5959b64c..fd5ab472bd 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -54,6 +54,7 @@ import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; @@ -156,7 +157,8 @@ public static PantheonController init( protocolContext, ethProtocolManager.ethContext(), syncState, - metricsSystem); + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index c600d5f591..2eab1f6b60 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -53,6 +53,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; @@ -167,7 +168,8 @@ public static PantheonController init( protocolContext, ethProtocolManager.ethContext(), syncState, - metricsSystem); + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); final Runnable closer = () -> { diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 5a679e74cf..42597ec8fe 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -61,6 +61,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; @@ -164,7 +165,8 @@ public static PantheonController init( protocolContext, ethProtocolManager.ethContext(), syncState, - metricsSystem); + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 190bc62c8c..20c332332b 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -41,6 +41,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration; import tech.pegasys.pantheon.ethereum.storage.StorageProvider; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; @@ -129,7 +130,8 @@ public static PantheonController init( protocolContext, ethProtocolManager.ethContext(), syncState, - metricsSystem); + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool(