diff --git a/src/duty/attestation/attestation.service.ts b/src/duty/attestation/attestation.service.ts index e4df94e7..4c4dbee2 100644 --- a/src/duty/attestation/attestation.service.ts +++ b/src/duty/attestation/attestation.service.ts @@ -163,21 +163,27 @@ export class AttestationService { @TrackTask('get-attestation-committees') protected async getAttestationCommittees(stateSlot: Slot): Promise> { + const maxBatchSize = 1000; + let index = 0; const committees = new Map(); const processCommittees = async (epoch: Epoch) => { const stream = await this.clClient.getAttestationCommitteesInfo(stateSlot, epoch); const pipeline = chain([stream, parser(), pick({ filter: 'data' }), streamArray(), (data) => data.value]); + pipeline.on('data', async (committee) => { + // validator doesn't attests by default + committee.validators.forEach((index) => + this.summary.epoch(epoch).set({ epoch: epoch, val_id: Number(index), att_happened: false }), + ); + committees.set( + `${committee.index}_${committee.slot}`, + committee.validators.map((v) => Number(v)), + ); + index++; + if (index % maxBatchSize == 0) { + await unblock(); + } + }); return new Promise((resolve, reject) => { - pipeline.on('data', (committee) => { - // validator doesn't attests by default - committee.validators.forEach((index) => - this.summary.epoch(epoch).set({ epoch: epoch, val_id: Number(index), att_happened: false }), - ); - committees.set( - `${committee.index}_${committee.slot}`, - committee.validators.map((v) => Number(v)), - ); - }); pipeline.on('error', (error) => reject(error)); pipeline.on('end', () => resolve(true)); }).finally(() => pipeline.destroy());