From 0096beaf7c74ffd9277b1f21dc71380739ccaeea Mon Sep 17 00:00:00 2001 From: slowbackspace Date: Tue, 29 Oct 2024 14:08:14 +0100 Subject: [PATCH] fix: emit block event only after affected addresses are retrieved --- CHANGELOG.md | 1 + src/events.ts | 57 ++++++++++++++++------------ src/server.ts | 35 ++++++++--------- src/types/events.ts | 3 ++ src/utils/address.ts | 19 ---------- src/utils/blockfrost-api.ts | 49 +++++++++++++++++++++++- test/unit/fixtures/events.ts | 41 ++++++++++++++------ test/unit/tests/utils/events.test.ts | 30 +++++++++------ 8 files changed, 149 insertions(+), 86 deletions(-) create mode 100644 src/types/events.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f7f5fe1..31466fee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - crash while fetching affected addresses for a block undergoing a rollback +- emit block event only after all data are retrieved making address subscriptions more reliable ## [2.1.0] - 2023-06-10 diff --git a/src/events.ts b/src/events.ts index ded4674a..87fd623c 100644 --- a/src/events.ts +++ b/src/events.ts @@ -1,7 +1,7 @@ import EventEmitter from 'events'; import * as Server from './types/server.js'; import { prepareMessage } from './utils/message.js'; -import { blockfrostAPI } from './utils/blockfrost-api.js'; +import { getBlockData } from './utils/blockfrost-api.js'; import { Responses } from '@blockfrost/blockfrost-js'; import { promiseTimeout } from './utils/common.js'; import { getTransactionsWithDetails } from './utils/transaction.js'; @@ -30,18 +30,17 @@ export const _resetPreviousBlock = () => { export const emitBlock = async (options?: EmitBlockOptions) => { try { - const latestBlock = await blockfrostAPI.blocksLatest(); + const { latestBlock, affectedAddresses } = await getBlockData(); if ((latestBlock.height ?? 0) < (previousBlock?.height ?? 0)) { // rollback logger.warn( - `Rollback detected. Previous block height: ${previousBlock?.height}, current block height: ${latestBlock.height}`, + `[BLOCK EMITTER] Rollback detected. Previous block height: ${previousBlock?.height}, current block height: ${latestBlock.height}`, ); previousBlock = undefined; } const currentPreviousBlock = previousBlock; - // update previousBlock ASAP (before fetching missing blocks)) so next run won't have stale data previousBlock = latestBlock; @@ -54,7 +53,7 @@ export const emitBlock = async (options?: EmitBlockOptions) => { if (missedBlocks > (options?.maxMissedBlocks ?? EMIT_MAX_MISSED_BLOCKS)) { // too many missed blocks, skip emitting logger.warn( - `newBlock emitter: Emitting skipped. Too many missed blocks: ${ + `[BLOCK EMITTER] Emitting skipped. Too many missed blocks: ${ currentPreviousBlock.height + 1 }-${latestBlock.height - 1}`, ); @@ -62,20 +61,24 @@ export const emitBlock = async (options?: EmitBlockOptions) => { for (let index = currentPreviousBlock.height + 1; index < latestBlock.height; index++) { // emit previously missed blocks try { - const missedBlock = await promiseTimeout( - blockfrostAPI.blocks(index), - options?.fetchTimeoutMs ?? 2000, - ); + const missedBlockData = (await promiseTimeout( + getBlockData({ block: index }), + options?.fetchTimeoutMs ?? 8000, + )) as Awaited>; logger.warn( - `newBlock emitter: Emitting missed block: ${index} (current block: ${latestBlock.height})`, + `[BLOCK EMITTER] Emitting missed block: ${index} (current block: ${latestBlock.height})`, + ); + events.emit( + 'newBlock', + missedBlockData.latestBlock, + missedBlockData.affectedAddresses, ); - events.emit('newBlock', missedBlock); } catch (error) { if (error instanceof Error && error.message === 'PROMISE_TIMEOUT') { - logger.warn(`newBlock emitter: Skipping block ${index}. Fetch takes too long.`); + logger.warn(`[BLOCK EMITTER] Skipping block ${index}. Fetch takes too long.`); } else { - logger.warn(`newBlock emitter: Skipping block ${index}.`); + logger.warn(`[BLOCK EMITTER] Skipping block ${index}.`); logger.warn(error); } @@ -84,12 +87,12 @@ export const emitBlock = async (options?: EmitBlockOptions) => { } } - logger.info(`Emitting new block ${latestBlock.hash} (${latestBlock.height})`); + logger.info(`[BLOCK EMITTER] Emitting new block ${latestBlock.hash} (${latestBlock.height})`); // emit latest block - events.emit('newBlock', latestBlock); + events.emit('newBlock', latestBlock, affectedAddresses); } } catch (error) { - logger.error('newBlock emitter error', error); + logger.error('[BLOCK EMITTER] error', error); } }; @@ -174,14 +177,20 @@ export const onBlock = async ( } }; -export const startEmitter = () => { - logger.info('Started block emitter'); - setInterval( - emitBlock, - process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL - ? Number.parseInt(process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL, 10) - : 5000, - ); +export const startEmitter = async () => { + const interval = process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL + ? Number.parseInt(process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL, 10) + : 5000; + + const t0 = Date.now(); + + await emitBlock(); + const t1 = Date.now(); + const durationMs = t1 - t0; + + const delay = Math.max(interval - durationMs, 0); + + setTimeout(startEmitter, delay); }; export { events }; diff --git a/src/server.ts b/src/server.ts index dd7f6037..5600b0e6 100644 --- a/src/server.ts +++ b/src/server.ts @@ -22,12 +22,12 @@ import getTransaction from './methods/get-transaction.js'; import submitTransaction from './methods/push-transaction.js'; import estimateFee from './methods/estimate-fee.js'; import getBalanceHistory from './methods/get-balance-history.js'; -import { getAffectedAddresses } from './utils/address.js'; import { logger } from './utils/logger.js'; import { METRICS_COLLECTOR_INTERVAL_MS } from './constants/config.js'; import { getPort } from './utils/server.js'; import { createRequire } from 'module'; +import { AffectedAddressesInBlock } from './types/events.js'; const require = createRequire(import.meta.url); const packageJson = require('../package.json'); @@ -139,27 +139,24 @@ setInterval(() => { }, 30_000); startEmitter(); -// this event is triggered with every new block see events.ts -events.on('newBlock', async (latestBlock: Responses['block_content']) => { - logger.info( - `Retrieving affected addressed for newBlock ${latestBlock.hash} ${latestBlock.height}`, - ); - try { - // TODO: move fetching affected address for the block to newBlock emitter - // So if fetching affected addresses returns 404 due to block rollback it won't be emitted - const affectedAddresses = await getAffectedAddresses(latestBlock.height); +// this event is triggered with every new block see events.ts +events.on( + 'newBlock', + async (latestBlock: Responses['block_content'], affectedAddresses: AffectedAddressesInBlock) => { logger.debug(`Running newBlock callback for ${clients.length} clients`); - for (const client of clients) { - client.newBlockCallback(latestBlock, affectedAddresses); + try { + for (const client of clients) { + client.newBlockCallback(latestBlock, affectedAddresses); + } + } catch (error) { + logger.error( + `Failed to notify client about new block ${latestBlock.hash} ${latestBlock.height}.`, + error, + ); } - } catch (error) { - logger.error( - `Failed to notify clients about new block ${latestBlock.hash} ${latestBlock.height}.`, - error, - ); - } -}); + }, +); wss.on('connection', async (ws: Server.Ws) => { ws.isAlive = true; diff --git a/src/types/events.ts b/src/types/events.ts new file mode 100644 index 00000000..0ac5d901 --- /dev/null +++ b/src/types/events.ts @@ -0,0 +1,3 @@ +import { Responses } from '@blockfrost/blockfrost-js'; + +export type AffectedAddressesInBlock = Responses['block_content_addresses']; diff --git a/src/utils/address.ts b/src/utils/address.ts index 0c403d31..e7cb35a5 100644 --- a/src/utils/address.ts +++ b/src/utils/address.ts @@ -8,7 +8,6 @@ import { } from '@blockfrost/blockfrost-js'; import memoizee from 'memoizee'; import { getAssetData, transformAsset } from './asset.js'; -import { logger } from './logger.js'; import { assetMetadataLimiter, pLimiter } from './limiter.js'; export const deriveAddress = ( @@ -321,21 +320,3 @@ export const getStakingAccountTotal = async ( throw error; } }; - -export const getAffectedAddresses = async ( - blockHeight: number | null, -): Promise => { - if (blockHeight === null) { - throw new Error('Cannot fetch block transactions. Invalid block height.'); - } - try { - const addresses = await blockfrostAPI.blocksAddressesAll(blockHeight, { batchSize: 2 }); - - return addresses; - } catch (error) { - if (error instanceof BlockfrostServerError && error.status_code === 404) { - logger.warn(`Failed to fetch addresses for a block ${blockHeight}. Block not found.`); - } - throw error; - } -}; diff --git a/src/utils/blockfrost-api.ts b/src/utils/blockfrost-api.ts index 90bc72c3..937eaa60 100644 --- a/src/utils/blockfrost-api.ts +++ b/src/utils/blockfrost-api.ts @@ -1,7 +1,9 @@ -import { BlockFrostAPI } from '@blockfrost/blockfrost-js'; +import { BlockFrostAPI, BlockfrostServerError, Responses } from '@blockfrost/blockfrost-js'; import { Options } from '@blockfrost/blockfrost-js/lib/types/index.js'; import { createRequire } from 'module'; import { BLOCKFROST_REQUEST_TIMEOUT } from '../constants/config.js'; +import { logger } from './logger.js'; +import { AffectedAddressesInBlock } from '../types/events.js'; const require = createRequire(import.meta.url); const packageJson = require('../../package.json'); @@ -18,6 +20,51 @@ export const getBlockfrostClient = (options?: Partial) => { }); }; +export const getBlockData = async (options?: { + block?: number | string; + attempt?: number; +}): Promise<{ + latestBlock: Responses['block_content']; + affectedAddresses: AffectedAddressesInBlock; +}> => { + // Fetch latest block and all addresses affected in the block + // Fetching of affected addresses may fail, there are 3 retry attempts before throwing an error + const MAX_ATTEMPTS = 3; + const latestBlock = options?.block + ? await blockfrostAPI.blocks(options.block) + : await blockfrostAPI.blocksLatest(); + let affectedAddresses: AffectedAddressesInBlock = []; + + try { + affectedAddresses = await blockfrostAPI.blocksAddressesAll(latestBlock.hash, { + batchSize: 2, + }); + } catch (error) { + if ( + error instanceof BlockfrostServerError && + error.status_code === 404 // Backend lagging, block rollback + ) { + const attempt = options?.attempt ?? 0; + + if (attempt < MAX_ATTEMPTS - 1) { + logger.warn( + `Unable to fetch addresses for block ${latestBlock.height} ${latestBlock.hash}. Block no longer on chain.`, + ); + return getBlockData({ ...options, attempt: attempt + 1 }); + } else { + throw error; + } + } else { + throw error; + } + } + + return { + latestBlock, + affectedAddresses, + }; +}; + export const blockfrostAPI = getBlockfrostClient(); // Special client for tx submit due timeout that's necessary for handling "mempool full" error. diff --git a/test/unit/fixtures/events.ts b/test/unit/fixtures/events.ts index 7ece5241..1da4100d 100644 --- a/test/unit/fixtures/events.ts +++ b/test/unit/fixtures/events.ts @@ -3,11 +3,24 @@ export const emitBlock = [ { description: 'one block', - blocks: [ + data: [ { - time: 1506203091, - height: 1, - hash: 'a', + block: { + time: 1506203091, + height: 1, + hash: 'a', + }, + blockAddresses: [ + { + address: + 'addr1q9ld26v2lv8wvrxxmvg90pn8n8n5k6tdst06q2s856rwmvnueldzuuqmnsye359fqrk8hwvenjnqultn7djtrlft7jnq7dy7wv', + transactions: [ + { + tx_hash: '1a0570af966fb355a7160e4f82d5a80b8681b7955f5d44bec0dce628516157f0', + }, + ], + }, + ], }, ], }, @@ -46,7 +59,9 @@ export const emitMissedBlock = [ export const onBlock = [ { description: "1 of client's addresses affected in a block (real data)", - subscribedAddresses: [{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h',cbor:true}], + subscribedAddresses: [ + { address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h', cbor: true }, + ], mocks: { block: { time: 1639491936, @@ -340,8 +355,8 @@ export const onBlock = [ ], }, txCbor: { - cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef' - } + cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef', + }, }, ], }, @@ -597,7 +612,9 @@ export const onBlock = [ ], }, txHash: '4d5beb45fe37b44b46f839811a3d3a1ac4a20911850740867a64f77d09372d0b', - txCbor: {cbor:'0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'} + txCbor: { + cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef', + }, }, ], }, @@ -606,9 +623,9 @@ export const onBlock = [ { description: "2 of the client's addresses affected, 1 address affected in multiple txs", subscribedAddresses: [ - {address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h'}, - {address:'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t'}, - {address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected + { address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h' }, + { address: 'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t' }, + { address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h' }, // random address, should not be affected ], mocks: { block: { @@ -794,7 +811,7 @@ export const onBlock = [ { description: 'subscribed address was not affected', subscribedAddresses: [ - {address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected + { address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h' }, // random address, should not be affected ], mocks: { block: { diff --git a/test/unit/tests/utils/events.test.ts b/test/unit/tests/utils/events.test.ts index 27a3df35..68a2898a 100644 --- a/test/unit/tests/utils/events.test.ts +++ b/test/unit/tests/utils/events.test.ts @@ -16,23 +16,28 @@ describe('events', () => { for (const fixture of fixtures.emitBlock) { test(fixture.description, async () => { // @ts-ignore - const mock1 = sinon.stub(blockfrostAPI, 'blocksLatest').resolves(fixture.blocks[0]); + const mock1 = sinon.stub(blockfrostAPI, 'blocksLatest').resolves(fixture.data[0].block); + const mockBlockAddresses = sinon + .stub(blockfrostAPI, 'blocksAddressesAll') + .resolves(fixture.data[0].blockAddresses ?? []); const callback = vi.fn(); events.on('newBlock', callback); await emitBlock(); + mock1.restore(); + mockBlockAddresses.restore(); events.removeAllListeners(); _resetPreviousBlock(); expect(callback).toBeCalledTimes(1); - expect(callback).toHaveBeenCalledWith(fixture.blocks[0]); + expect(callback).toHaveBeenCalledWith(fixture.data[0].block, fixture.data[0].blockAddresses); }); } for (const fixture of fixtures.emitMissedBlock) { test(fixture.description, async () => { const mock1 = sinon.stub(blockfrostAPI, 'blocks'); - + const mockBlockAddresses = sinon.stub(blockfrostAPI, 'blocksAddressesAll').resolves([]); mock1 .onCall(0) // @ts-ignore @@ -57,21 +62,23 @@ describe('events', () => { await emitBlock(); expect(callback).toBeCalledTimes(1); - expect(callback).toHaveBeenNthCalledWith(1, fixture.latestBlocks[0]); + expect(callback).toHaveBeenNthCalledWith(1, fixture.latestBlocks[0], []); await emitBlock({ maxMissedBlocks: 10 }); expect(callback).toBeCalledTimes(4); // one time from the first emit, 3 times from 2nd emit (2 missed blocks + latest) - expect(callback).toHaveBeenNthCalledWith(2, fixture.missedBlocks[0]); - expect(callback).toHaveBeenNthCalledWith(3, fixture.missedBlocks[1]); - expect(callback).toHaveBeenNthCalledWith(4, fixture.latestBlocks[1]); + expect(callback).toHaveBeenNthCalledWith(2, fixture.missedBlocks[0], []); + expect(callback).toHaveBeenNthCalledWith(3, fixture.missedBlocks[1], []); + expect(callback).toHaveBeenNthCalledWith(4, fixture.latestBlocks[1], []); mock1.restore(); mock2.restore(); + mockBlockAddresses.restore(); events.removeAllListeners(); _resetPreviousBlock(); }); test(`${fixture.description} - timeout test`, async () => { const blockLatestMock = sinon.stub(blockfrostAPI, 'blocksLatest'); + const mockBlockAddresses = sinon.stub(blockfrostAPI, 'blocksAddressesAll').resolves([]); blockLatestMock .onCall(0) @@ -96,13 +103,13 @@ describe('events', () => { setTimeout(() => { // @ts-ignore resolve(fixture.missedBlocks[1]); - }, 4000); + }, 10000); }), ); // delay 2nd response by 5s, which should trigger timeout await emitBlock(); expect(callback).toBeCalledTimes(1); - expect(callback).toHaveBeenNthCalledWith(1, fixture.latestBlocks[0]); + expect(callback).toHaveBeenNthCalledWith(1, fixture.latestBlocks[0], []); await emitBlock({ fetchTimeoutMs: 3000, maxMissedBlocks: 10 }); @@ -120,10 +127,11 @@ describe('events', () => { }); expect(callback).toBeCalledTimes(3); // one time from the first emit, 2 times from 2nd emit (just 1 missed block (because 2nd block will timeout) + latest block) - expect(callback).toHaveBeenNthCalledWith(2, fixture.missedBlocks[0]); - expect(callback).toHaveBeenNthCalledWith(3, fixture.latestBlocks[1]); + expect(callback).toHaveBeenNthCalledWith(2, fixture.missedBlocks[0], []); + expect(callback).toHaveBeenNthCalledWith(3, fixture.latestBlocks[1], []); blockLatestMock.restore(); blockMock.restore(); + mockBlockAddresses.restore(); events.removeAllListeners(); _resetPreviousBlock(); });