Commit
fix: emit block event only after affected addresses are retrieved
slowbackspace committed Oct 30, 2024
1 parent fb87514 commit 0096bea
Showing 8 changed files with 149 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- crash while fetching affected addresses for a block undergoing a rollback
- emit block event only after all data are retrieved, making address subscriptions more reliable

## [2.1.0] - 2023-06-10

57 changes: 33 additions & 24 deletions src/events.ts
@@ -1,7 +1,7 @@
import EventEmitter from 'events';
import * as Server from './types/server.js';
import { prepareMessage } from './utils/message.js';
import { blockfrostAPI } from './utils/blockfrost-api.js';
import { getBlockData } from './utils/blockfrost-api.js';
import { Responses } from '@blockfrost/blockfrost-js';
import { promiseTimeout } from './utils/common.js';
import { getTransactionsWithDetails } from './utils/transaction.js';
@@ -30,18 +30,17 @@ export const _resetPreviousBlock = () => {

export const emitBlock = async (options?: EmitBlockOptions) => {
try {
const latestBlock = await blockfrostAPI.blocksLatest();
const { latestBlock, affectedAddresses } = await getBlockData();

if ((latestBlock.height ?? 0) < (previousBlock?.height ?? 0)) {
// rollback
logger.warn(
`Rollback detected. Previous block height: ${previousBlock?.height}, current block height: ${latestBlock.height}`,
`[BLOCK EMITTER] Rollback detected. Previous block height: ${previousBlock?.height}, current block height: ${latestBlock.height}`,
);
previousBlock = undefined;
}

const currentPreviousBlock = previousBlock;
// update previousBlock ASAP (before fetching missing blocks) so next run won't have stale data

previousBlock = latestBlock;

@@ -54,28 +53,32 @@ export const emitBlock = async (options?: EmitBlockOptions) => {
if (missedBlocks > (options?.maxMissedBlocks ?? EMIT_MAX_MISSED_BLOCKS)) {
// too many missed blocks, skip emitting
logger.warn(
`newBlock emitter: Emitting skipped. Too many missed blocks: ${
`[BLOCK EMITTER] Emitting skipped. Too many missed blocks: ${
currentPreviousBlock.height + 1
}-${latestBlock.height - 1}`,
);
} else {
for (let index = currentPreviousBlock.height + 1; index < latestBlock.height; index++) {
// emit previously missed blocks
try {
const missedBlock = await promiseTimeout(
blockfrostAPI.blocks(index),
options?.fetchTimeoutMs ?? 2000,
);
const missedBlockData = (await promiseTimeout(
getBlockData({ block: index }),
options?.fetchTimeoutMs ?? 8000,
)) as Awaited<ReturnType<typeof getBlockData>>;

logger.warn(
`newBlock emitter: Emitting missed block: ${index} (current block: ${latestBlock.height})`,
`[BLOCK EMITTER] Emitting missed block: ${index} (current block: ${latestBlock.height})`,
);
events.emit(
'newBlock',
missedBlockData.latestBlock,
missedBlockData.affectedAddresses,
);
events.emit('newBlock', missedBlock);
} catch (error) {
if (error instanceof Error && error.message === 'PROMISE_TIMEOUT') {
logger.warn(`newBlock emitter: Skipping block ${index}. Fetch takes too long.`);
logger.warn(`[BLOCK EMITTER] Skipping block ${index}. Fetch takes too long.`);
} else {
logger.warn(`newBlock emitter: Skipping block ${index}.`);
logger.warn(`[BLOCK EMITTER] Skipping block ${index}.`);

logger.warn(error);
}
@@ -84,12 +87,12 @@ export const emitBlock = async (options?: EmitBlockOptions) => {
}
}

logger.info(`Emitting new block ${latestBlock.hash} (${latestBlock.height})`);
logger.info(`[BLOCK EMITTER] Emitting new block ${latestBlock.hash} (${latestBlock.height})`);
// emit latest block
events.emit('newBlock', latestBlock);
events.emit('newBlock', latestBlock, affectedAddresses);
}
} catch (error) {
logger.error('newBlock emitter error', error);
logger.error('[BLOCK EMITTER] error', error);
}
};

@@ -174,14 +177,20 @@ export const onBlock = async (
}
};

export const startEmitter = () => {
logger.info('Started block emitter');
setInterval(
emitBlock,
process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL
? Number.parseInt(process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL, 10)
: 5000,
);
export const startEmitter = async () => {
const interval = process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL
? Number.parseInt(process.env.BLOCKFROST_BLOCK_LISTEN_INTERVAL, 10)
: 5000;

const t0 = Date.now();

await emitBlock();
const t1 = Date.now();
const durationMs = t1 - t0;

const delay = Math.max(interval - durationMs, 0);

setTimeout(startEmitter, delay);
};

export { events };
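
For context, here is a minimal standalone sketch of the self-rescheduling pattern that the new `startEmitter` adopts: unlike the previous `setInterval` call, the next run is scheduled only after the current one has finished, so a slow fetch cannot pile up overlapping executions. Names such as `pollOnce` and `INTERVAL_MS` are illustrative, not part of the repository.

```ts
// Minimal sketch of a self-rescheduling poller (illustrative names).
// The next run is scheduled only after the current one completes, and the
// delay is reduced by however long the poll itself took.
const INTERVAL_MS = 5000;

const pollOnce = async (): Promise<void> => {
  // stand-in for the real fetch-and-emit work (emitBlock in events.ts)
  console.log('tick', new Date().toISOString());
};

const startLoop = async (): Promise<void> => {
  const started = Date.now();
  await pollOnce();
  const elapsed = Date.now() - started;
  // never schedule with a negative delay
  setTimeout(startLoop, Math.max(INTERVAL_MS - elapsed, 0));
};

startLoop();
```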
35 changes: 16 additions & 19 deletions src/server.ts
@@ -22,12 +22,12 @@ import getTransaction from './methods/get-transaction.js';
import submitTransaction from './methods/push-transaction.js';
import estimateFee from './methods/estimate-fee.js';
import getBalanceHistory from './methods/get-balance-history.js';
import { getAffectedAddresses } from './utils/address.js';
import { logger } from './utils/logger.js';
import { METRICS_COLLECTOR_INTERVAL_MS } from './constants/config.js';
import { getPort } from './utils/server.js';

import { createRequire } from 'module';
import { AffectedAddressesInBlock } from './types/events.js';
const require = createRequire(import.meta.url);
const packageJson = require('../package.json');

@@ -139,27 +139,24 @@ setInterval(() => {
}, 30_000);

startEmitter();
// this event is triggered with every new block see events.ts
events.on('newBlock', async (latestBlock: Responses['block_content']) => {
logger.info(
`Retrieving affected addressed for newBlock ${latestBlock.hash} ${latestBlock.height}`,
);
try {
// TODO: move fetching affected address for the block to newBlock emitter
// So if fetching affected addresses returns 404 due to block rollback it won't be emitted
const affectedAddresses = await getAffectedAddresses(latestBlock.height);

// this event is triggered with every new block see events.ts
events.on(
'newBlock',
async (latestBlock: Responses['block_content'], affectedAddresses: AffectedAddressesInBlock) => {
logger.debug(`Running newBlock callback for ${clients.length} clients`);
for (const client of clients) {
client.newBlockCallback(latestBlock, affectedAddresses);
try {
for (const client of clients) {
client.newBlockCallback(latestBlock, affectedAddresses);
}
} catch (error) {
logger.error(
`Failed to notify client about new block ${latestBlock.hash} ${latestBlock.height}.`,
error,
);
}
} catch (error) {
logger.error(
`Failed to notify clients about new block ${latestBlock.hash} ${latestBlock.height}.`,
error,
);
}
});
},
);

wss.on('connection', async (ws: Server.Ws) => {
ws.isAlive = true;
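
To illustrate the updated contract, the sketch below registers a `newBlock` listener with the new two-argument signature: the block and its affected addresses arrive together, already fetched by the emitter, so the server no longer calls the API per event. The emitter instance here is a stand-in for the one exported from `events.ts`.

```ts
import EventEmitter from 'events';
import { Responses } from '@blockfrost/blockfrost-js';

type AffectedAddressesInBlock = Responses['block_content_addresses'];

// stand-in for the emitter exported from events.ts
const events = new EventEmitter();

// Listener mirroring the updated signature in server.ts: block data and the
// affected addresses are delivered together by the emitter.
events.on(
  'newBlock',
  (block: Responses['block_content'], affectedAddresses: AffectedAddressesInBlock) => {
    console.log(`block ${block.height}: ${affectedAddresses.length} affected addresses`);
  },
);
```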
3 changes: 3 additions & 0 deletions src/types/events.ts
@@ -0,0 +1,3 @@
import { Responses } from '@blockfrost/blockfrost-js';

export type AffectedAddressesInBlock = Responses['block_content_addresses'];
19 changes: 0 additions & 19 deletions src/utils/address.ts
@@ -8,7 +8,6 @@ import {
} from '@blockfrost/blockfrost-js';
import memoizee from 'memoizee';
import { getAssetData, transformAsset } from './asset.js';
import { logger } from './logger.js';
import { assetMetadataLimiter, pLimiter } from './limiter.js';

export const deriveAddress = (
@@ -321,21 +320,3 @@ export const getStakingAccountTotal = async (
throw error;
}
};

export const getAffectedAddresses = async (
blockHeight: number | null,
): Promise<Responses['block_content_addresses']> => {
if (blockHeight === null) {
throw new Error('Cannot fetch block transactions. Invalid block height.');
}
try {
const addresses = await blockfrostAPI.blocksAddressesAll(blockHeight, { batchSize: 2 });

return addresses;
} catch (error) {
if (error instanceof BlockfrostServerError && error.status_code === 404) {
logger.warn(`Failed to fetch addresses for a block ${blockHeight}. Block not found.`);
}
throw error;
}
};
49 changes: 48 additions & 1 deletion src/utils/blockfrost-api.ts
@@ -1,7 +1,9 @@
import { BlockFrostAPI } from '@blockfrost/blockfrost-js';
import { BlockFrostAPI, BlockfrostServerError, Responses } from '@blockfrost/blockfrost-js';
import { Options } from '@blockfrost/blockfrost-js/lib/types/index.js';
import { createRequire } from 'module';
import { BLOCKFROST_REQUEST_TIMEOUT } from '../constants/config.js';
import { logger } from './logger.js';
import { AffectedAddressesInBlock } from '../types/events.js';
const require = createRequire(import.meta.url);
const packageJson = require('../../package.json');

@@ -18,6 +20,51 @@ export const getBlockfrostClient = (options?: Partial<Options>) => {
});
};

export const getBlockData = async (options?: {
block?: number | string;
attempt?: number;
}): Promise<{
latestBlock: Responses['block_content'];
affectedAddresses: AffectedAddressesInBlock;
}> => {
// Fetch latest block and all addresses affected in the block
// Fetching affected addresses may fail; it is attempted up to MAX_ATTEMPTS (3) times before the error is rethrown
const MAX_ATTEMPTS = 3;
const latestBlock = options?.block
? await blockfrostAPI.blocks(options.block)
: await blockfrostAPI.blocksLatest();
let affectedAddresses: AffectedAddressesInBlock = [];

try {
affectedAddresses = await blockfrostAPI.blocksAddressesAll(latestBlock.hash, {
batchSize: 2,
});
} catch (error) {
if (
error instanceof BlockfrostServerError &&
error.status_code === 404 // Backend lagging, block rollback
) {
const attempt = options?.attempt ?? 0;

if (attempt < MAX_ATTEMPTS - 1) {
logger.warn(
`Unable to fetch addresses for block ${latestBlock.height} ${latestBlock.hash}. Block no longer on chain.`,
);
return getBlockData({ ...options, attempt: attempt + 1 });
} else {
throw error;
}
} else {
throw error;
}
}

return {
latestBlock,
affectedAddresses,
};
};

export const blockfrostAPI = getBlockfrostClient();

// Special client for tx submit due to the timeout that's necessary for handling the "mempool full" error.
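
The retry flow in `getBlockData` can be read as a general bounded-retry pattern: retry only when a predicate recognises the error (here, a 404 caused by a rollback while the backend catches up), cap the number of attempts, and rethrow everything else immediately. A generic sketch, with `fetchWithRetry` and `shouldRetry` as illustrative names:

```ts
// Illustrative bounded-retry helper: retry only on recognised errors,
// at most MAX_ATTEMPTS total attempts, rethrow anything else right away.
const MAX_ATTEMPTS = 3;

const fetchWithRetry = async <T>(
  fetchFn: () => Promise<T>,
  shouldRetry: (error: unknown) => boolean,
  attempt = 0,
): Promise<T> => {
  try {
    return await fetchFn();
  } catch (error) {
    if (shouldRetry(error) && attempt < MAX_ATTEMPTS - 1) {
      return fetchWithRetry(fetchFn, shouldRetry, attempt + 1);
    }
    throw error;
  }
};
```

In `getBlockData` terms, `shouldRetry` would correspond to the `error instanceof BlockfrostServerError && error.status_code === 404` check above.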
41 changes: 29 additions & 12 deletions test/unit/fixtures/events.ts
@@ -3,11 +3,24 @@
export const emitBlock = [
{
description: 'one block',
blocks: [
data: [
{
time: 1506203091,
height: 1,
hash: 'a',
block: {
time: 1506203091,
height: 1,
hash: 'a',
},
blockAddresses: [
{
address:
'addr1q9ld26v2lv8wvrxxmvg90pn8n8n5k6tdst06q2s856rwmvnueldzuuqmnsye359fqrk8hwvenjnqultn7djtrlft7jnq7dy7wv',
transactions: [
{
tx_hash: '1a0570af966fb355a7160e4f82d5a80b8681b7955f5d44bec0dce628516157f0',
},
],
},
],
},
],
},
@@ -46,7 +59,9 @@ export const emitMissedBlock = [
export const onBlock = [
{
description: "1 of client's addresses affected in a block (real data)",
subscribedAddresses: [{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h',cbor:true}],
subscribedAddresses: [
{ address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h', cbor: true },
],
mocks: {
block: {
time: 1639491936,
@@ -340,8 +355,8 @@ export const onBlock = [
],
},
txCbor: {
cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'
}
cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef',
},
},
],
},
@@ -597,7 +612,9 @@ export const onBlock = [
],
},
txHash: '4d5beb45fe37b44b46f839811a3d3a1ac4a20911850740867a64f77d09372d0b',
txCbor: {cbor:'0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'}
txCbor: {
cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef',
},
},
],
},
@@ -606,9 +623,9 @@
{
description: "2 of the client's addresses affected, 1 address affected in multiple txs",
subscribedAddresses: [
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h'},
{address:'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t'},
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected
{ address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h' },
{ address: 'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t' },
{ address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h' }, // random address, should not be affected
],
mocks: {
block: {
@@ -794,7 +811,7 @@
{
description: 'subscribed address was not affected',
subscribedAddresses: [
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected
{ address: 'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h' }, // random address, should not be affected
],
mocks: {
block: {