diff --git a/beacon_chain/era_db.nim b/beacon_chain/era_db.nim index 035c293b4a..53a5f7d146 100644 --- a/beacon_chain/era_db.nim +++ b/beacon_chain/era_db.nim @@ -169,7 +169,7 @@ proc verify*(f: EraFile, cfg: RuntimeConfig): Result[Eth2Digest, string] = rng = HmacDrbgContext.new() taskpool = Taskpool.new() - var verifier = BatchVerifier(rng: rng, taskpool: taskpool) + var verifier = BatchVerifier.init(rng, taskpool) var tmp: seq[byte] ? f.getStateSSZ(startSlot, tmp) diff --git a/beacon_chain/gossip_processing/batch_validation.nim b/beacon_chain/gossip_processing/batch_validation.nim index 55dd4ce3e9..2a3c4d554a 100644 --- a/beacon_chain/gossip_processing/batch_validation.nim +++ b/beacon_chain/gossip_processing/batch_validation.nim @@ -8,17 +8,18 @@ {.push raises: [].} import - std/[deques, sequtils], + std/[atomics, deques, sequtils], + stew/ptrops, metrics, # Status - chronicles, chronos, + chronicles, chronos, chronos/threadsync, ../spec/signatures_batch, ../consensus_object_pools/[blockchain_dag, spec_cache] export signatures_batch, blockchain_dag logScope: - topics = "gossip_checks" + topics = "batch_validation" declareCounter batch_verification_batches, "Total number of batches processed" @@ -26,6 +27,8 @@ declareCounter batch_verification_signatures, "Total number of verified signatures before aggregation" declareCounter batch_verification_aggregates, "Total number of verified signatures after aggregation" +declareCounter batch_verification_batches_skipped, + "Total number of batches skipped" # Batched gossip validation # ---------------------------------------------------------------- @@ -52,93 +55,119 @@ declareCounter batch_verification_aggregates, # different signatures, as most validators have the same view of the network - # at least 2/3 or we're in deep trouble :) +const + BatchAttAccumTime = 10.milliseconds + ## Amount of time spent accumulating signatures from the network before + ## performing verification + + BatchedCryptoSize = 72 + ## Threshold for immediate trigger of batch verification. + ## A balance between throughput and worst case latency. + ## At least 6 so that the constant factors + ## (RNG for blinding and Final Exponentiation) + ## are amortized, but not too big as we need to redo checks one-by-one if + ## one failed. + ## The current value is based on experiments, where 72 gives an average + ## batch size of ~30 signatures per batch, or 2.5 signatures per aggregate + ## (meaning an average of 12 verifications per batch which on a raspberry + ## should be doable in less than 30ms). In the same experiment, a value of + ## 36 resulted in 17-18 signatures per batch and 1.7-1.9 signatures per + ## aggregate - this node was running on mainnet with + ## `--subscribe-all-subnets` turned on - typical nodes will see smaller + ## batches. + + InflightVerifications = 2 + ## Maximum number of concurrent in-flight verifications + type BatchResult* {.pure.} = enum Invalid # Invalid by default Valid Timeout - Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\ - ## Callback that returns true if eager processing should be done to lower - ## latency at the expense of spending more cycles validating things, creating - ## a crude timesharing priority mechanism. + Eager = proc(): bool {.gcsafe, raises: [].} + ## Callback that returns true if eager processing should be done to lower + ## latency at the expense of spending more cycles validating things, + ## creating a crude timesharing priority mechanism. 
BatchItem* = object sigset: SignatureSet fut: Future[BatchResult] Batch* = object + ## A batch represents up to BatchedCryptoSize non-aggregated signatures created: Moment sigsets: seq[SignatureSet] items: seq[BatchItem] + VerifierItem = object + verifier: ref BatchVerifier + signal: ThreadSignalPtr + inflight: Future[void] + BatchCrypto* = object - # Each batch is bounded by BatchedCryptoSize which was chosen: - # - based on "nimble bench" in nim-blscurve - # so that low power devices like Raspberry Pi 4 can process - # that many batched verifications within ~30ms on average - # - based on the accumulation rate of attestations and aggregates - # in large instances which were 12000 per slot (12s) - # hence 1 per ms (but the pattern is bursty around the 4s mark) - # The number of batches is bounded by time - batch validation is skipped if - # we can't process them in the time that one slot takes, and we return - # timeout instead which prevents the gossip layer from forwarding the - # batch. batches: Deque[ref Batch] - eager: Eager ##\ - ## Eager is used to enable eager processing of attestations when it's - ## prudent to do so (instead of leaving the CPU for other, presumably more - ## important work like block processing) - ## - verifier: BatchVerifier + eager: Eager + ## Eager is used to enable eager processing of attestations when it's + ## prudent to do so (instead of leaving the CPU for other, presumably more + ## important work like block processing) + + taskpool: Taskpool + rng: ref HmacDrbgContext + + verifiers: array[InflightVerifications, VerifierItem] + ## Each batch verification requires a separate verifier + verifier: int pruneTime: Moment ## last time we had to prune something - # `nim-metrics` library is a bit too slow to update on every batch, so - # we accumulate here instead counts: tuple[signatures, batches, aggregates: int64] + # `nim-metrics` library is a bit too slow to update on every batch, so + # we accumulate here instead - # Most scheduled checks require this immutable value, so don't require it - # to be provided separately each time genesis_validators_root: Eth2Digest + # Most scheduled checks require this immutable value, so don't require it + # to be provided separately each time -const - # We cap waiting for an idle slot in case there's a lot of network traffic - # taking up all CPU - we don't want to _completely_ stop processing - # attestations - doing so also allows us to benefit from more batching / - # larger network reads when under load. - BatchAttAccumTime = 10.milliseconds + processor: Future[void] - # Threshold for immediate trigger of batch verification. - # A balance between throughput and worst case latency. - # At least 6 so that the constant factors - # (RNG for blinding and Final Exponentiation) - # are amortized, but not too big as we need to redo checks one-by-one if one - # failed. - # The current value is based on experiments, where 72 gives an average batch - # size of ~30 signatures per batch, or 2.5 signatures per aggregate (meaning - # an average of 12 verifications per batch which on a raspberry should be - # doable in less than 30ms). In the same experiment, a value of 36 resulted - # in 17-18 signatures per batch and 1.7-1.9 signatures per aggregate - this - # node was running on mainnet with `--subscribe-all-subnets` turned on - - # typical nodes will see smaller batches.
- BatchedCryptoSize = 72 + BatchTask = object + ok: Atomic[bool] + setsPtr: ptr UncheckedArray[SignatureSet] + numSets: int + secureRandomBytes: array[32, byte] + taskpool: Taskpool + cache: ptr BatchedBLSVerifierCache + signal: ThreadSignalPtr proc new*( T: type BatchCrypto, rng: ref HmacDrbgContext, eager: Eager, genesis_validators_root: Eth2Digest, taskpool: TaskPoolPtr): - ref BatchCrypto = - (ref BatchCrypto)( - verifier: BatchVerifier(rng: rng, taskpool: taskpool), + Result[ref BatchCrypto, string] = + let res = (ref BatchCrypto)( + rng: rng, taskpool: taskpool, eager: eager, genesis_validators_root: genesis_validators_root, pruneTime: Moment.now()) -func len(batch: Batch): int = - batch.items.len() + for i in 0..= BatchedCryptoSize + batch.items.len() >= BatchedCryptoSize + +func half(batch: Batch): bool = + batch.items.len() >= (BatchedCryptoSize div 2) proc complete(batchItem: var BatchItem, v: BatchResult) = batchItem.fut.complete(v) @@ -146,26 +175,36 @@ proc complete(batchItem: var BatchItem, v: BatchResult) = proc complete(batchItem: var BatchItem, ok: bool) = batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid) - batchItem.fut = nil proc skip(batch: var Batch) = for res in batch.items.mitems(): res.complete(BatchResult.Timeout) -proc pruneBatchQueue(batchCrypto: ref BatchCrypto) = - let - now = Moment.now() +proc complete(batchCrypto: var BatchCrypto, batch: var Batch, ok: bool) = + if ok: + for res in batch.items.mitems(): + res.complete(BatchResult.Valid) + else: + # Batched verification failed meaning that some of the signature checks + # failed, but we don't know which ones - check each signature separately + # instead + debug "batch crypto - failure, falling back", + items = batch.items.len() - # If batches haven't been processed for more than 12 seconds - while batchCrypto.batches.len() > 0: - if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now: - break - if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now: - notice "Batch queue pruned, skipping attestation validation", - batches = batchCrypto.batches.len() - batchCrypto.pruneTime = Moment.now() + for item in batch.items.mitems(): + item.complete(blsVerify item.sigset) + + batchCrypto.counts.batches += 1 + batchCrypto.counts.signatures += batch.items.len() + batchCrypto.counts.aggregates += batch.sigsets.len() + + if batchCrypto.counts.batches >= 256: + # Not too often, so as not to overwhelm our metrics + batch_verification_batches.inc(batchCrypto.counts.batches) + batch_verification_signatures.inc(batchCrypto.counts.signatures) + batch_verification_aggregates.inc(batchCrypto.counts.aggregates) - batchCrypto.batches.popFirst()[].skip() + reset(batchCrypto.counts) func combine(a: var Signature, b: Signature) = var tmp = AggregateSignature.init(CookedSig(a)) @@ -177,135 +216,158 @@ func combine(a: var PublicKey, b: PublicKey) = tmp.aggregate(b) a = PublicKey(tmp.finish()) -proc processBatch(batchCrypto: ref BatchCrypto) = - ## Process one batch, if there is any - - # Pruning the queue here makes sure we catch up with processing if need be - batchCrypto.pruneBatchQueue() # Skip old batches +proc batchVerifyTask(task: ptr BatchTask) {.nimcall.} = + # Task suitable for running in taskpools - look, no GC! 
+ let + tp = task[].taskpool + ok = tp.spawn batchVerify( + tp, task[].cache, task[].setsPtr, task[].numSets, + addr task[].secureRandomBytes) + + task[].ok.store(sync ok) + + discard task[].signal.fireSync() + +proc batchVerifyAsync*( + verifier: ref BatchVerifier, signal: ThreadSignalPtr, + batch: ref Batch): Future[bool] {.async.} = + var task = BatchTask( + setsPtr: makeUncheckedArray(baseAddr batch[].sigsets), + numSets: batch[].sigsets.len, + taskpool: verifier[].taskpool, + cache: addr verifier[].sigVerifCache, + signal: signal, + ) + verifier[].rng[].generate(task.secureRandomBytes) + + # task will stay allocated in the async environment at least until the signal + # has fired at which point it's safe to release it + let taskPtr = addr task + verifier[].taskpool.spawn batchVerifyTask(taskPtr) + await signal.wait() + task.ok.load() + +proc processBatch( + batchCrypto: ref BatchCrypto, batch: ref Batch, + verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async.} = + let + numSets = batch[].sigsets.len() - if batchCrypto[].batches.len() == 0: - # No more batches left, they might have been eagerly processed or pruned + if numSets == 0: + # Nothing to do in this batch, can happen when a batch is created without + # there being any signatures successfully added to it return let - batch = batchCrypto[].batches.popFirst() - batchSize = batch[].sigsets.len() + startTick = Moment.now() + + # If the hardware is too slow to keep up or an event caused a temporary + # buildup of signature verification tasks, the batch will be dropped so as to + # recover and not cause even further buildup - this puts an (elastic) upper + # bound on the amount of queued-up work + if batch[].created + SECONDS_PER_SLOT.int64.seconds < startTick: + if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds < startTick: + notice "Batch queue pruned, skipping attestation validation", + batches = batchCrypto.batches.len() + batchCrypto.pruneTime = startTick - if batchSize == 0: - # Nothing to do in this batch, can happen when a batch is created without - # there being any signatures successfully added to it - discard - else: - trace "batch crypto - starting", - batchSize - - let - startTick = Moment.now() - ok = - if batchSize == 1: blsVerify(batch[].sigsets[0]) - else: batchCrypto.verifier.batchVerify(batch[].sigsets) - - trace "batch crypto - finished", - batchSize, - cryptoVerified = ok, - batchDur = Moment.now() - startTick + batch[].skip() - if ok: - for res in batch.items.mitems(): - res.complete(BatchResult.Valid) - else: - # Batched verification failed meaning that some of the signature checks - # failed, but we don't know which ones - check each signature separately - # instead - debug "batch crypto - failure, falling back", - items = batch[].items.len() + batch_verification_batches_skipped.inc() - for item in batch[].items.mitems(): - item.complete(blsVerify item.sigset) + return - batchCrypto[].counts.batches += 1 - batchCrypto[].counts.signatures += batch[].items.len() - batchCrypto[].counts.aggregates += batch[].sigsets.len() + trace "batch crypto - starting", numSets, items = batch[].items.len - if batchCrypto[].counts.batches >= 256: - # Not too often, so as not to overwhelm our metrics - batch_verification_batches.inc(batchCrypto[].counts.batches) - batch_verification_signatures.inc(batchCrypto[].counts.signatures) - batch_verification_aggregates.inc(batchCrypto[].counts.aggregates) + if numSets == 1: + # Avoid taskpools overhead when there's only a single signature to verify + trace "batch crypto 
- finished (1)", + numSets, items = batch[].items.len(), + batchDur = Moment.now() - startTick - reset(batchCrypto[].counts) + batchCrypto[].complete(batch[], blsVerify(batch[].sigsets[0])) + return -proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} = - ## Process pending crypto check after some time has passed - the time is - ## chosen such that there's time to fill the batch but not so long that - ## latency across the network is negatively affected - await sleepAsync(BatchAttAccumTime) + let ok = await batchVerifyAsync(verifier, signal, batch) - # Take the first batch in the queue and process it - if eager processing has - # stolen it already, that's fine - batchCrypto.processBatch() + trace "batch crypto - finished", + numSets, items = batch[].items.len(), ok, + batchDur = Moment.now() - startTick -proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) = - # Get a batch suitable for attestation processing - in particular, attestation - # batches might be skipped - batchCrypto.pruneBatchQueue() + batchCrypto[].complete(batch[], ok) +proc processLoop(batchCrypto: ref BatchCrypto) {.async.} = + ## Process pending crypto check after some time has passed - the time is + ## chosen such that there's time to fill the batch but not so long that + ## latency across the network is negatively affected + while batchCrypto[].batches.len() > 0: + # When eager processing is enabled, we can start processing the next batch + # as soon as it's full - otherwise, wait for more signatures to accumulate + if not batchCrypto[].batches.peekFirst()[].full() or + not batchCrypto[].eager(): + + await sleepAsync(BatchAttAccumTime) + + # We still haven't filled even half the batch - wait a bit more (and give + # chonos time to work its task queue) + if not batchCrypto[].batches.peekFirst()[].half(): + await sleepAsync(BatchAttAccumTime div 2) + + # Pick the "next" verifier + let verifier = (batchCrypto[].verifier + 1) mod batchCrypto.verifiers.len + batchCrypto[].verifier = verifier + + # BatchVerifier:s may not be shared, so make sure the previous round + # using this verifier is finished + if batchCrypto[].verifiers[verifier].inflight != nil and + not batchCrypto[].verifiers[verifier].inflight.finished(): + await batchCrypto[].verifiers[verifier].inflight + + batchCrypto[].verifiers[verifier].inflight = batchCrypto.processBatch( + batchCrypto[].batches.popFirst(), + batchCrypto[].verifiers[verifier].verifier, + batchCrypto[].verifiers[verifier].signal) + +proc getBatch(batchCrypto: var BatchCrypto): ref Batch = if batchCrypto.batches.len() == 0 or batchCrypto.batches.peekLast[].full(): - # There are no batches in progress - start a new batch and schedule a - # deferred task to eventually handle it let batch = (ref Batch)(created: Moment.now()) - batchCrypto[].batches.addLast(batch) - (batch, true) + batchCrypto.batches.addLast(batch) + batch else: - let batch = batchCrypto[].batches.peekLast() - # len will be 0 when the batch was created but nothing added to it - # because of early failures - (batch, batch[].len() == 0) - -proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) = - if fresh: - # Every time we start a new round of batching, we need to launch a deferred - # task that will compute the result of the batch eventually in case the - # batch is never filled or eager processing is blocked - asyncSpawn batchCrypto.deferCryptoProcessing() - - if batchCrypto.batches.len() > 0 and - batchCrypto.batches.peekFirst()[].full() and - batchCrypto.eager(): - # If there's a full batch, 
process it eagerly assuming the callback allows - batchCrypto.processBatch() - -template withBatch( - batchCrypto: ref BatchCrypto, name: cstring, - body: untyped): Future[BatchResult] = - block: - let - (batch, fresh) = batchCrypto.getBatch() - - let - fut = newFuture[BatchResult](name) - sigset = body - - var found = false - # Find existing signature sets with the same message - if we can verify an - # aggregate instead of several signatures, that is _much_ faster - for item in batch[].sigsets.mitems(): - if item.message == sigset.message: - item.signature.combine(sigset.signature) - item.pubkey.combine(sigset.pubkey) - found = true - break - - if not found: - batch[].sigsets.add sigset - - # We need to keep the "original" sigset to allow verifying each signature - # one by one in the case the combined operation fails - batch[].items.add(BatchItem(sigset: sigset, fut: fut)) - - batchCrypto.scheduleBatch(fresh) - fut + batchCrypto.batches.peekLast() + +proc scheduleProcessor(batchCrypto: ref BatchCrypto) = + if batchCrypto.processor == nil or batchCrypto.processor.finished(): + batchCrypto.processor = batchCrypto.processLoop() + +proc verifySoon( + batchCrypto: ref BatchCrypto, name: static string, + sigset: SignatureSet): Future[BatchResult] = + let + batch = batchCrypto[].getBatch() + fut = newFuture[BatchResult](name) + + var found = false + # Find existing signature sets with the same message - if we can verify an + # aggregate instead of several signatures, that is _much_ faster + for item in batch[].sigsets.mitems(): + if item.message == sigset.message: + item.signature.combine(sigset.signature) + item.pubkey.combine(sigset.pubkey) + found = true + break + + if not found: + batch[].sigsets.add sigset + + # We need to keep the "original" sigset to allow verifying each signature + # one by one in the case the combined operation fails + batch[].items.add(BatchItem(sigset: sigset, fut: fut)) + + batchCrypto.scheduleProcessor() + + fut # See also verify_attestation_signature proc scheduleAttestationCheck*( @@ -325,7 +387,7 @@ proc scheduleAttestationCheck*( let sig = signature.load().valueOr: return err("attestation: cannot load signature") - fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"): + fut = batchCrypto.verifySoon("batch_validation.scheduleAttestationCheck"): attestation_signature_set( fork, batchCrypto[].genesis_validators_root, attestationData, pubkey, sig) @@ -370,15 +432,15 @@ proc scheduleAggregateChecks*( return err("aggregateAndProof: invalid aggregate signature") let - aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"): + aggregatorFut = batchCrypto.verifySoon("scheduleAggregateChecks.aggregator"): aggregate_and_proof_signature_set( fork, batchCrypto[].genesis_validators_root, aggregate_and_proof, aggregatorKey, aggregatorSig) - slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"): + slotFut = batchCrypto.verifySoon("scheduleAggregateChecks.selection_proof"): slot_signature_set( fork, batchCrypto[].genesis_validators_root, aggregate.data.slot, aggregatorKey, slotSig) - aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"): + aggregateFut = batchCrypto.verifySoon("scheduleAggregateChecks.aggregate"): attestation_signature_set( fork, batchCrypto[].genesis_validators_root, aggregate.data, aggregateKey, aggregateSig) @@ -402,7 +464,7 @@ proc scheduleSyncCommitteeMessageCheck*( let sig = signature.load().valueOr: return err("SyncCommitteMessage: cannot load signature") - fut = 
batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"): + fut = batchCrypto.verifySoon("scheduleSyncCommitteeMessageCheck"): sync_committee_message_signature_set( fork, batchCrypto[].genesis_validators_root, slot, beacon_block_root, pubkey, sig) @@ -444,15 +506,15 @@ proc scheduleContributionChecks*( dag, dag.syncCommitteeParticipants(contribution.slot + 1, subcommitteeIdx), contribution.aggregation_bits) let - aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"): + aggregatorFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.aggregator"): contribution_and_proof_signature_set( fork, batchCrypto[].genesis_validators_root, contribution_and_proof, aggregatorKey, aggregatorSig) - proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"): + proofFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.selection_proof"): sync_committee_selection_proof_set( fork, batchCrypto[].genesis_validators_root, contribution.slot, subcommitteeIdx, aggregatorKey, proofSig) - contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"): + contributionFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.contribution"): sync_committee_message_signature_set( fork, batchCrypto[].genesis_validators_root, contribution.slot, contribution.beacon_block_root, contributionKey, contributionSig) @@ -460,11 +522,10 @@ proc scheduleContributionChecks*( ok((aggregatorFut, proofFut, contributionFut, contributionSig)) proc scheduleBlsToExecutionChangeCheck*( - batchCrypto: ref BatchCrypto, - genesisFork: Fork, - signedBLSToExecutionChange: SignedBLSToExecutionChange): Result[tuple[ - blsToExecutionFut: Future[BatchResult], - sig: CookedSig], cstring] = + batchCrypto: ref BatchCrypto, + genesis_fork: Fork, signedBLSToExecutionChange: SignedBLSToExecutionChange, + dag: ChainDAGRef): + Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = ## Schedule crypto verification of all signatures in a ## SignedBLSToExecutionChange message ## @@ -481,16 +542,15 @@ proc scheduleBlsToExecutionChangeCheck*( let # Only called when matching already-known withdrawal credentials, so it's # resistant to allowing loadWithCache DoSing - validatorChangePubkey = - signedBLSToExecutionChange.message.from_bls_pubkey.loadWithCache.valueOr: - return err("scheduleBlsToExecutionChangeCheck: cannot load BLS to withdrawals pubkey") - - validatorChangeSig = signedBLSToExecutionChange.signature.load().valueOr: + pubkey = dag.validatorKey( + signedBLSToExecutionChange.message.validator_index).valueOr: + return err("SignedAggregateAndProof: invalid validator index") + sig = signedBLSToExecutionChange.signature.load().valueOr: return err("scheduleBlsToExecutionChangeCheck: invalid validator change signature") - validatorChangeFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"): + fut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.contribution"): bls_to_execution_change_signature_set( genesis_fork, batchCrypto[].genesis_validators_root, signedBLSToExecutionChange.message, - validatorChangePubkey, validatorChangeSig) + pubkey, sig) - ok((validatorChangeFut, validatorChangeSig)) + ok((fut, sig)) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 5613abf87b..51e9c1aa74 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -148,7 +148,7 @@ proc 
new*(T: type BlockProcessor, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, getBeaconTime: getBeaconTime, - verifier: BatchVerifier(rng: rng, taskpool: taskpool), + verifier: BatchVerifier.init(rng, taskpool) ) # Sync callbacks diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 2b50413080..6799085833 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -194,7 +194,8 @@ proc new*(T: type Eth2Processor, # Only run eager attestation signature verification if we're not # processing blocks in order to give priority to block processing eager = proc(): bool = not blockProcessor[].hasBlocks(), - genesis_validators_root = dag.genesis_validators_root, taskpool) + genesis_validators_root = dag.genesis_validators_root, taskpool).expect( + "working batcher") ) # Each validator logs, validates then passes valid data to its destination diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index b1cdcfe9c3..8778cfb8a2 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -974,7 +974,7 @@ proc validateBlsToExecutionChange*( # BLS to execution change signatures are batch-verified let deferredCrypto = batchCrypto.scheduleBlsToExecutionChangeCheck( - pool.dag.cfg.genesisFork, signed_address_change) + pool.dag.cfg.genesisFork, signed_address_change, pool.dag) if deferredCrypto.isErr(): return pool.checkedReject(deferredCrypto.error) diff --git a/beacon_chain/spec/signatures_batch.nim b/beacon_chain/spec/signatures_batch.nim index 1d848ccf85..23586b5756 100644 --- a/beacon_chain/spec/signatures_batch.nim +++ b/beacon_chain/spec/signatures_batch.nim @@ -28,13 +28,30 @@ type TaskPoolPtr* = Taskpool BatchVerifier* = object - sigVerifCache*: BatchedBLSVerifierCache ##\ - ## A cache for batch BLS signature verification contexts - rng*: ref HmacDrbgContext ##\ - ## A reference to the Nimbus application-wide RNG - + sigVerifCache*: BatchedBLSVerifierCache + ## A cache for batch BLS signature verification contexts + rng*: ref HmacDrbgContext + ## A reference to the Nimbus application-wide RNG taskpool*: TaskPoolPtr +proc init*( + T: type BatchVerifier, rng: ref HmacDrbgContext, + taskpool: TaskPoolPtr): BatchVerifier = + BatchVerifier( + sigVerifCache: BatchedBLSVerifierCache.init(taskpool), + rng: rng, + taskpool: taskpool, + ) + +proc new*( + T: type BatchVerifier, rng: ref HmacDrbgContext, + taskpool: TaskPoolPtr): ref BatchVerifier = + (ref BatchVerifier)( + sigVerifCache: BatchedBLSVerifierCache.init(taskpool), + rng: rng, + taskpool: taskpool, + ) + func `$`*(s: SignatureSet): string = "(pubkey: 0x" & s.pubkey.toHex() & ", signing_root: 0x" & s.message.toHex() & @@ -433,7 +450,5 @@ proc collectSignatureSets*( proc batchVerify*(verifier: var BatchVerifier, sigs: openArray[SignatureSet]): bool = let bytes = verifier.rng[].generate(array[32, byte]) - try: - verifier.taskpool.batchVerify(verifier.sigVerifCache, sigs, bytes) - except Exception as exc: - raiseAssert exc.msg # Shouldn't happen + + verifier.taskpool.batchVerify(verifier.sigVerifCache, sigs, bytes) diff --git a/research/block_sim.nim b/research/block_sim.nim index 2e50daca4e..163a7cad6f 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -320,12 +320,13 @@ cli do(slots = SLOTS_PER_EPOCH * 6, eth1Chain = Eth1Chain.init(cfg, db, 0, default 
Eth2Digest) merkleizer = DepositsMerkleizer.init(depositTreeSnapshot.depositContractState) taskpool = Taskpool.new() - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) attPool = AttestationPool.init(dag, quarantine) batchCrypto = BatchCrypto.new( rng, eager = func(): bool = true, - genesis_validators_root = dag.genesis_validators_root, taskpool) + genesis_validators_root = dag.genesis_validators_root, + taskpool).expect("working batcher") syncCommitteePool = newClone SyncCommitteeMsgPool.init(rng, cfg) timers: array[Timers, RunningStat] attesters: RunningStat diff --git a/tests/consensus_spec/test_fixture_fork_choice.nim b/tests/consensus_spec/test_fixture_fork_choice.nim index d77781fd6c..24d02b616e 100644 --- a/tests/consensus_spec/test_fixture_fork_choice.nim +++ b/tests/consensus_spec/test_fixture_fork_choice.nim @@ -351,7 +351,7 @@ proc doRunTest(path: string, fork: ConsensusFork) = let rng = HmacDrbgContext.new() taskpool = Taskpool.new() - var verifier = BatchVerifier(rng: rng, taskpool: taskpool) + var verifier = BatchVerifier.init(rng, taskpool) let steps = loadOps(path, fork) var time = stores.fkChoice.checkpoints.time diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index c7074e727d..6a5ad558da 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -65,7 +65,7 @@ suite "Attestation pool processing" & preset(): ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), validatorMonitor, {}) taskpool = Taskpool.new() - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) pool = newClone(AttestationPool.init(dag, quarantine)) state = newClone(dag.headState) diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 39bb04d921..d6ad9c4c2e 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -40,7 +40,7 @@ suite "Block processor" & preset(): validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) taskpool = Taskpool.new() - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) blobQuarantine = newClone(BlobQuarantine()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index ec8eaaa45b..1e3cdd6d83 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -44,7 +44,8 @@ suite "Block pool processing" & preset(): db = makeTestDB(SLOTS_PER_EPOCH) validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = Quarantine.init() state = newClone(dag.headState) cache = StateCache() @@ -293,7 +294,8 @@ suite "Block pool altair processing" & preset(): db = makeTestDB(SLOTS_PER_EPOCH) validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, cfg, db, validatorMonitor, {}) - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = Quarantine.init() state = newClone(dag.headState) cache = StateCache() @@ 
-369,7 +371,8 @@ suite "chain DAG finalization tests" & preset(): db = makeTestDB(SLOTS_PER_EPOCH) validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {}) - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = Quarantine.init() cache = StateCache() info = ForkedEpochInfo() @@ -639,7 +642,8 @@ suite "Old database versions" & preset(): {skipBlsValidation})) genBlock = get_initial_beacon_block(genState[]) var - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = Quarantine.init() test "pre-1.1.0": @@ -687,7 +691,8 @@ suite "Diverging hardforks": db = makeTestDB(SLOTS_PER_EPOCH) validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, phase0RuntimeConfig, db, validatorMonitor, {}) - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) cache = StateCache() info = ForkedEpochInfo() @@ -929,7 +934,7 @@ suite "Backfill": taskpool = Taskpool.new() var cache: StateCache - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) let @@ -1070,7 +1075,8 @@ suite "Latest valid hash" & preset(): db = makeTestDB(SLOTS_PER_EPOCH) validatorMonitor = newClone(ValidatorMonitor.init()) dag = init(ChainDAGRef, runtimeConfig, db, validatorMonitor, {}) - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) cache = StateCache() info = ForkedEpochInfo() @@ -1139,7 +1145,8 @@ suite "Pruning": tmpState = assignClone(dag.headState) var - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + taskpool = Taskpool.new() + verifier = BatchVerifier.init(rng, taskpool) quarantine = Quarantine.init() cache = StateCache() blocks = @[dag.head] @@ -1203,7 +1210,7 @@ suite "Shufflings": taskpool = Taskpool.new() var - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) graffiti: GraffitiBytes proc addBlocks(blocks: uint64, attested: bool, cache: var StateCache) = inc distinctBase(graffiti)[0] # Avoid duplicate blocks across branches diff --git a/tests/test_gossip_validation.nim b/tests/test_gossip_validation.nim index 49c0dd4dd0..584c525512 100644 --- a/tests/test_gossip_validation.nim +++ b/tests/test_gossip_validation.nim @@ -43,7 +43,7 @@ suite "Gossip validation " & preset(): ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), validatorMonitor, {}) taskpool = Taskpool.new() - verifier = BatchVerifier(rng: rng, taskpool: taskpool) + verifier = BatchVerifier.init(rng, taskpool) quarantine = newClone(Quarantine.init()) pool = newClone(AttestationPool.init(dag, quarantine)) state = newClone(dag.headState) @@ -51,7 +51,8 @@ suite "Gossip validation " & preset(): info = ForkedEpochInfo() batchCrypto = BatchCrypto.new( rng, eager = proc(): bool = false, - genesis_validators_root = dag.genesis_validators_root, taskpool) + genesis_validators_root = dag.genesis_validators_root, taskpool).expect( + "working batcher") # Slot 0 is a finalized slot - won't be making attestations for it.. 
check: process_slots( @@ -190,7 +191,7 @@ suite "Gossip validation - Extra": # Not based on preset config quarantine = newClone(Quarantine.init()) rng = HmacDrbgContext.new() var - verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new()) + verifier = BatchVerifier.init(rng, taskpool) dag = block: let validatorMonitor = newClone(ValidatorMonitor.init()) @@ -223,7 +224,8 @@ suite "Gossip validation - Extra": # Not based on preset config let batchCrypto = BatchCrypto.new( rng, eager = proc(): bool = false, - genesis_validators_root = dag.genesis_validators_root, taskpool) + genesis_validators_root = dag.genesis_validators_root, taskpool).expect( + "working batcher") var state = assignClone(dag.headState.altairData) diff --git a/tests/test_light_client.nim b/tests/test_light_client.nim index a317944a29..688f66abd6 100644 --- a/tests/test_light_client.nim +++ b/tests/test_light_client.nim @@ -96,7 +96,8 @@ suite "Light client" & preset(): quarantine = newClone(Quarantine.init()) rng = HmacDrbgContext.new() taskpool = Taskpool.new() - var verifier = BatchVerifier(rng: rng, taskpool: taskpool) + var + verifier = BatchVerifier.init(rng, taskpool) test "Pre-Altair": # Genesis diff --git a/tests/test_light_client_processor.nim b/tests/test_light_client_processor.nim index 9e8cb03667..d6526df3d6 100644 --- a/tests/test_light_client_processor.nim +++ b/tests/test_light_client_processor.nim @@ -44,7 +44,7 @@ suite "Light client processor" & preset(): quarantine = newClone(Quarantine.init()) rng = HmacDrbgContext.new() taskpool = Taskpool.new() - var verifier = BatchVerifier(rng: rng, taskpool: taskpool) + var verifier = BatchVerifier.init(rng, taskpool) var cache: StateCache proc addBlocks(blocks: uint64, syncCommitteeRatio: float) = diff --git a/vendor/nim-blscurve b/vendor/nim-blscurve index b71a163126..be1ad53e5b 160000 --- a/vendor/nim-blscurve +++ b/vendor/nim-blscurve @@ -1 +1 @@ -Subproject commit b71a16312699f6d26dee6a710b4aafea6ee5680d +Subproject commit be1ad53e5b3bdb4dc15db018f4382874896c60a5 diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 0035f4fa66..1d6ded6d9f 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 0035f4fa6692e85756aa192b4df84c21d3cacacb +Subproject commit 1d6ded6d9febb194f9d061e96b3f476cf4432767 diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools index 89d693d3ff..478b163946 160000 --- a/vendor/nim-taskpools +++ b/vendor/nim-taskpools @@ -1 +1 @@ -Subproject commit 89d693d3ffc9e53aa470a9a05166e4f2b58d282a +Subproject commit 478b163946e8cc769edece77b1c11aad14a041ce
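For readers unfamiliar with the handoff that `batchVerifyAsync`/`batchVerifyTask` rely on, the pattern reduces to the sketch below: the taskpool worker publishes its result through an `Atomic` and wakes the chronos event loop with a `ThreadSignalPtr` instead of blocking it. This is an illustrative sketch only, not part of the patch; `HeavyTask`, `heavyWork` and `runHeavy` are made-up names, and the real code additionally fans the BLS work out to sub-tasks via nim-blscurve's parallel `batchVerify`.

import std/atomics
import chronos, chronos/threadsync, taskpools

type HeavyTask = object
  ok: Atomic[bool]        # result written from the taskpool thread
  signal: ThreadSignalPtr # fired when the result is ready

proc heavyWork(): bool =
  # stand-in for the expensive batched BLS verification
  true

proc heavyTask(task: ptr HeavyTask) {.nimcall.} =
  # runs on a taskpool thread - only the raw pointer is touched, no GC'd
  # memory, mirroring the "look, no GC!" constraint of batchVerifyTask
  task[].ok.store(heavyWork())
  discard task[].signal.fireSync()

proc runHeavy(tp: Taskpool, signal: ThreadSignalPtr): Future[bool] {.async.} =
  var task = HeavyTask(signal: signal)
  # `task` stays alive in the async environment until the signal has fired,
  # so handing a raw pointer to the worker thread is safe
  tp.spawn heavyTask(addr task)
  await signal.wait() # the event loop keeps serving other futures meanwhile
  task.ok.load()

Creating a `ThreadSignalPtr` can fail (it is an OS-level resource), which is presumably why the patch allocates one per `VerifierItem` up front and has `BatchCrypto.new` return a `Result`, keeping signal creation off the verification hot path.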
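The scheduling in `processLoop` - cycle through `InflightVerifications` verifier slots and, before reusing a slot, await the previous round that used it - can also be shown in isolation. A minimal sketch assuming only chronos; `doVerify` and `runAll` are hypothetical stand-ins for `processBatch` and the loop body:

import chronos

const Slots = 2 # mirrors InflightVerifications

proc doVerify(n: int) {.async.} =
  # pretend to verify one batch
  await sleepAsync(10.milliseconds)

proc runAll(jobs: int) {.async.} =
  var
    inflight: array[Slots, Future[void]]
    slot = 0
  for job in 0 ..< jobs:
    # pick the "next" slot, round-robin
    slot = (slot + 1) mod Slots
    # a slot may not be reused while its previous future is still running
    if inflight[slot] != nil and not inflight[slot].finished():
      await inflight[slot]
    inflight[slot] = doVerify(job)
  # drain remaining work before returning
  for fut in inflight:
    if fut != nil:
      await fut

when isMainModule:
  waitFor runAll(10)

With two slots, one batch can be accumulating signatures while another is being verified, yet the amount of queued-up verification work stays bounded.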
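The mechanical change repeated across the call sites and tests is that `BatchVerifier` is now built via `init`/`new` instead of an object literal, because the verification cache appears to need the taskpool at construction time (`BatchedBLSVerifierCache.init(taskpool)` in `signatures_batch.nim`). As a fragment - it assumes `rng: ref HmacDrbgContext` and `taskpool: Taskpool` are already set up, as in the tests above:

# previously: object literal, leaving sigVerifCache default-initialized
#   var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
# now: init wires the cache to the taskpool as well
var verifier = BatchVerifier.init(rng, taskpool)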