From ee49cd625c71effc1d792e3c2e8e954f862d75f0 Mon Sep 17 00:00:00 2001 From: vgorkavenko Date: Mon, 20 Mar 2023 18:00:16 +0400 Subject: [PATCH 1/3] fix: double accounting --- src/common/functions/allSettled.ts | 13 +++++++++++ src/duty/attestation/attestation.metrics.ts | 3 ++- src/duty/attestation/attestation.rewards.ts | 1 - src/duty/attestation/attestation.service.ts | 3 ++- src/duty/duty.metrics.ts | 5 ++-- src/duty/duty.rewards.ts | 3 ++- src/duty/duty.service.ts | 24 +++++++++++--------- src/duty/propose/propose.metrics.ts | 3 ++- src/duty/state/state.metrics.ts | 3 ++- src/duty/summary/summary.metrics.ts | 3 ++- src/duty/sync/sync.metrics.ts | 5 ++-- src/duty/sync/sync.rewards.ts | 3 +-- src/duty/withdrawal/withdrawals.metrics.ts | 3 ++- src/duty/withdrawal/withdrawals.service.ts | 5 ++-- src/storage/clickhouse/clickhouse.service.ts | 3 ++- 15 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 src/common/functions/allSettled.ts diff --git a/src/common/functions/allSettled.ts b/src/common/functions/allSettled.ts new file mode 100644 index 00000000..887f7c12 --- /dev/null +++ b/src/common/functions/allSettled.ts @@ -0,0 +1,13 @@ +// wait for all promises to resolve and throws if any error occurs +export async function allSettled(values: Promise[]): Promise { + const results = await Promise.allSettled(values); + const failed = results.filter((r: PromiseSettledResult) => r.status == 'rejected'); + if (failed.length > 0) { + throw new global.AggregateError( + failed.map((r: PromiseRejectedResult) => r.reason), + failed.flatMap((r: any) => Array.from(r.reason.message, r.reason.stack || '')).join('\n'), + ); + } + + return results.map((r: PromiseFulfilledResult) => r.value); +} diff --git a/src/duty/attestation/attestation.metrics.ts b/src/duty/attestation/attestation.metrics.ts index 03952a58..20595556 100644 --- a/src/duty/attestation/attestation.metrics.ts +++ b/src/duty/attestation/attestation.metrics.ts @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { ClickhouseService } from 'storage/clickhouse'; @@ -34,7 +35,7 @@ export class AttestationMetrics { this.logger.log('Calculating attestation metrics'); this.processedEpoch = epoch; this.operators = await this.registryService.getOperators(); - await Promise.all([ + await allSettled([ this.perfectAttestationsLastEpoch(), this.missedAttestationsLastEpoch(), this.highIncDelayAttestationsLastEpoch(), diff --git a/src/duty/attestation/attestation.rewards.ts b/src/duty/attestation/attestation.rewards.ts index eac9091c..06a240cf 100644 --- a/src/duty/attestation/attestation.rewards.ts +++ b/src/duty/attestation/attestation.rewards.ts @@ -89,6 +89,5 @@ export class AttestationRewards { att_penalty, }); } - return true; } } diff --git a/src/duty/attestation/attestation.service.ts b/src/duty/attestation/attestation.service.ts index d15dc26f..431568a7 100644 --- a/src/duty/attestation/attestation.service.ts +++ b/src/duty/attestation/attestation.service.ts @@ -9,6 +9,7 @@ import { streamArray } from 'stream-json/streamers/StreamArray'; import { ConfigService } from 'common/config'; import { ConsensusProviderService } from 'common/eth-providers'; import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { range } from 'common/functions/range'; import { unblock } from 'common/functions/unblock'; import { PrometheusService, TrackTask } from 'common/prometheus'; @@ -177,7 +178,7 @@ export class AttestationService { }).finally(() => pipeline.destroy()); }; - await Promise.all([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]); + await allSettled([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]); return committees; } } diff --git a/src/duty/duty.metrics.ts b/src/duty/duty.metrics.ts index efe72b38..5497f7ec 100644 --- a/src/duty/duty.metrics.ts +++ b/src/duty/duty.metrics.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { ConsensusProviderService } from 'common/eth-providers'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { ClickhouseService } from '../storage'; @@ -34,7 +35,7 @@ export class DutyMetrics { @TrackTask('calc-all-duties-metrics') public async calculate(epoch: Epoch, possibleHighRewardValidators: string[]): Promise { this.logger.log('Calculating duties metrics of user validators'); - await Promise.all([ + await allSettled([ this.withPossibleHighReward(epoch, possibleHighRewardValidators), this.stateMetrics.calculate(epoch), this.withdrawalsMetrics.calculate(epoch), @@ -45,7 +46,7 @@ export class DutyMetrics { } private async withPossibleHighReward(epoch: Epoch, possibleHighRewardValidators: string[]): Promise { - await Promise.all([ + await allSettled([ this.attestationMetrics.calculate(epoch, possibleHighRewardValidators), this.proposeMetrics.calculate(epoch, possibleHighRewardValidators), this.syncMetrics.calculate(epoch, possibleHighRewardValidators), diff --git a/src/duty/duty.rewards.ts b/src/duty/duty.rewards.ts index c5b8d497..5e00bf5d 100644 --- a/src/duty/duty.rewards.ts +++ b/src/duty/duty.rewards.ts @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { AttestationRewards } from './attestation'; @@ -26,6 +27,6 @@ export class DutyRewards { // todo: 'Slashed' case // todo: 'Inactivity leak' case this.logger.log('Calculate rewards for all duties'); - await Promise.all([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]); + await allSettled([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]); } } diff --git a/src/duty/duty.service.ts b/src/duty/duty.service.ts index 26618aa5..0fb9312d 100644 --- a/src/duty/duty.service.ts +++ b/src/duty/duty.service.ts @@ -5,6 +5,7 @@ import { ConfigService } from 'common/config'; import { BlockHeaderResponse, ConsensusProviderService } from 'common/eth-providers'; import { BlockCacheService } from 'common/eth-providers/consensus-provider/block-cache'; import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { range } from 'common/functions/range'; import { unblock } from 'common/functions/unblock'; import { PrometheusService, TrackTask } from 'common/prometheus'; @@ -41,18 +42,19 @@ export class DutyService { ) {} public async checkAndWrite({ epoch, stateSlot }: { epoch: Epoch; stateSlot: Slot }): Promise { - // Prefetch will be done before main checks because duty by state requests are heavy - // and while we wait for their responses we fetch blocks and headers. - // If for some reason prefetch task will be slower than duty by state requests, - // blocks and headers will be fetched inside tasks of checks - const [, , possibleHighRewardVals] = await Promise.all([ - this.prefetch(epoch), + const [, , possibleHighRewardVals] = await allSettled([ + // Prefetch will be done before main checks because duty by state requests are heavy + // and while we wait for their responses we fetch blocks and headers. + // If for some reason prefetch task will be slower than duty by state requests, + // blocks and headers will be fetched inside tasks of checks + // so, it can be optional when failed + this.prefetch(epoch).catch(() => undefined), this.checkAll(epoch, stateSlot), - // optional task to get possible high reward validators for head epoch + // Optional task to get possible high reward validators for head epoch // it's nice to have but not critical this.getPossibleHighRewardValidators().catch(() => []), ]); - await Promise.all([this.writeEpochMeta(epoch), this.writeSummary(epoch)]); + await allSettled([this.writeEpochMeta(epoch), this.writeSummary(epoch)]); this.summary.clear(); await this.storage.updateEpochProcessing({ epoch, is_stored: true }); return possibleHighRewardVals; @@ -62,7 +64,7 @@ export class DutyService { protected async checkAll(epoch: Epoch, stateSlot: Slot): Promise { this.summary.clear(); this.logger.log('Checking duties of validators'); - await Promise.all([ + await allSettled([ this.state.check(epoch, stateSlot), this.attestation.check(epoch, stateSlot), this.sync.check(epoch, stateSlot), @@ -85,7 +87,7 @@ export class DutyService { const toFetch = slots.map((s) => [this.clClient.getBlockHeader(s), this.clClient.getBlockInfo(s)]).flat(); while (toFetch.length > 0) { const chunk = toFetch.splice(0, 32); - await Promise.all(chunk); + await allSettled(chunk); } } @@ -95,7 +97,7 @@ export class DutyService { const headEpoch = Math.trunc(actualSlotHeader.header.message.slot / this.config.get('FETCH_INTERVAL_SLOTS')); this.logger.log('Getting possible high reward validator indexes'); const propDependentRoot = await this.clClient.getDutyDependentRoot(headEpoch); - const [sync, prop] = await Promise.all([ + const [sync, prop] = await allSettled([ this.clClient.getSyncCommitteeInfo('finalized', headEpoch), this.clClient.getCanonicalProposerDuties(headEpoch, propDependentRoot), ]); diff --git a/src/duty/propose/propose.metrics.ts b/src/duty/propose/propose.metrics.ts index 7d367dc5..a9dd2ac2 100644 --- a/src/duty/propose/propose.metrics.ts +++ b/src/duty/propose/propose.metrics.ts @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { ClickhouseService } from 'storage'; @@ -24,7 +25,7 @@ export class ProposeMetrics { this.logger.log('Calculating propose metrics'); this.processedEpoch = epoch; this.operators = await this.registryService.getOperators(); - await Promise.all([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]); + await allSettled([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]); } private async goodProposes() { diff --git a/src/duty/state/state.metrics.ts b/src/duty/state/state.metrics.ts index 7a630d5c..929634d3 100644 --- a/src/duty/state/state.metrics.ts +++ b/src/duty/state/state.metrics.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { Owner, PrometheusService, PrometheusValStatus, TrackTask, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { LidoSourceService } from 'common/validators-registry/lido-source'; @@ -29,7 +30,7 @@ export class StateMetrics { this.logger.log('Calculating state metrics'); this.processedEpoch = epoch; this.operators = await this.registryService.getOperators(); - await Promise.all([ + await allSettled([ this.operatorsIdentifies(), this.nosStats(), this.userValidatorsStats(), diff --git a/src/duty/summary/summary.metrics.ts b/src/duty/summary/summary.metrics.ts index fbe6021d..4cfc7474 100644 --- a/src/duty/summary/summary.metrics.ts +++ b/src/duty/summary/summary.metrics.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { ConsensusProviderService } from 'common/eth-providers'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { ClickhouseService } from 'storage'; @@ -32,7 +33,7 @@ export class SummaryMetrics { this.logger.log('Calculating propose metrics'); this.processedEpoch = epoch; this.operators = await this.registryService.getOperators(); - await Promise.all([this.userRewards(), this.avgChainRewards(), this.common()]); + await allSettled([this.userRewards(), this.avgChainRewards(), this.common()]); } private async common() { diff --git a/src/duty/sync/sync.metrics.ts b/src/duty/sync/sync.metrics.ts index 5a63f341..320d1687 100644 --- a/src/duty/sync/sync.metrics.ts +++ b/src/duty/sync/sync.metrics.ts @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { ClickhouseService } from 'storage'; @@ -28,7 +29,7 @@ export class SyncMetrics { this.processedEpoch = epoch; this.operators = await this.registryService.getOperators(); - await Promise.all([ + await allSettled([ this.userAvgSyncPercent(), this.otherAvgSyncPercent(), this.operatorAvgSyncPercents(), @@ -55,7 +56,7 @@ export class SyncMetrics { private async syncParticipation(possibleHighRewardValidators: string[]) { const chainAvgSyncPercent = await this.chainAvgSyncPercent(); - await Promise.all([ + await allSettled([ this.goodSyncParticipationLastEpoch(chainAvgSyncPercent), this.badSyncParticipationLastEpoch(chainAvgSyncPercent), this.badSyncParticipationLastNEpoch(chainAvgSyncPercent), diff --git a/src/duty/sync/sync.rewards.ts b/src/duty/sync/sync.rewards.ts index ca8b4346..592ab284 100644 --- a/src/duty/sync/sync.rewards.ts +++ b/src/duty/sync/sync.rewards.ts @@ -16,7 +16,7 @@ export class SyncRewards { protected readonly summary: SummaryService, ) {} - public calculate(epoch: Epoch) { + public async calculate(epoch: Epoch) { const epochMeta = this.summary.epoch(epoch).getMeta(); let sync_earned_reward = 0; let sync_missed_reward = 0; @@ -30,6 +30,5 @@ export class SyncRewards { this.summary.epoch(epoch).set({ epoch, val_id: v.val_id, sync_earned_reward, sync_penalty, sync_missed_reward }); } - return true; } } diff --git a/src/duty/withdrawal/withdrawals.metrics.ts b/src/duty/withdrawal/withdrawals.metrics.ts index 86a8c7d5..9540546e 100644 --- a/src/duty/withdrawal/withdrawals.metrics.ts +++ b/src/duty/withdrawal/withdrawals.metrics.ts @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus'; import { RegistryService, RegistrySourceOperator } from 'common/validators-registry'; import { ClickhouseService } from 'storage/clickhouse'; @@ -29,7 +30,7 @@ export class WithdrawalsMetrics { this.logger.log('Calculating withdrawals metrics'); this.processedEpoch = epoch; this.operators = this.registryService.getOperators(); - await Promise.all([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]); + await allSettled([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]); } private async userNodeOperatorsWithdrawalsStats() { diff --git a/src/duty/withdrawal/withdrawals.service.ts b/src/duty/withdrawal/withdrawals.service.ts index 12e4ba06..aa8cabb6 100644 --- a/src/duty/withdrawal/withdrawals.service.ts +++ b/src/duty/withdrawal/withdrawals.service.ts @@ -4,11 +4,12 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { BlockInfoResponse, ConsensusProviderService } from 'common/eth-providers'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; +import { range } from 'common/functions/range'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { RegistryService } from 'common/validators-registry'; import { ClickhouseService } from 'storage/clickhouse'; -import { range } from '../../common/functions/range'; import { SummaryService } from '../summary'; @Injectable() @@ -30,7 +31,7 @@ export class WithdrawalsService { const firstSlotInEpoch = epoch * slotsInEpoch; const slots: number[] = range(firstSlotInEpoch, firstSlotInEpoch + slotsInEpoch); const toFetch = slots.map((s) => this.clClient.getBlockInfo(s)); - const blocks = (await Promise.all(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[]; + const blocks = (await allSettled(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[]; for (const block of blocks) { const withdrawals = block.message.body.execution_payload.withdrawals ?? []; for (const withdrawal of withdrawals) { diff --git a/src/storage/clickhouse/clickhouse.service.ts b/src/storage/clickhouse/clickhouse.service.ts index ba83fa07..83876b0d 100644 --- a/src/storage/clickhouse/clickhouse.service.ts +++ b/src/storage/clickhouse/clickhouse.service.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService, OnModuleInit } from '@nestjs/common' import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { allSettled } from 'common/functions/allSettled'; import { retrier } from 'common/functions/retrier'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { EpochMeta, ValidatorDutySummary } from 'duty/summary'; @@ -129,7 +130,7 @@ export class ClickhouseService implements OnModuleInit { let summaryChunk = []; let chunkSize = 0; const writeChunks = async (indexesChunk, summaryChunk) => { - return await Promise.all([ + return await allSettled([ this.retry( async () => await this.db.insert({ From 86422161239c18d183a83756bb596942166dfa3e Mon Sep 17 00:00:00 2001 From: vgorkavenko Date: Mon, 20 Mar 2023 19:48:48 +0400 Subject: [PATCH 2/3] fix: test --- src/duty/attestation/attestation.service.ts | 2 +- test/duties.e2e-spec.ts | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/duty/attestation/attestation.service.ts b/src/duty/attestation/attestation.service.ts index 431568a7..6766d719 100644 --- a/src/duty/attestation/attestation.service.ts +++ b/src/duty/attestation/attestation.service.ts @@ -68,7 +68,7 @@ export class AttestationService { protected async processAttestation(epoch: Epoch, attestation: SlotAttestation, committee: number[]) { const attestationFlags = { source: [], target: [], head: [] }; - const [canonHead, canonTarget, canonSource] = await Promise.all([ + const [canonHead, canonTarget, canonSource] = await allSettled([ this.getCanonSlotRoot(attestation.slot), this.getCanonSlotRoot(attestation.target_epoch * this.slotsInEpoch), this.getCanonSlotRoot(attestation.source_epoch * this.slotsInEpoch), diff --git a/test/duties.e2e-spec.ts b/test/duties.e2e-spec.ts index 42b81124..452269d8 100644 --- a/test/duties.e2e-spec.ts +++ b/test/duties.e2e-spec.ts @@ -17,6 +17,7 @@ import { RegistryService } from 'common/validators-registry'; import { ClickhouseService } from 'storage'; import { ValStatus } from '../src/common/eth-providers'; +import { allSettled } from '../src/common/functions/allSettled'; import { DutyModule, DutyService } from '../src/duty'; const MikroORMMockProvider = { @@ -200,7 +201,7 @@ describe('Duties', () => { stateSlot = Number(process.env['TEST_STATE_SLOT']); epochNumber = Number(process.env['TEST_EPOCH_NUMBER']); - await Promise.all([dutyService['prefetch'](epochNumber), dutyService['checkAll'](epochNumber, stateSlot)]); + await allSettled([dutyService['prefetch'](epochNumber), dutyService['checkAll'](epochNumber, stateSlot)]); summaryToSave = [...dutyService['summary'].epoch(epochNumber).values()].map((v) => { return { ...v, From 990426f54fc9737c8e67711c25f3fa80d767e34f Mon Sep 17 00:00:00 2001 From: Vladimir Gorkavenko <32727352+vgorkavenko@users.noreply.github.com> Date: Mon, 20 Mar 2023 20:19:49 +0400 Subject: [PATCH 3/3] fix: eventloop (#139) --- src/duty/attestation/attestation.rewards.ts | 7 +++ src/duty/state/state.service.ts | 45 +++++++++++--------- src/storage/clickhouse/clickhouse.service.ts | 2 + 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/duty/attestation/attestation.rewards.ts b/src/duty/attestation/attestation.rewards.ts index 06a240cf..510ecff3 100644 --- a/src/duty/attestation/attestation.rewards.ts +++ b/src/duty/attestation/attestation.rewards.ts @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common'; import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; +import { unblock } from 'common/functions/unblock'; import { PrometheusService } from 'common/prometheus'; import { SummaryService } from '../summary'; @@ -42,6 +43,8 @@ export class AttestationRewards { Math.trunc(perfect.source * epochMeta.state.base_reward * 32 * sourceParticipation) + Math.trunc(perfect.target * epochMeta.state.base_reward * 32 * targetParticipation) + Math.trunc(perfect.head * epochMeta.state.base_reward * 32 * headParticipation); + const maxBatchSize = 10000; + let index = 0; for (const v of this.summary.epoch(epoch).values()) { // Calculate attestation rewards from previous epoch const pv = this.summary.epoch(epoch - 1).get(v.val_id); @@ -88,6 +91,10 @@ export class AttestationRewards { att_missed_reward, att_penalty, }); + index++; + if (index % maxBatchSize == 0) { + await unblock(); + } } } } diff --git a/src/duty/state/state.service.ts b/src/duty/state/state.service.ts index 578aab95..df32604d 100644 --- a/src/duty/state/state.service.ts +++ b/src/duty/state/state.service.ts @@ -5,15 +5,17 @@ import { chain } from 'stream-chain'; import { parser } from 'stream-json'; import { pick } from 'stream-json/filters/Pick'; import { streamArray } from 'stream-json/streamers/StreamArray'; +import { batch } from 'stream-json/utils/Batch'; import { ConfigService } from 'common/config'; import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/eth-providers'; import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types'; +import { bigNumberSqrt } from 'common/functions/bigNumberSqrt'; +import { unblock } from 'common/functions/unblock'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { RegistryService } from 'common/validators-registry'; import { ClickhouseService } from 'storage/clickhouse'; -import { bigNumberSqrt } from '../../common/functions/bigNumberSqrt'; import { SummaryService } from '../summary'; @Injectable() @@ -37,27 +39,30 @@ export class StateService { this.logger.log('Processing all validators state'); let activeValidatorsCount = 0; let activeValidatorsEffectiveBalance = 0n; - const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray()]); + const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 10000 })]); await new Promise((resolve, reject) => { - pipeline.on('data', async (data) => { - const state: StateValidatorResponse = data.value; - const index = Number(state.index); - const operator = this.registry.getOperatorKey(state.validator.pubkey); - this.summary.epoch(epoch).set({ - epoch, - val_id: index, - val_pubkey: state.validator.pubkey, - val_nos_id: operator?.operatorIndex, - val_nos_name: operator?.operatorName, - val_slashed: state.validator.slashed, - val_status: state.status, - val_balance: BigInt(state.balance), - val_effective_balance: BigInt(state.validator.effective_balance), - }); - if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) { - activeValidatorsCount++; - activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9); + pipeline.on('data', async (batch) => { + for (const data of batch) { + const state: StateValidatorResponse = data.value; + const index = Number(state.index); + const operator = this.registry.getOperatorKey(state.validator.pubkey); + this.summary.epoch(epoch).set({ + epoch, + val_id: index, + val_pubkey: state.validator.pubkey, + val_nos_id: operator?.operatorIndex, + val_nos_name: operator?.operatorName, + val_slashed: state.validator.slashed, + val_status: state.status, + val_balance: BigInt(state.balance), + val_effective_balance: BigInt(state.validator.effective_balance), + }); + if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) { + activeValidatorsCount++; + activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9); + } } + await unblock(); }); pipeline.on('error', (error) => reject(error)); pipeline.on('end', () => resolve(true)); diff --git a/src/storage/clickhouse/clickhouse.service.ts b/src/storage/clickhouse/clickhouse.service.ts index 83876b0d..f70dab61 100644 --- a/src/storage/clickhouse/clickhouse.service.ts +++ b/src/storage/clickhouse/clickhouse.service.ts @@ -6,6 +6,7 @@ import { ConfigService } from 'common/config'; import { Epoch } from 'common/eth-providers/consensus-provider/types'; import { allSettled } from 'common/functions/allSettled'; import { retrier } from 'common/functions/retrier'; +import { unblock } from 'common/functions/unblock'; import { PrometheusService, TrackTask } from 'common/prometheus'; import { EpochMeta, ValidatorDutySummary } from 'duty/summary'; @@ -167,6 +168,7 @@ export class ClickhouseService implements OnModuleInit { await writeChunks(indexesChunk, summaryChunk); [indexesChunk, summaryChunk] = [[], []]; chunkSize = 0; + await unblock(); } } if (chunkSize) await writeChunks(indexesChunk, summaryChunk);