From b51152153a747f8e826bb0989539f87f0102a474 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Tue, 18 Apr 2023 00:31:54 +0300 Subject: [PATCH] VC: Hardening and optimizing time handling. (#4743) * Fix durationToNextSlot() and durationToNextEpoch() to work not only after Genesis, but also before Genesis. Change VC pre-genesis behavior, add runPreGenesisWaitingLoop() and runGenesisWaitingLoop(). Add checkedWaitForSlot() and checkedWaitForNextSlot() to strictly check current time and print warnings. Fix VC main loop to use checkedWaitForNextSlot(). Fix attestation_service to run attestations processing only until the end of the duty slot. Change attestation_service main loop to use checkedWaitForNextSlot(). Change block_service to properly cancel all the pending proposer tasks. Use checkedWaitForSlot to wait for block proposal. Fix block_service waitForBlockPublished() to be compatible with BN. Fix sync_committee_service to avoid asyncSpawn. Fix sync_committee_service to run only until the end of the duty slot. Fix sync_committee_service to use checkedWaitForNextSlot(). * Refactor validator logging. Fix aggregated attestation publishing missing delay. * Fix doppelganger detection should not start at pre-genesis time. Fix fallback service sync status spam. Fix false `sync committee subnets subscription error`. * Address review comments part 1. * Address review comments. * Fix condition issue for near genesis waiting loop. * Address review comments. * Address review comments 2. --- beacon_chain/beacon_clock.nim | 37 +- beacon_chain/nimbus_validator_client.nim | 204 +++++--- .../validator_client/attestation_service.nim | 148 +++--- .../validator_client/block_service.nim | 256 ++++++---- beacon_chain/validator_client/common.nim | 109 ++++- .../validator_client/doppelganger_service.nim | 15 + .../validator_client/duties_service.nim | 460 +++++++++--------- .../validator_client/fallback_service.nim | 23 +- .../validator_client/fork_service.nim | 67 +-- .../sync_committee_service.nim | 58 ++- 10 files changed, 861 insertions(+), 516 deletions(-) diff --git a/beacon_chain/beacon_clock.nim b/beacon_chain/beacon_clock.nim index 1bc87a62c2..a914174c4b 100644 --- a/beacon_chain/beacon_clock.nim +++ b/beacon_chain/beacon_clock.nim @@ -70,18 +70,39 @@ proc fromNow*(c: BeaconClock, slot: Slot): tuple[inFuture: bool, offset: Duratio c.fromNow(slot.start_beacon_time()) proc durationToNextSlot*(c: BeaconClock): Duration = - let (afterGenesis, slot) = c.now().toSlot() - if afterGenesis: - c.fromNow(slot + 1'u64).offset + let + currentTime = c.now() + currentSlot = currentTime.toSlot() + + if currentSlot.afterGenesis: + let nextSlot = currentSlot.slot + 1 + chronos.nanoseconds( + (nextSlot.start_beacon_time() - currentTime).nanoseconds) else: - c.fromNow(Slot(0)).offset + # absoluteTime = BeaconTime(-currentTime.ns_since_genesis). + let + absoluteTime = Slot(0).start_beacon_time() + + (Slot(0).start_beacon_time() - currentTime) + timeToNextSlot = absoluteTime - currentSlot.slot.start_beacon_time() + chronos.nanoseconds(timeToNextSlot.nanoseconds) proc durationToNextEpoch*(c: BeaconClock): Duration = - let (afterGenesis, slot) = c.now().toSlot() - if afterGenesis: - c.fromNow((slot.epoch + 1).start_slot()).offset + let + currentTime = c.now() + currentSlot = currentTime.toSlot() + + if currentSlot.afterGenesis: + let nextEpochSlot = (currentSlot.slot.epoch() + 1).start_slot() + chronos.nanoseconds( + (nextEpochSlot.start_beacon_time() - currentTime).nanoseconds) else: - c.fromNow(Epoch(0).start_slot()).offset + # absoluteTime = BeaconTime(-currentTime.ns_since_genesis). + let + absoluteTime = Slot(0).start_beacon_time() + + (Slot(0).start_beacon_time() - currentTime) + timeToNextEpoch = absoluteTime - + currentSlot.slot.epoch().start_slot().start_beacon_time() + chronos.nanoseconds(timeToNextEpoch.nanoseconds) func saturate*(d: tuple[inFuture: bool, offset: Duration]): Duration = if d.inFuture: d.offset else: seconds(0) diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index f8a822f91e..9d537f7f01 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -9,9 +9,12 @@ import libp2p/crypto/crypto, ./rpc/rest_key_management_api, ./validator_client/[ - common, fallback_service, duties_service, fork_service, + common, fallback_service, duties_service, fork_service, block_service, doppelganger_service, attestation_service, sync_committee_service] +const + PREGENESIS_EPOCHS_COUNT = 1 + proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} = info "Initializing genesis", nodes_count = len(vc.beaconNodes) var nodes = vc.beaconNodes @@ -93,16 +96,22 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} = proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} = # This procedure performs initialization of BeaconClock using current genesis # information. It also performs waiting for genesis. - let res = BeaconClock.init(vc.beaconGenesis.genesis_time) - let currentSlot = res.now().slotOrZero() - let currentEpoch = currentSlot.epoch() - info "Initializing beacon clock", - genesis_time = vc.beaconGenesis.genesis_time, - current_slot = currentSlot, current_epoch = currentEpoch - let genesisTime = res.fromNow(start_beacon_time(Slot(0))) + let + res = BeaconClock.init(vc.beaconGenesis.genesis_time) + currentTime = res.now() + currentSlot = currentTime.slotOrZero() + currentEpoch = currentSlot.epoch() + genesisTime = res.fromNow(Slot(0)) + if genesisTime.inFuture: - notice "Waiting for genesis", genesisIn = genesisTime.offset - await sleepAsync(genesisTime.offset) + info "Initializing beacon clock", + genesis_time = vc.beaconGenesis.genesis_time, + current_slot = "", current_epoch = "", + time_to_genesis = genesisTime.offset + else: + info "Initializing beacon clock", + genesis_time = vc.beaconGenesis.genesis_time, + current_slot = currentSlot, current_epoch = currentEpoch return res proc initMetrics(vc: ValidatorClientRef): Future[bool] {.async.} = @@ -139,58 +148,66 @@ proc shutdownSlashingProtection(vc: ValidatorClientRef) = info "Closing slashing protection", path = vc.config.validatorsDir() vc.attachedValidators[].slashingProtection.close() -proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime, - lastSlot: Slot): Future[bool] {.async.} = - ## Called at the beginning of a slot - usually every slot, but sometimes might - ## skip a few in case we're running late. - ## wallTime: current system time - we will strive to perform all duties up - ## to this point in time - ## lastSlot: the last slot that we successfully processed, so we know where to - ## start work from - there might be jumps if processing is delayed +proc runVCSlotLoop(vc: ValidatorClientRef) {.async.} = + var + startTime = vc.beaconClock.now() + curSlot = startTime.slotOrZero() + nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1 + timeToNextSlot = nextSlot.start_beacon_time() - startTime - let - # The slot we should be at, according to the clock - beaconTime = wallTime - wallSlot = wallTime.toSlot() + info "Scheduling first slot action", + start_time = shortLog(startTime), + current_slot = shortLog(curSlot), + next_slot = shortLog(nextSlot), + time_to_next_slot = shortLog(timeToNextSlot) - let - # If everything was working perfectly, the slot that we should be processing - expectedSlot = lastSlot + 1 - delay = wallTime - expectedSlot.start_beacon_time() + var currentSlot = Opt.some(curSlot) - if checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch): - return true + while true: + currentSlot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff, + true) + if currentSlot.isNone(): + ## Fatal log line should be printed by checkedWaitForNextSlot(). + return - if len(vc.beaconNodes) > 1: let - counts = vc.getNodeCounts() - # Good nodes are nodes which can be used for ALL the requests. - goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)] - # Viable nodes are nodes which can be used only SOME of the requests. - viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] + - counts.data[int(RestBeaconNodeStatus.NotSynced)] + - counts.data[int(RestBeaconNodeStatus.Compatible)] - # Bad nodes are nodes which can't be used at all. - badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] + - counts.data[int(RestBeaconNodeStatus.Online)] + - counts.data[int(RestBeaconNodeStatus.Incompatible)] - info "Slot start", - slot = shortLog(wallSlot.slot), - attestationIn = vc.getDurationToNextAttestation(wallSlot.slot), - blockIn = vc.getDurationToNextBlock(wallSlot.slot), - validators = vc.attachedValidators[].count(), - good_nodes = goodNodes, viable_nodes = viableNodes, bad_nodes = badNodes, - delay = shortLog(delay) - else: - info "Slot start", - slot = shortLog(wallSlot.slot), - attestationIn = vc.getDurationToNextAttestation(wallSlot.slot), - blockIn = vc.getDurationToNextBlock(wallSlot.slot), - validators = vc.attachedValidators[].count(), - node_status = $vc.beaconNodes[0].status, - delay = shortLog(delay) - - return false + wallTime = vc.beaconClock.now() + wallSlot = currentSlot.get() + delay = wallTime - wallSlot.start_beacon_time() + + if checkIfShouldStopAtEpoch(wallSlot, vc.config.stopAtEpoch): + return + + if len(vc.beaconNodes) > 1: + let + counts = vc.getNodeCounts() + # Good nodes are nodes which can be used for ALL the requests. + goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)] + # Viable nodes are nodes which can be used only SOME of the requests. + viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] + + counts.data[int(RestBeaconNodeStatus.NotSynced)] + + counts.data[int(RestBeaconNodeStatus.Compatible)] + # Bad nodes are nodes which can't be used at all. + badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] + + counts.data[int(RestBeaconNodeStatus.Online)] + + counts.data[int(RestBeaconNodeStatus.Incompatible)] + info "Slot start", + slot = shortLog(wallSlot), + epoch = shortLog(wallSlot.epoch()), + attestationIn = vc.getDurationToNextAttestation(wallSlot), + blockIn = vc.getDurationToNextBlock(wallSlot), + validators = vc.attachedValidators[].count(), + good_nodes = goodNodes, viable_nodes = viableNodes, + bad_nodes = badNodes, delay = shortLog(delay) + else: + info "Slot start", + slot = shortLog(wallSlot), + epoch = shortLog(wallSlot.epoch()), + attestationIn = vc.getDurationToNextAttestation(wallSlot), + blockIn = vc.getDurationToNextBlock(wallSlot), + validators = vc.attachedValidators[].count(), + node_status = $vc.beaconNodes[0].status, + delay = shortLog(delay) proc new*(T: type ValidatorClientRef, config: ValidatorClientConf, @@ -224,6 +241,8 @@ proc new*(T: type ValidatorClientRef, config: config, beaconNodes: beaconNodes, graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), + preGenesisEvent: newAsyncEvent(), + genesisEvent: newAsyncEvent(), nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), doppelExit: newAsyncEvent(), @@ -239,6 +258,8 @@ proc new*(T: type ValidatorClientRef, config: config, beaconNodes: beaconNodes, graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), + preGenesisEvent: newAsyncEvent(), + genesisEvent: newAsyncEvent(), nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), indicesAvailable: newAsyncEvent(), @@ -260,8 +281,8 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} = vc.beaconGenesis = await vc.initGenesis() info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time, - genesis_fork_version = vc.beaconGenesis.genesis_fork_version, - genesis_root = vc.beaconGenesis.genesis_validators_root + genesis_fork_version = vc.beaconGenesis.genesis_fork_version, + genesis_root = vc.beaconGenesis.genesis_validators_root vc.beaconClock = await vc.initClock() @@ -295,6 +316,7 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} = vc.dutiesService = await DutiesServiceRef.init(vc) vc.doppelgangerService = await DoppelgangerServiceRef.init(vc) vc.attestationService = await AttestationServiceRef.init(vc) + vc.blockService = await BlockServiceRef.init(vc) vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc) vc.keymanagerServer = keymanagerInitResult.server if vc.keymanagerServer != nil: @@ -322,12 +344,65 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} = return vc +proc runPreGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} = + var breakLoop = false + while not(breakLoop): + let + genesisTime = vc.beaconClock.fromNow(Slot(0)) + currentEpoch = vc.beaconClock.now().toSlot().slot.epoch() + + if not(genesisTime.inFuture) or currentEpoch < PREGENESIS_EPOCHS_COUNT: + break + + notice "Waiting for genesis", + genesis_time = vc.beaconGenesis.genesis_time, + time_to_genesis = genesisTime.offset + + breakLoop = + try: + await sleepAsync(vc.beaconClock.durationToNextSlot()) + false + except CancelledError: + debug "Pre-genesis waiting loop was interrupted" + true + except CatchableError as exc: + error "Pre-genesis waiting loop failed with unexpected error", + err_name = $exc.name, err_msg = $exc.msg + true + vc.preGenesisEvent.fire() + +proc runGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} = + var breakLoop = false + while not(breakLoop): + let genesisTime = vc.beaconClock.fromNow(Slot(0)) + + if not(genesisTime.inFuture): + break + + notice "Waiting for genesis", + genesis_time = vc.beaconGenesis.genesis_time, + time_to_genesis = genesisTime.offset + + breakLoop = + try: + await sleepAsync(vc.beaconClock.durationToNextSlot()) + false + except CancelledError: + debug "Genesis waiting loop was interrupted" + true + except CatchableError as exc: + error "Genesis waiting loop failed with unexpected error", + err_name = $exc.name, err_msg = $exc.msg + true + vc.genesisEvent.fire() + proc asyncRun*(vc: ValidatorClientRef) {.async.} = vc.fallbackService.start() vc.forkService.start() vc.dutiesService.start() vc.doppelgangerService.start() vc.attestationService.start() + vc.blockService.start() vc.syncCommitteeService.start() if not isNil(vc.keymanagerServer): @@ -337,7 +412,12 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} = let doppelEventFut = vc.doppelExit.wait() try: - vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart) + # Waiting for `GENESIS - PREGENESIS_EPOCHS_COUNT` loop. + await vc.runPreGenesisWaitingLoop() + # Waiting for `GENESIS` loop. + await vc.runGenesisWaitingLoop() + # Main processing loop. + vc.runSlotLoopFut = vc.runVCSlotLoop() vc.runKeystoreCachePruningLoopFut = runKeystorecachePruningLoop(vc.keystoreCache) discard await race(vc.runSlotLoopFut, doppelEventFut) @@ -355,8 +435,6 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} = if doppelEventFut.completed(): # Critically, database has been shut down - the rest doesn't matter, we need # to stop as soon as possible - # TODO we need to actually quit _before_ any other async tasks have had the - # chance to happen quitDoppelganger() debug "Stopping main processing loop" @@ -373,10 +451,10 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} = pending.add(vc.dutiesService.stop()) pending.add(vc.doppelgangerService.stop()) pending.add(vc.attestationService.stop()) + pending.add(vc.blockService.stop()) pending.add(vc.syncCommitteeService.stop()) if not isNil(vc.keymanagerServer): pending.add(vc.keymanagerServer.stop()) - await allFutures(pending) template runWithSignals(vc: ValidatorClientRef, body: untyped): bool = diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index 109a457df9..8433bb3fa4 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -33,6 +33,9 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, doAssert(validator.index.isSome()) let vindex = validator.index.get() + logScope: + validator = validatorLog(validator) + # TODO: signing_root is recomputed in getAttestationSignature just after, # but not for locally attached validators. let signingRoot = @@ -47,8 +50,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, warn "Slashing protection activated for attestation", attestationData = shortLog(adata), signingRoot = shortLog(signingRoot), - validator = shortLog(validator), - validator_index = vindex, badVoteDetails = $notSlashable.error + badVoteDetails = $notSlashable.error return false let attestation = block: @@ -57,8 +59,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, let res = await validator.getAttestationSignature( fork, vc.beaconGenesis.genesis_validators_root, adata) if res.isErr(): - warn "Unable to sign attestation", validator = shortLog(validator), - error_msg = res.error() + warn "Unable to sign attestation", reason = res.error() return false res.get() except CancelledError as exc: @@ -74,9 +75,11 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, int(duty.data.committee_length), adata, signature).expect( "data validity checked earlier") - debug "Sending attestation", attestation = shortLog(attestation), - validator = shortLog(validator), validator_index = vindex, - delay = vc.getDelay(adata.slot.attestation_deadline()) + logScope: + attestation = shortLog(attestation) + delay = vc.getDelay(adata.slot.attestation_deadline()) + + debug "Sending attestation" validator.doppelgangerActivity(attestation.data.slot.epoch) @@ -84,36 +87,23 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, try: await vc.submitPoolAttestations(@[attestation], ApiStrategyKind.First) except ValidatorApiError as exc: - warn "Unable to publish attestation", - attestation = shortLog(attestation), - validator = shortLog(validator), - validator_index = vindex, - reason = exc.getFailureReason() + warn "Unable to publish attestation", reason = exc.getFailureReason() return false except CancelledError as exc: debug "Attestation publishing process was interrupted" raise exc except CatchableError as exc: error "Unexpected error occured while publishing attestation", - attestation = shortLog(attestation), - validator = shortLog(validator), - validator_index = vindex, err_name = exc.name, err_msg = exc.msg return false - let delay = vc.getDelay(adata.slot.attestation_deadline()) if res: + let delay = vc.getDelay(adata.slot.attestation_deadline()) beacon_attestations_sent.inc() beacon_attestation_sent_delay.observe(delay.toFloatSeconds()) - notice "Attestation published", attestation = shortLog(attestation), - validator = shortLog(validator), - validator_index = vindex, - delay = delay + notice "Attestation published" else: - warn "Attestation was not accepted by beacon node", - attestation = shortLog(attestation), - validator = shortLog(validator), - validator_index = vindex, delay = delay + warn "Attestation was not accepted by beacon node" return res proc serveAggregateAndProof*(service: AttestationServiceRef, @@ -124,21 +114,21 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, vc = service.client genesisRoot = vc.beaconGenesis.genesis_validators_root slot = proof.aggregate.data.slot - vindex = validator.index.get() fork = vc.forkAtEpoch(slot.epoch) - debug "Signing aggregate", validator = shortLog(validator), - attestation = shortLog(proof.aggregate), fork = fork + logScope: + validator = validatorLog(validator) + attestation = shortLog(proof.aggregate) + + debug "Signing aggregate", fork = fork let signature = try: - let res = await validator.getAggregateAndProofSignature( - fork, genesisRoot, proof) + let res = + await validator.getAggregateAndProofSignature(fork, genesisRoot, proof) if res.isErr(): warn "Unable to sign aggregate and proof using remote signer", - validator = shortLog(validator), - attestation = shortLog(proof.aggregate), - error_msg = res.error() + reason = res.error() return false res.get() except CancelledError as exc: @@ -146,19 +136,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, raise exc except CatchableError as exc: error "Unexpected error occured while signing aggregated attestation", - validator = shortLog(validator), - attestation = shortLog(proof.aggregate), - validator_index = vindex, err_name = exc.name, err_msg = exc.msg return false let signedProof = SignedAggregateAndProof(message: proof, signature: signature) + logScope: + delay = vc.getDelay(slot.aggregate_deadline()) - debug "Sending aggregated attestation", fork = fork, - attestation = shortLog(signedProof.message.aggregate), - validator = shortLog(validator), validator_index = vindex, - delay = vc.getDelay(slot.aggregate_deadline()) + debug "Sending aggregated attestation", fork = fork validator.doppelgangerActivity(proof.aggregate.data.slot.epoch) @@ -167,32 +153,21 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, await vc.publishAggregateAndProofs(@[signedProof], ApiStrategyKind.First) except ValidatorApiError as exc: warn "Unable to publish aggregated attestation", - attestation = shortLog(signedProof.message.aggregate), - validator = shortLog(validator), - validator_index = vindex, - reason = exc.getFailureReason() + reason = exc.getFailureReason() return false except CancelledError as exc: debug "Publish aggregate and proofs request was interrupted" raise exc except CatchableError as exc: error "Unexpected error occured while publishing aggregated attestation", - attestation = shortLog(signedProof.message.aggregate), - validator = shortLog(validator), err_name = exc.name, err_msg = exc.msg return false if res: beacon_aggregates_sent.inc() - notice "Aggregated attestation published", - attestation = shortLog(signedProof.message.aggregate), - validator = shortLog(validator), - validator_index = vindex + notice "Aggregated attestation published" else: - warn "Aggregated attestation was not accepted by beacon node", - attestation = shortLog(signedProof.message.aggregate), - validator = shortLog(validator), - validator_index = vindex + warn "Aggregated attestation was not accepted by beacon node" return res proc produceAndPublishAttestations*(service: AttestationServiceRef, @@ -394,7 +369,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef, await service.produceAndPublishAggregates(ad, duties) proc spawnAttestationTasks(service: AttestationServiceRef, - slot: Slot) = + slot: Slot) {.async.} = let vc = service.client let dutiesByCommittee = block: @@ -405,34 +380,67 @@ proc spawnAttestationTasks(service: AttestationServiceRef, res.mgetOrPut(item.data.committee_index, default).add(item) res - var dutiesSkipped: seq[string] - for index, duties in dutiesByCommittee: - asyncSpawn service.publishAttestationsAndAggregates(slot, index, duties) - if len(dutiesSkipped) > 0: - info "Doppelganger protection disabled validator duties", - validators = len(dutiesSkipped) - trace "Doppelganger protection disabled validator duties dump", - validators = dutiesSkipped + var tasks: seq[Future[void]] + try: + for index, duties in dutiesByCommittee: + tasks.add(service.publishAttestationsAndAggregates(slot, index, duties)) + let timeout = vc.beaconClock.durationToNextSlot() + await allFutures(tasks).wait(timeout) + except AsyncTimeoutError: + # Cancelling all the pending tasks. + let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await allFutures(pending) + except CancelledError as exc: + # Cancelling all the pending tasks. + let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await allFutures(pending) + raise exc + except CatchableError as exc: + error "Unexpected error while processing attestation duties", + error_name = exc.name, error_message = exc.msg proc mainLoop(service: AttestationServiceRef) {.async.} = let vc = service.client service.state = ServiceState.Running debug "Service started" + debug "Attester loop is waiting for initialization" + try: + await allFutures( + vc.preGenesisEvent.wait(), + vc.genesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) + except CancelledError: + debug "Service interrupted" + return + except CatchableError as exc: + warn "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + return + + doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") + + var currentSlot: Opt[Slot] while true: # This loop could look much more nicer/better, when # https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - let sleepTime = - attestationSlotOffset + vc.beaconClock.durationToNextSlot() - let sres = vc.getCurrentSlot() - if sres.isSome(): - let currentSlot = sres.get() - service.spawnAttestationTasks(currentSlot) - await sleepAsync(sleepTime) - false + let + # We use zero offset here, because we do waiting in + # waitForBlockPublished(attestationSlotOffset). + slot = await vc.checkedWaitForNextSlot(currentSlot, + ZeroTimeDiff, false) + if slot.isNone(): + debug "System time adjusted backwards significantly, exiting" + true + else: + currentSlot = slot + await service.spawnAttestationTasks(currentSlot.get()) + false except CancelledError: debug "Service interrupted" true diff --git a/beacon_chain/validator_client/block_service.nim b/beacon_chain/validator_client/block_service.nim index 6dd35788ba..e76be0c3c9 100644 --- a/beacon_chain/validator_client/block_service.nim +++ b/beacon_chain/validator_client/block_service.nim @@ -11,7 +11,10 @@ import ".."/spec/forks, common, api -logScope: service = "block_service" +const + ServiceName = "block_service" + +logScope: service = ServiceName type PreparedBeaconBlock = object @@ -300,26 +303,28 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, proc proposeBlock(vc: ValidatorClientRef, slot: Slot, proposerKey: ValidatorPubKey) {.async.} = - let (inFuture, timeToSleep) = vc.beaconClock.fromNow(slot) - try: - if inFuture: - debug "Proposing block", timeIn = timeToSleep, - validator = shortLog(proposerKey) - await sleepAsync(timeToSleep) - else: - debug "Proposing block", timeIn = 0.seconds, - validator = shortLog(proposerKey) + let + currentSlot = (await vc.checkedWaitForSlot(slot, ZeroTimeDiff, + false)).valueOr: + error "Unable to perform block production because of system time" + return - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - validator = vc.getValidatorForDuties(proposerKey, slot).valueOr: return - await vc.publishBlock(currentSlot, slot, validator) + if currentSlot > slot: + warn "Skip block production for expired slot", + current_slot = currentSlot, duties_slot = slot + return + + let validator = vc.getValidatorForDuties(proposerKey, slot).valueOr: return + + try: + await vc.publishBlock(currentSlot, slot, validator) except CancelledError as exc: - debug "Block proposing was interrupted", slot = slot, - validator = shortLog(proposerKey) + debug "Block proposing process was interrupted", + slot = slot, validator = shortLog(proposerKey) raise exc + except CatchableError as exc: + error "Unexpected error encountered while proposing block", + slot = slot, validator = shortLog(validator) proc spawnProposalTask(vc: ValidatorClientRef, duty: RestProposerDuty): ProposerTask = @@ -356,73 +361,72 @@ proc checkDuty(duty: RestProposerDuty, epoch: Epoch, slot: Slot): bool = proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch, dependentRoot: Eth2Digest, duties: openArray[RestProposerDuty]) = - let default = ProposedData(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)) - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - epochDuties = vc.proposers.getOrDefault(epoch, default) - if not(epochDuties.isDefault()): - if epochDuties.dependentRoot != dependentRoot: - warn "Proposer duties re-organization", duties_count = len(duties), - wall_slot = currentSlot, epoch = epoch, - prior_dependent_root = epochDuties.dependentRoot, - dependent_root = dependentRoot, wall_slot = currentSlot - let tasks = - block: - var res: seq[ProposerTask] - var hashset = initHashSet[Slot]() - - for task in epochDuties.duties: - if task notin duties: - # Task is no more relevant, so cancel it. - debug "Cancelling running proposal duty task", - slot = task.duty.slot, - validator = shortLog(task.duty.pubkey) - task.future.cancel() - else: - # If task is already running for proper slot, we keep it alive. - debug "Keep running previous proposal duty task", - slot = task.duty.slot, - validator = shortLog(task.duty.pubkey) - res.add(task) - - for duty in duties: - if duty notin res: - debug "New proposal duty received", slot = duty.slot, - validator = shortLog(duty.pubkey) - if checkDuty(duty, epoch, currentSlot): - let task = vc.spawnProposalTask(duty) - if duty.slot in hashset: - warn "Multiple block proposers for this slot, " & - "producing blocks for all proposers", slot = duty.slot - else: - hashset.incl(duty.slot) - res.add(task) - res - vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks) - else: - debug "New block proposal duties received", - dependent_root = dependentRoot, duties_count = len(duties), - wall_slot = currentSlot, epoch = epoch - # Spawn new proposer tasks and modify proposers map. + let + default = ProposedData(epoch: FAR_FUTURE_EPOCH) + currentSlot = vc.getCurrentSlot().get(Slot(0)) + epochDuties = vc.proposers.getOrDefault(epoch, default) + + if not(epochDuties.isDefault()): + if epochDuties.dependentRoot != dependentRoot: + warn "Proposer duties re-organization", duties_count = len(duties), + wall_slot = currentSlot, epoch = epoch, + prior_dependent_root = epochDuties.dependentRoot, + dependent_root = dependentRoot let tasks = block: - var hashset = initHashSet[Slot]() var res: seq[ProposerTask] - for duty in duties: - debug "New proposal duty received", slot = duty.slot, - validator = shortLog(duty.pubkey) - if checkDuty(duty, epoch, currentSlot): - let task = vc.spawnProposalTask(duty) - if duty.slot in hashset: - warn "Multiple block proposers for this slot, " & - "producing blocks for all proposers", slot = duty.slot - else: - hashset.incl(duty.slot) + var hashset = initHashSet[Slot]() + + for task in epochDuties.duties: + if task notin duties: + # Task is no more relevant, so cancel it. + debug "Cancelling running proposal duty task", + slot = task.duty.slot, + validator = shortLog(task.duty.pubkey) + task.future.cancel() + else: + # If task is already running for proper slot, we keep it alive. + debug "Keep running previous proposal duty task", + slot = task.duty.slot, + validator = shortLog(task.duty.pubkey) res.add(task) + + for duty in duties: + if duty notin res: + debug "New proposal duty received", slot = duty.slot, + validator = shortLog(duty.pubkey) + if checkDuty(duty, epoch, currentSlot): + let task = vc.spawnProposalTask(duty) + if duty.slot in hashset: + error "Multiple block proposers for this slot, " & + "producing blocks for all proposers", slot = duty.slot + else: + hashset.incl(duty.slot) + res.add(task) res vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks) + else: + debug "New block proposal duties received", + dependent_root = dependentRoot, duties_count = len(duties), + wall_slot = currentSlot, epoch = epoch + # Spawn new proposer tasks and modify proposers map. + let tasks = + block: + var hashset = initHashSet[Slot]() + var res: seq[ProposerTask] + for duty in duties: + debug "New proposal duty received", slot = duty.slot, + validator = shortLog(duty.pubkey) + if checkDuty(duty, epoch, currentSlot): + let task = vc.spawnProposalTask(duty) + if duty.slot in hashset: + error "Multiple block proposers for this slot, " & + "producing blocks for all proposers", slot = duty.slot + else: + hashset.incl(duty.slot) + res.add(task) + res + vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks) proc waitForBlockPublished*(vc: ValidatorClientRef, slot: Slot, timediff: TimeDiff) {.async.} = @@ -439,24 +443,100 @@ proc waitForBlockPublished*(vc: ValidatorClientRef, if not(task.future.finished()): res.add(task.future) res + waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now() + logScope: start_time = startTime pending_tasks = len(pendingTasks) slot = slot timediff = timediff - if len(pendingTasks) > 0: - let waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now() - logScope: - wait_time = waitTime - if waitTime.nanoseconds > 0'i64: + # TODO (cheatfate): This algorithm should be tuned, when we will have ability + # to monitor block proposals which are not created by validators bundled with + # VC. + logScope: wait_time = waitTime + if waitTime.nanoseconds > 0'i64: + if len(pendingTasks) > 0: + # Block proposal pending + try: + await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds)) + trace "Block proposal awaited" + # The expected block arrived - in our async loop however, we might + # have been doing other processing that caused delays here so we'll + # cap the waiting to the time when we would have sent out attestations + # had the block not arrived. An opposite case is that we received + # (or produced) a block that has not yet reached our neighbours. To + # protect against our attestations being dropped (because the others + # have not yet seen the block), we'll impose a minimum delay of + # 2000ms. The delay is enforced only when we're not hitting the + # "normal" cutoff time for sending out attestations. An earlier delay + # of 250ms has proven to be not enough, increasing the risk of losing + # attestations, and with growing block sizes, 1000ms started to be + # risky as well. Regardless, because we "just" received the block, + # we'll impose the delay. + + # Take into consideration chains with a different slot time + const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2) + let + afterBlockTime = vc.beaconClock.now() + afterBlockDelay + afterBlockCutoff = vc.beaconClock.fromNow( + min(afterBlockTime, + slot.attestation_deadline() + afterBlockDelay)) + if afterBlockCutoff.inFuture: + debug "Got block, waiting to send attestations", + after_block_cutoff = shortLog(afterBlockCutoff.offset) + await sleepAsync(afterBlockCutoff.offset) + except CancelledError as exc: + let dur = Moment.now() - startTime + debug "Waiting for block publication interrupted", duration = dur + raise exc + except AsyncTimeoutError: + let dur = Moment.now() - startTime + debug "Block was not published in time", duration = dur + else: + # No pending block proposals. try: - await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds)) - trace "Block proposal awaited" + await sleepAsync(nanoseconds(waitTime.nanoseconds)) except CancelledError as exc: let dur = Moment.now() - startTime debug "Waiting for block publication interrupted", duration = dur raise exc - except AsyncTimeoutError: + except CatchableError as exc: let dur = Moment.now() - startTime - debug "Block was not published in time", duration = dur + error "Unexpected error occured while waiting for block publication", + err_name = exc.name, err_msg = exc.msg, duration = dur + return + +proc mainLoop(service: BlockServiceRef) {.async.} = + let vc = service.client + service.state = ServiceState.Running + debug "Service started" + var future = newFuture[void]() + try: + # Future is not going to be completed, so the only way to exit, is to + # cancel it. + await future + except CancelledError as exc: + debug "Service interrupted" + except CatchableError as exc: + error "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + + # We going to cleanup all the pending proposer tasks. + var res: seq[Future[void]] + for epoch, data in vc.proposers.pairs(): + for duty in data.duties.items(): + if not(duty.future.finished()): + res.add(duty.future.cancelAndWait()) + await allFutures(res) + +proc init*(t: typedesc[BlockServiceRef], + vc: ValidatorClientRef): Future[BlockServiceRef] {.async.} = + logScope: service = ServiceName + var res = BlockServiceRef(name: ServiceName, client: vc, + state: ServiceState.Initialized) + debug "Initializing service" + return res + +proc start*(service: BlockServiceRef) = + service.lifeFut = mainLoop(service) diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 838001c23c..d90b2c4bec 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -39,14 +39,12 @@ const DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05, 0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf] + ZeroTimeDiff* = TimeDiff(nanoseconds: 0'i64) + type ServiceState* {.pure.} = enum Initialized, Running, Error, Closing, Closed - BlockServiceEventRef* = ref object of RootObj - slot*: Slot - proposers*: seq[ValidatorPubKey] - RegistrationKind* {.pure.} = enum Cached, IncorrectTime, MissingIndex, MissingFee, MissingGasLimit ErrorSignature, NoSignature @@ -174,6 +172,8 @@ type beaconClock*: BeaconClock attachedValidators*: ref ValidatorPool forks*: seq[Fork] + preGenesisEvent*: AsyncEvent + genesisEvent*: AsyncEvent forksAvailable*: AsyncEvent nodesAvailable*: AsyncEvent indicesAvailable*: AsyncEvent @@ -201,7 +201,7 @@ type data*: seq[ApiNodeFailure] const - DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)) + DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH) SlotDuration* = int64(SECONDS_PER_SLOT).seconds OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT AllBeaconNodeRoles* = { @@ -323,12 +323,21 @@ proc `$`*(bn: BeaconNodeServerRef): string = bn.logIdent proc validatorLog*(key: ValidatorPubKey, - index: ValidatorIndex): string = + index: ValidatorIndex): string = var res = shortLog(key) res.add('@') res.add(Base10.toString(uint64(index))) res +proc validatorLog*(validator: AttachedValidator): string = + var res = shortLog(validator) + res.add('@') + if validator.index.isSome(): + res.add(Base10.toString(uint64(validator.index.get()))) + else: + res.add("") + res + chronicles.expandIt(BeaconNodeServerRef): node = $it node_index = it.index @@ -564,18 +573,12 @@ proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest, data: openArray[ProposerTask]): ProposedData = ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data) -proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] = - let - wallTime = vc.beaconClock.now() - wallSlot = wallTime.toSlot() - - if not(wallSlot.afterGenesis): - let checkGenesisTime = vc.beaconClock.fromNow(start_beacon_time(Slot(0))) - warn "Jump in time detected, something wrong with wallclock", - wall_time = wallTime, genesisIn = checkGenesisTime.offset - none[Slot]() +proc getCurrentSlot*(vc: ValidatorClientRef): Opt[Slot] = + let res = vc.beaconClock.now().toSlot() + if res.afterGenesis: + Opt.some(res.slot) else: - some(wallSlot.slot) + Opt.none(Slot) proc getAttesterDutiesForSlot*(vc: ValidatorClientRef, slot: Slot): seq[DutyAndProof] = @@ -915,3 +918,75 @@ proc prepareRegistrationList*( proc init*(t: typedesc[ApiNodeFailure], node: BeaconNodeServerRef, failure: ApiFailure): ApiNodeFailure = ApiNodeFailure(node: node, failure: failure) + +proc checkedWaitForSlot*(vc: ValidatorClientRef, destinationSlot: Slot, + offset: TimeDiff, + showLogs: bool): Future[Opt[Slot]] {.async.} = + let + currentTime = vc.beaconClock.now() + currentSlot = currentTime.slotOrZero() + chronosOffset = chronos.nanoseconds( + if offset.nanoseconds < 0: 0'i64 else: offset.nanoseconds) + + var timeToSlot = (destinationSlot.start_beacon_time() - currentTime) + + chronosOffset + + logScope: + start_time = shortLog(currentTime) + start_slot = shortLog(currentSlot) + dest_slot = shortLog(destinationSlot) + time_to_slot = shortLog(timeToSlot) + + while true: + await sleepAsync(timeToSlot) + + let + wallTime = vc.beaconClock.now() + wallSlot = wallTime.slotOrZero() + + logScope: + wall_time = shortLog(wallTime) + wall_slot = shortLog(wallSlot) + + if wallSlot < destinationSlot: + # While we were sleeping, the system clock changed and time moved + # backwards! + if wallSlot + 1 < destinationSlot: + # This is a critical condition where it's hard to reason about what + # to do next - we'll call the attention of the user here by shutting + # down. + if showLogs: + fatal "System time adjusted backwards significantly - " & + "clock may be inaccurate - shutting down" + return Opt.none(Slot) + else: + # Time moved back by a single slot - this could be a minor adjustment, + # for example when NTP does its thing after not working for a while + timeToSlot = destinationSlot.start_beacon_time() - wallTime + + chronosOffset + if showLogs: + warn "System time adjusted backwards, rescheduling slot actions" + continue + + elif wallSlot > destinationSlot + SLOTS_PER_EPOCH: + if showLogs: + warn "Time moved forwards by more than an epoch, skipping ahead" + return Opt.some(wallSlot) + + elif wallSlot > destinationSlot: + if showLogs: + notice "Missed expected slot start, catching up" + return Opt.some(wallSlot) + + else: + return Opt.some(destinationSlot) + +proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot], + offset: TimeDiff, + showLogs: bool): Future[Opt[Slot]] = + let + currentTime = vc.beaconClock.now() + currentSlot = curSlot.valueOr: currentTime.slotOrZero() + nextSlot = currentSlot + 1 + + vc.checkedWaitForSlot(nextSlot, offset, showLogs) diff --git a/beacon_chain/validator_client/doppelganger_service.nim b/beacon_chain/validator_client/doppelganger_service.nim index ed165ebdd7..9cef4488b4 100644 --- a/beacon_chain/validator_client/doppelganger_service.nim +++ b/beacon_chain/validator_client/doppelganger_service.nim @@ -58,6 +58,21 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} = debug "Service disabled because of configuration settings" return + debug "Doppelganger detection loop is waiting for initialization" + try: + await allFutures( + vc.preGenesisEvent.wait(), + vc.genesisEvent.wait(), + vc.indicesAvailable.wait() + ) + except CancelledError: + debug "Service interrupted" + return + except CatchableError as exc: + warn "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + return + # On (re)start, we skip the remainder of the epoch before we start monitoring # for doppelgangers so we don't trigger on the attestations we produced before # the epoch - there's no activity in the genesis slot, so if we start at or diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 52c10354bb..6ef11ab678 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -38,7 +38,9 @@ proc checkDuty(duty: RestAttesterDuty): bool = proc checkSyncDuty(duty: RestSyncCommitteeDuty): bool = uint64(duty.validator_index) <= VALIDATOR_REGISTRY_LIMIT -proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = +proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let validatorIdents = block: var res: seq[ValidatorIdent] @@ -107,16 +109,12 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = updated_validators = updated vc.indicesAvailable.fire() -proc pollForAttesterDuties*(vc: ValidatorClientRef, +proc pollForAttesterDuties*(service: DutiesServiceRef, epoch: Epoch): Future[int] {.async.} = - let validatorIndices = - block: - var res: seq[ValidatorIndex] - for index in vc.attachedValidators[].indices(): - res.add(index) - res + let vc = service.client + let validatorIndices = toSeq(vc.attachedValidators[].indices()) - if validatorIndices.len == 0: + if len(validatorIndices) == 0: return 0 var duties: seq[RestAttesterDuty] @@ -243,7 +241,8 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef, return len(addOrReplaceItems) -proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) = +proc pruneSyncCommitteeDuties*(service: DutiesServiceRef, slot: Slot) = + let vc = service.client if slot.is_sync_committee_period(): var newSyncCommitteeDuties: SyncCommitteeDutiesMap let epoch = slot.epoch() @@ -255,8 +254,9 @@ proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) = newSyncCommitteeDuties[key] = currentPeriodDuties vc.syncCommitteeDuties = newSyncCommitteeDuties -proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, +proc pollForSyncCommitteeDuties*(service: DutiesServiceRef, epoch: Epoch): Future[int] {.async.} = + let vc = service.client let validatorIndices = toSeq(vc.attachedValidators[].indices()) var filteredDuties: seq[RestSyncCommitteeDuty] @@ -335,7 +335,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, return len(addOrReplaceItems) -proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) = +proc pruneAttesterDuties(service: DutiesServiceRef, epoch: Epoch) = + let vc = service.client var attesters: AttesterMap for key, item in vc.attesters: var v = EpochDuties() @@ -348,7 +349,7 @@ proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) = attesters[key] = v vc.attesters = attesters -proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} = +proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} = ## Query the beacon node for attestation duties for all known validators. ## ## This function will perform (in the following order): @@ -356,125 +357,127 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} = ## 1. Poll for current-epoch duties and update the local `attesters` map. ## 2. Poll for next-epoch duties and update the local `attesters` map. ## 3. Push out any attestation subnet subscriptions to the BN. - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - currentEpoch = currentSlot.epoch() - nextEpoch = currentEpoch + 1'u64 + let vc = service.client + let + currentSlot = vc.getCurrentSlot().get(Slot(0)) + currentEpoch = currentSlot.epoch() + nextEpoch = currentEpoch + 1'u64 - if vc.attachedValidators[].count() != 0: - var counts: array[2, tuple[epoch: Epoch, count: int]] - counts[0] = (currentEpoch, await vc.pollForAttesterDuties(currentEpoch)) - counts[1] = (nextEpoch, await vc.pollForAttesterDuties(nextEpoch)) + if vc.attachedValidators[].count() != 0: + var counts: array[2, tuple[epoch: Epoch, count: int]] + counts[0] = (currentEpoch, + await service.pollForAttesterDuties(currentEpoch)) + counts[1] = (nextEpoch, + await service.pollForAttesterDuties(nextEpoch)) - if (counts[0].count == 0) and (counts[1].count == 0): - debug "No new attester's duties received", slot = currentSlot + if (counts[0].count == 0) and (counts[1].count == 0): + debug "No new attester's duties received", slot = currentSlot - let subscriptions = - block: - var res: seq[RestCommitteeSubscription] - for item in counts: - if item.count > 0: - for duty in vc.attesterDutiesForEpoch(item.epoch): - if currentSlot + SUBSCRIPTION_BUFFER_SLOTS < duty.data.slot: - let isAggregator = - if duty.slotSig.isSome(): - is_aggregator(duty.data.committee_length, - duty.slotSig.get()) - else: - false - let sub = RestCommitteeSubscription( - validator_index: duty.data.validator_index, - committee_index: duty.data.committee_index, - committees_at_slot: duty.data.committees_at_slot, - slot: duty.data.slot, - is_aggregator: isAggregator - ) - res.add(sub) - res + let subscriptions = + block: + var res: seq[RestCommitteeSubscription] + for item in counts: + if item.count > 0: + for duty in vc.attesterDutiesForEpoch(item.epoch): + if currentSlot + SUBSCRIPTION_BUFFER_SLOTS < duty.data.slot: + let isAggregator = + if duty.slotSig.isSome(): + is_aggregator(duty.data.committee_length, + duty.slotSig.get()) + else: + false + let sub = RestCommitteeSubscription( + validator_index: duty.data.validator_index, + committee_index: duty.data.committee_index, + committees_at_slot: duty.data.committees_at_slot, + slot: duty.data.slot, + is_aggregator: isAggregator + ) + res.add(sub) + res - if len(subscriptions) > 0: - let res = await vc.prepareBeaconCommitteeSubnet(subscriptions) - if res == 0: - warn "Failed to subscribe validators to beacon committee subnets", - slot = currentSlot, epoch = currentEpoch, - subscriptions_count = len(subscriptions) + if len(subscriptions) > 0: + let res = await vc.prepareBeaconCommitteeSubnet(subscriptions) + if res == 0: + warn "Failed to subscribe validators to beacon committee subnets", + slot = currentSlot, epoch = currentEpoch, + subscriptions_count = len(subscriptions) - vc.pruneAttesterDuties(currentEpoch) + service.pruneAttesterDuties(currentEpoch) -proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} = - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - currentEpoch = currentSlot.epoch() +proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let + currentSlot = vc.getCurrentSlot().get(Slot(0)) + currentEpoch = currentSlot.epoch() - if vc.attachedValidators[].count() != 0: - let - dutyPeriods = - block: - var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod]] - let - currentPeriod = currentSlot.sync_committee_period() - lookaheadSlot = currentSlot + - SUBSCRIPTION_LOOKAHEAD_EPOCHS * SLOTS_PER_EPOCH - lookaheadPeriod = lookaheadSlot.sync_committee_period() + if vc.attachedValidators[].count() != 0: + let + dutyPeriods = + block: + var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod]] + let + currentPeriod = currentSlot.sync_committee_period() + lookaheadSlot = currentSlot + + SUBSCRIPTION_LOOKAHEAD_EPOCHS * SLOTS_PER_EPOCH + lookaheadPeriod = lookaheadSlot.sync_committee_period() + res.add( + (epoch: currentSlot.epoch(), + period: currentPeriod) + ) + if lookAheadPeriod > currentPeriod: res.add( - (epoch: currentSlot.epoch(), - period: currentPeriod) + (epoch: lookaheadPeriod.start_epoch(), + period: lookAheadPeriod) ) - if lookAheadPeriod > currentPeriod: - res.add( - (epoch: lookaheadPeriod.start_epoch(), - period: lookAheadPeriod) - ) - res - - (counts, total) = - block: - var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod, - count: int]] - var total = 0 - if len(dutyPeriods) > 0: - for (epoch, period) in dutyPeriods: - let count = await vc.pollForSyncCommitteeDuties(epoch) - res.add((epoch: epoch, period: period, count: count)) - total += count - (res, total) - - if total == 0: - debug "No new sync committee member's duties received", - slot = currentSlot - - let subscriptions = - block: - var res: seq[RestSyncCommitteeSubscription] - for item in counts: - if item.count > 0: - let untilEpoch = start_epoch(item.period + 1'u64) - let subscriptionsInfo = - vc.syncMembersSubscriptionInfoForEpoch(item.epoch) - for subInfo in subscriptionsInfo: - let sub = RestSyncCommitteeSubscription( - validator_index: subInfo.validator_index, - sync_committee_indices: - subInfo.validator_sync_committee_indices, - until_epoch: untilEpoch - ) - res.add(sub) res - if len(subscriptions) > 0: - let res = await vc.prepareSyncCommitteeSubnets(subscriptions) - if res != 0: - warn "Failed to subscribe validators to sync committee subnets", - slot = currentSlot, epoch = currentEpoch, - subscriptions_count = len(subscriptions) + (counts, total) = + block: + var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod, + count: int]] + var total = 0 + if len(dutyPeriods) > 0: + for (epoch, period) in dutyPeriods: + let count = await service.pollForSyncCommitteeDuties(epoch) + res.add((epoch: epoch, period: period, count: count)) + total += count + (res, total) + + if total == 0: + debug "No new sync committee member's duties received", + slot = currentSlot + + let subscriptions = + block: + var res: seq[RestSyncCommitteeSubscription] + for item in counts: + if item.count > 0: + let untilEpoch = start_epoch(item.period + 1'u64) + let subscriptionsInfo = + vc.syncMembersSubscriptionInfoForEpoch(item.epoch) + for subInfo in subscriptionsInfo: + let sub = RestSyncCommitteeSubscription( + validator_index: subInfo.validator_index, + sync_committee_indices: + subInfo.validator_sync_committee_indices, + until_epoch: untilEpoch + ) + res.add(sub) + res + + if len(subscriptions) > 0: + let res = await vc.prepareSyncCommitteeSubnets(subscriptions) + if res == 0: + warn "Failed to subscribe validators to sync committee subnets", + slot = currentSlot, epoch = currentEpoch, + subscriptions_count = len(subscriptions) + + service.pruneSyncCommitteeDuties(currentSlot) - vc.pruneSyncCommitteeDuties(currentSlot) +proc pruneBeaconProposers(service: DutiesServiceRef, epoch: Epoch) = + let vc = service.client -proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) = var proposers: ProposerMap for epochKey, data in vc.proposers: if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch: @@ -484,94 +487,88 @@ proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) = loop = ProposerLoop vc.proposers = proposers -proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} = - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - currentEpoch = currentSlot.epoch() +proc pollForBeaconProposers*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let + currentSlot = vc.getCurrentSlot().get(Slot(0)) + currentEpoch = currentSlot.epoch() + + if vc.attachedValidators[].count() != 0: + try: + let res = await vc.getProposerDuties(currentEpoch, + ApiStrategyKind.First) + let + dependentRoot = res.dependent_root + duties = res.data + relevantDuties = duties.filterIt(it.pubkey in vc.attachedValidators[]) + + if len(relevantDuties) > 0: + vc.addOrReplaceProposers(currentEpoch, dependentRoot, relevantDuties) + else: + debug "No relevant proposer duties received", slot = currentSlot, + duties_count = len(duties) + except ValidatorApiError as exc: + notice "Unable to get proposer duties", slot = currentSlot, + epoch = currentEpoch, reason = exc.getFailureReason() + except CancelledError as exc: + debug "Proposer duties processing was interrupted" + raise exc + except CatchableError as exc: + debug "Unexpected error occured while getting proposer duties", + slot = currentSlot, epoch = currentEpoch, err_name = exc.name, + err_msg = exc.msg + + service.pruneBeaconProposers(currentEpoch) + +proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let + currentSlot = vc.getCurrentSlot().get(Slot(0)) + currentEpoch = currentSlot.epoch() + proposers = vc.prepareProposersList(currentEpoch) - if vc.attachedValidators[].count() != 0: + if len(proposers) > 0: + let count = try: - let res = await vc.getProposerDuties(currentEpoch, - ApiStrategyKind.First) - let - dependentRoot = res.dependent_root - duties = res.data - relevantDuties = duties.filterIt(it.pubkey in vc.attachedValidators[]) - - if len(relevantDuties) > 0: - vc.addOrReplaceProposers(currentEpoch, dependentRoot, relevantDuties) - else: - debug "No relevant proposer duties received", slot = currentSlot, - duties_count = len(duties) + await prepareBeaconProposer(vc, proposers) except ValidatorApiError as exc: - warn "Unable to get proposer duties", slot = currentSlot, - epoch = currentEpoch, reason = exc.getFailureReason() + warn "Unable to prepare beacon proposers", slot = currentSlot, + epoch = currentEpoch, err_name = exc.name, + err_msg = exc.msg, reason = exc.getFailureReason() + 0 except CancelledError as exc: - debug "Proposer duties processing was interrupted" + debug "Beacon proposer preparation processing was interrupted" raise exc except CatchableError as exc: - debug "Unexpected error occured while getting proposer duties", + error "Unexpected error occured while preparing beacon proposers", slot = currentSlot, epoch = currentEpoch, err_name = exc.name, err_msg = exc.msg - - vc.pruneBeaconProposers(currentEpoch) - -proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} = - let vc = service.client - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - currentEpoch = currentSlot.epoch() - proposers = vc.prepareProposersList(currentEpoch) - - if len(proposers) > 0: - let count = - try: - await prepareBeaconProposer(vc, proposers) - except ValidatorApiError as exc: - warn "Unable to prepare beacon proposers", slot = currentSlot, - epoch = currentEpoch, err_name = exc.name, - err_msg = exc.msg, reason = exc.getFailureReason() - 0 - except CancelledError as exc: - debug "Beacon proposer preparation processing was interrupted" - raise exc - except CatchableError as exc: - error "Unexpected error occured while preparing beacon proposers", - slot = currentSlot, epoch = currentEpoch, err_name = exc.name, - err_msg = exc.msg - 0 - debug "Beacon proposers prepared", - validators_count = vc.attachedValidators[].count(), - proposers_count = len(proposers), - prepared_count = count + 0 + debug "Beacon proposers prepared", + validators_count = vc.attachedValidators[].count(), + proposers_count = len(proposers), + prepared_count = count proc registerValidators*(service: DutiesServiceRef) {.async.} = let vc = service.client - let sres = vc.getCurrentSlot() - - var default: seq[SignedValidatorRegistrationV1] - if sres.isSome(): - let - genesisFork = vc.forks[0] - currentSlot = sres.get() - registrations = - try: - await vc.prepareRegistrationList(getTime(), genesisFork) - except CancelledError as exc: - debug "Validator registration preparation was interrupted", - slot = currentSlot, fork = genesisFork - raise exc - except CatchableError as exc: - error "Unexpected error occured while preparing validators " & - "registration data", slot = currentSlot, fork = genesisFork, - err_name = exc.name, err_msg = exc.msg - default + let + currentSlot = vc.getCurrentSlot().get(Slot(0)) + genesisFork = vc.forks[0] + registrations = + try: + await vc.prepareRegistrationList(getTime(), genesisFork) + except CancelledError as exc: + debug "Validator registration preparation was interrupted", + slot = currentSlot, fork = genesisFork + raise exc + except CatchableError as exc: + var default: seq[SignedValidatorRegistrationV1] + error "Unexpected error occured while preparing validators " & + "registration data", slot = currentSlot, fork = genesisFork, + err_name = exc.name, err_msg = exc.msg + default - let count = + count = if len(registrations) > 0: try: await registerValidator(vc, registrations) @@ -592,10 +589,10 @@ proc registerValidators*(service: DutiesServiceRef) {.async.} = else: 0 - if count > 0: - debug "Validators registered", slot = currentSlot, - beacon_nodes_count = count, registrations = len(registrations), - validators_count = vc.attachedValidators[].count() + if count > 0: + debug "Validators registered", slot = currentSlot, + beacon_nodes_count = count, registrations = len(registrations), + validators_count = vc.attachedValidators[].count() proc waitForNextSlot(service: DutiesServiceRef, serviceLoop: DutiesServiceLoop) {.async.} = @@ -605,35 +602,45 @@ proc waitForNextSlot(service: DutiesServiceRef, proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client - - debug "Attester duties loop waiting for fork schedule update" - await vc.forksAvailable.wait() + debug "Attester duties loop is waiting for initialization" + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: - await vc.pollForAttesterDuties() + await service.pollForAttesterDuties() await service.waitForNextSlot(AttesterLoop) proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client - - debug "Proposer duties loop waiting for fork schedule update" - await vc.forksAvailable.wait() + debug "Proposer duties loop is waiting for initialization" + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: - await vc.pollForBeaconProposers() + await service.pollForBeaconProposers() await service.waitForNextSlot(ProposerLoop) proc validatorIndexLoop(service: DutiesServiceRef) {.async.} = let vc = service.client + debug "Validator indices loop is waiting for initialization" + await vc.preGenesisEvent.wait() while true: - await vc.pollForValidatorIndices() + await service.pollForValidatorIndices() await service.waitForNextSlot(IndicesLoop) proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} = let vc = service.client - - debug "Beacon proposer preparation loop waiting for validator indices update" - await vc.indicesAvailable.wait() + debug "Beacon proposer preparation loop is waiting for initialization" + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait() + ) while true: await service.prepareBeaconProposers() await service.waitForNextSlot(ProposerPreparationLoop) @@ -641,21 +648,28 @@ proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} = proc validatorRegisterLoop(service: DutiesServiceRef) {.async.} = let vc = service.client doAssert(vc.config.payloadBuilderEnable) - debug "Validator registration loop is waiting for initialization" - await allFutures(vc.indicesAvailable.wait(), vc.forksAvailable.wait()) + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) + doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: await service.registerValidators() await service.waitForNextSlot(ValidatorRegisterLoop) proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client - - debug "Sync committee duties loop waiting for fork schedule update" - await vc.forksAvailable.wait() + debug "Sync committee duties loop is waiting for initialization" + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: - await vc.pollForSyncCommitteeDuties() + await service.pollForSyncCommitteeDuties() await service.waitForNextSlot(SyncCommitteeLoop) template checkAndRestart(serviceLoop: DutiesServiceLoop, @@ -696,9 +710,13 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - var futures = @[FutureBase(attestFut), FutureBase(proposeFut), - FutureBase(indicesFut), FutureBase(syncFut), - FutureBase(prepareFut)] + var futures = @[ + FutureBase(attestFut), + FutureBase(proposeFut), + FutureBase(indicesFut), + FutureBase(syncFut), + FutureBase(prepareFut) + ] if not(isNil(registerFut)): futures.add(FutureBase(registerFut)) discard await race(futures) checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop()) @@ -743,8 +761,6 @@ proc init*(t: typedesc[DutiesServiceRef], let res = DutiesServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) debug "Initializing service" - # We query for indices first, to avoid empty queries for duties. - await vc.pollForValidatorIndices() return res proc start*(service: DutiesServiceRef) = diff --git a/beacon_chain/validator_client/fallback_service.nim b/beacon_chain/validator_client/fallback_service.nim index 30325fa82f..b64de0ab59 100644 --- a/beacon_chain/validator_client/fallback_service.nim +++ b/beacon_chain/validator_client/fallback_service.nim @@ -34,6 +34,10 @@ proc otherNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] = proc otherNodesCount*(vc: ValidatorClientRef): int = vc.beaconNodes.countIt(it.status != RestBeaconNodeStatus.Synced) +proc preGenesisNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] = + vc.beaconNodes.filterIt(it.status notin {RestBeaconNodeStatus.Synced, + RestBeaconNodeStatus.OptSynced}) + proc waitNodes*(vc: ValidatorClientRef, timeoutFut: Future[void], statuses: set[RestBeaconNodeStatus], roles: set[BeaconNodeRole], waitChanges: bool) {.async.} = @@ -230,7 +234,12 @@ proc checkNode(vc: ValidatorClientRef, proc checkNodes*(service: FallbackServiceRef): Future[bool] {.async.} = let - nodesToCheck = service.client.otherNodes() + vc = service.client + nodesToCheck = + if vc.genesisEvent.isSet(): + service.client.otherNodes() + else: + service.client.preGenesisNodes() pendingChecks = nodesToCheck.mapIt(service.client.checkNode(it)) var res = false try: @@ -252,6 +261,16 @@ proc mainLoop(service: FallbackServiceRef) {.async.} = service.state = ServiceState.Running debug "Service started" + try: + await vc.preGenesisEvent.wait() + except CancelledError: + debug "Service interrupted" + return + except CatchableError as exc: + warn "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + return + while true: # This loop could look much more nicer/better, when # https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could @@ -279,8 +298,6 @@ proc init*(t: typedesc[FallbackServiceRef], state: ServiceState.Initialized, changesEvent: newAsyncEvent()) debug "Initializing service" - # Perform initial nodes check. - if await res.checkNodes(): res.changesEvent.fire() return res proc start*(service: FallbackServiceRef) = diff --git a/beacon_chain/validator_client/fork_service.nim b/beacon_chain/validator_client/fork_service.nim index b465d846c6..dbd1cb53e3 100644 --- a/beacon_chain/validator_client/fork_service.nim +++ b/beacon_chain/validator_client/fork_service.nim @@ -43,39 +43,33 @@ proc sortForks(forks: openArray[Fork]): Result[seq[Fork], cstring] {. ok(sortedForks) proc pollForFork(vc: ValidatorClientRef) {.async.} = - let sres = vc.getCurrentSlot() - if sres.isSome(): - let - currentSlot = sres.get() - currentEpoch = currentSlot.epoch() - - let forks = - try: - await vc.getForkSchedule(ApiStrategyKind.Best) - except ValidatorApiError as exc: - warn "Unable to retrieve fork schedule", - reason = exc.getFailureReason(), err_msg = exc.msg - return - except CancelledError as exc: - debug "Fork retrieval process was interrupted" - raise exc - except CatchableError as exc: - error "Unexpected error occured while getting fork information", - err_name = exc.name, err_msg = exc.msg + let forks = + try: + await vc.getForkSchedule(ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to retrieve fork schedule", + reason = exc.getFailureReason(), err_msg = exc.msg + return + except CancelledError as exc: + debug "Fork retrieval process was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while getting fork information", + err_name = exc.name, err_msg = exc.msg + return + + let sortedForks = + block: + let res = sortForks(forks) + if res.isErr(): + warn "Invalid fork schedule received", reason = res.error() return + res.get() - let sortedForks = - block: - let res = sortForks(forks) - if res.isErr(): - warn "Invalid fork schedule received", reason = res.error() - return - res.get() - - if (len(vc.forks) == 0) or (vc.forks != sortedForks): - vc.forks = sortedForks - notice "Fork schedule updated", fork_schedule = sortedForks - vc.forksAvailable.fire() + if (len(vc.forks) == 0) or (vc.forks != sortedForks): + vc.forks = sortedForks + notice "Fork schedule updated", fork_schedule = sortedForks + vc.forksAvailable.fire() proc waitForNextEpoch(service: ForkServiceRef) {.async.} = let vc = service.client @@ -88,6 +82,16 @@ proc mainLoop(service: ForkServiceRef) {.async.} = service.state = ServiceState.Running debug "Service started" + try: + await vc.preGenesisEvent.wait() + except CancelledError: + debug "Service interrupted" + return + except CatchableError as exc: + warn "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + return + while true: # This loop could look much more nicer/better, when # https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could @@ -114,7 +118,6 @@ proc init*(t: typedesc[ForkServiceRef], let res = ForkServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) debug "Initializing service" - await vc.pollForFork() return res proc start*(service: ForkServiceRef) = diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 9f07f00116..a4c70884f0 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -406,36 +406,68 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, debug "Producing contribution and proofs", delay = delay await service.produceAndPublishContributions(slot, beaconBlockRoot, duties) -proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) = +proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef, + slot: Slot) {.async.} = let vc = service.client duties = vc.getSyncCommitteeDutiesForSlot(slot + 1) + timeout = vc.beaconClock.durationToNextSlot() - asyncSpawn service.publishSyncMessagesAndContributions(slot, duties) + try: + await service.publishSyncMessagesAndContributions(slot, + duties).wait(timeout) + except AsyncTimeoutError: + warn "Unable to publish sync committee messages and contributions in time", + slot = slot, timeout = timeout + except CancelledError as exc: + debug "Sync committee publish task has been interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error encountered while processing sync committee tasks", + error_name = exc.name, error_message = exc.msg proc mainLoop(service: SyncCommitteeServiceRef) {.async.} = let vc = service.client service.state = ServiceState.Running debug "Service started" - debug "Sync committee duties loop waiting for fork schedule update" - await vc.forksAvailable.wait() + debug "Sync committee processing loop is waiting for initialization" + try: + await allFutures( + vc.preGenesisEvent.wait(), + vc.genesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) + except CancelledError: + debug "Service interrupted" + return + except CatchableError as exc: + warn "Service crashed with unexpected error", err_name = exc.name, + err_msg = exc.msg + return + + doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") + var currentSlot: Opt[Slot] while true: # This loop could look much more nicer/better, when # https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - let sleepTime = - syncCommitteeMessageSlotOffset + vc.beaconClock.durationToNextSlot() - - let sres = vc.getCurrentSlot() - if sres.isSome(): - let currentSlot = sres.get() - service.spawnSyncCommitteeTasks(currentSlot) - await sleepAsync(sleepTime) - false + let + # We use zero offset here, because we do waiting in + # waitForBlockPublished(syncCommitteeMessageSlotOffset). + slot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff, + false) + if slot.isNone(): + debug "System time adjusted backwards significantly, exiting" + true + else: + currentSlot = slot + await service.processSyncCommitteeTasks(currentSlot.get()) + false except CancelledError: debug "Service interrupted" true