Skip to content

Commit

Permalink
feat: add optional CBOR transaction representation to SUBSCRIBE_ADDRESS
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Oct 25, 2024
1 parent 6bc48a1 commit bd7ebfd
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 129 deletions.
14 changes: 12 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@
"cSpell.words": [
"addresses",
"bech",
"bignumber",
"blockfrost",
"cardano",
"cbor",
"cbors",
"compat",
"emurgo",
"from",
"HEALTHCHECK",
"keyhash",
"libc",
"lovelaces",
"memoizee",
"tailwindcss",
"to",
"timelock",
"txid",
"txids",
"uninstantiated",
"utxos"
"utxo",
"utxos",
"uuidv4"
],
"search.exclude": {
"**/.yarn": true,
Expand Down
1 change: 1 addition & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export default [
'**/.eslintrc.js',
'**/jest.config.js',
'**/babel.config.js',
'**/test',
],
},
...fixupConfigRules(
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
"start": "yarn build && yarn node ./dist/server.js",
"start:profiler": "yarn build && NODE_ENV=production yarn node --prof ./dist/server.js",
"test-performance": "yarn build && yarn node ./dist/scripts/performance/index.js",
"test": "BLOCKFROST_PROJECT_ID='none' vitest",
"test": "BLOCKFROST_PROJECT_ID='mainnet_none' vitest",
"coverage": "vitest run --coverage",
"type-check": "tsc --project tsconfig.json"
},
"dependencies": {
"@blockfrost/blockfrost-js": "5.5.0",
"@blockfrost/blockfrost-js": "5.6.0",
"@emurgo/cardano-serialization-lib-nodejs": "^11.5.0",
"@sentry/integrations": "^7.85.0",
"@sentry/node": "^7.85.0",
Expand Down
26 changes: 20 additions & 6 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import { prepareMessage } from './utils/message.js';
import { blockfrostAPI } from './utils/blockfrost-api.js';
import { Responses } from '@blockfrost/blockfrost-js';
import { promiseTimeout } from './utils/common.js';
import { getTransactionsWithUtxo } from './utils/transaction.js';
import { getTransactionsWithDetails } from './utils/transaction.js';
import { TxNotification } from './types/response.js';
import { EMIT_MAX_MISSED_BLOCKS } from './constants/config.js';
import { logger } from './utils/logger.js';
import { SubscribeAddressOptions } from './types/message.js';

interface EmitBlockOptions {
fetchTimeoutMs?: number;
maxMissedBlocks?: number;
}

export interface SubscribedAddress {
address: string;
options?: SubscribeAddressOptions;
}

// eslint-disable-next-line unicorn/prefer-event-target
const events = new EventEmitter();

Expand Down Expand Up @@ -94,7 +100,7 @@ export const onBlock = async (
latestBlock: Responses['block_content'],
affectedAddressesInBlock: Responses['block_content_addresses'],
activeSubscriptions: Server.Subscription[] | undefined,
subscribedAddresses: string[] | undefined,
subscribedAddresses: SubscribedAddress[] | undefined,
) => {
// client has no subscription
if (!activeSubscriptions) return;
Expand All @@ -113,7 +119,7 @@ export const onBlock = async (

if (activeAddressSub && subscribedAddresses) {
const affectedAddresses = affectedAddressesInBlock.filter(a =>
subscribedAddresses.includes(a.address),
subscribedAddresses.some(addr => addr.address === a.address),
);

if (affectedAddresses.length === 0) {
Expand All @@ -122,15 +128,22 @@ export const onBlock = async (
}

// get list of unique txids (same tx could affect multiple client's addresses, but we want to fetch it only once)
const txIdsSet = new Set<string>();
const txsCbor: Record<string, boolean | undefined> = {};

for (const address of affectedAddresses) {
const { cbor } =
subscribedAddresses.find(subscription => subscription.address === address.address)!
.options || {};

for (const tx of address.transactions) {
txIdsSet.add(tx.tx_hash);
txsCbor[tx.tx_hash] ||= cbor;
}
}

// fetch txs that include client's address with their utxo data
const txs = await getTransactionsWithUtxo([...txIdsSet]);
const txs = await getTransactionsWithDetails(
Object.entries(txsCbor).map(([txid, cbor]) => ({ txid, cbor })),
);

const notifications: TxNotification[] = [];

Expand All @@ -149,6 +162,7 @@ export const onBlock = async (
txData: enhancedTx.txData,
txUtxos: enhancedTx.txUtxos,
txHash: enhancedTx.txData.hash,
txCbor: enhancedTx.txCbor,
});
}
}
Expand Down
21 changes: 14 additions & 7 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import * as Server from './types/server.js';
import { MESSAGES, WELCOME_MESSAGE, REPOSITORY_URL } from './constants/index.js';
import { getMessage, prepareErrorMessage, prepareMessage } from './utils/message.js';
import { MetricsCollector } from './utils/prometheus.js';
import { events, onBlock, startEmitter } from './events.js';
import { SubscribedAddress, events, onBlock, startEmitter } from './events.js';
import getServerInfo from './methods/get-server-info.js';
import getAccountInfo from './methods/get-account-info.js';
import getAccountUtxo from './methods/get-account-utxo.js';
Expand Down Expand Up @@ -67,7 +67,7 @@ const wss = new WebSocketServer({ server });
server.keepAliveTimeout = 65_000;

const activeSubscriptions: Record<string, Server.Subscription[]> = {};
const addressesSubscribed: Record<string, string[]> = {};
const addressesSubscribed: Record<string, SubscribedAddress[]> = {};

const clients: Array<{
clientId: string;
Expand Down Expand Up @@ -305,11 +305,18 @@ wss.on('connection', async (ws: Server.Ws) => {
}

case MESSAGES.SUBSCRIBE_ADDRESS: {
if (data.params.addresses && data.params.addresses.length > 0) {
for (const addressInput of data.params.addresses) {
if (!addressesSubscribed[clientId].includes(addressInput)) {
addressesSubscribed[clientId].push(addressInput);
}
const { addresses, options } = { options: {}, ...data.params };

if (addresses && addresses.length > 0) {
for (const address of addresses) {
const subscriptionIndex = addressesSubscribed[clientId].findIndex(
addr => addr.address === address,
);

// Subscribe to new address...
if (subscriptionIndex === -1) addressesSubscribed[clientId].push({ address, options });
// ... or update the options
else addressesSubscribed[clientId][subscriptionIndex].options = options;
}

const activeAddressSubIndex = activeSubscriptions[clientId].findIndex(
Expand Down
9 changes: 8 additions & 1 deletion src/types/message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export type Details = 'basic' | 'tokens' | 'tokenBalances' | 'txids' | 'txs';

export interface SubscribeAddressOptions {
cbor?: boolean;
}

export type Messages =
| {
id: number;
Expand Down Expand Up @@ -62,7 +66,10 @@ export type Messages =
| {
id: number;
command: 'SUBSCRIBE_ADDRESS';
params: { addresses: string[] };
params: {
addresses: string[];
options?: SubscribeAddressOptions;
};
}
| {
id: number;
Expand Down
1 change: 1 addition & 0 deletions src/types/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface TxNotification {
txHash: string;
txData: TransformedTransaction;
txUtxos: TransformedTransactionUtxo;
txCbor?: string;
}

export interface BalanceHistoryData {
Expand Down
1 change: 1 addition & 0 deletions src/types/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface TxIdsToTransactionsResponse {
txData: TransformedTransaction;
address: string;
txHash: string;
txCbor?: string;
}

export interface TransformedTransactionUtxo {
Expand Down
30 changes: 23 additions & 7 deletions src/utils/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BlockfrostServerError, Responses } from '@blockfrost/blockfrost-js';
import * as Types from '../types/transactions.js';
import { TransformedTransaction, TransformedTransactionUtxo } from '../types/transactions.js';
import { TxIdsToTransactionsResponse } from '../types/transactions.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { getAssetData, transformAsset } from './asset.js';
import { assetMetadataLimiter, pLimiter } from './limiter.js';
Expand Down Expand Up @@ -68,28 +68,44 @@ export const txIdsToTransactions = async (
return sortedTxs;
};

export const getTransactionsWithUtxo = async (
txids: string[],
): Promise<{ txData: TransformedTransaction; txUtxos: TransformedTransactionUtxo }[]> => {
export interface GetTransactionsDetails {
txid: string;
cbor?: boolean;
}

export const getTransactionsWithDetails = async (
txs: GetTransactionsDetails[],
): Promise<Pick<TxIdsToTransactionsResponse, 'txData' | 'txUtxos' | 'txCbor'>[]> => {
const txsData = await Promise.all(
txids.map(txid =>
txs.map(({ txid }) =>
pLimiter.add(() => blockfrostAPI.txs(txid).then(data => transformTransactionData(data)), {
throwOnTimeout: true,
}),
),
);
const txsUtxo = await Promise.all(
txids.map(txid =>
txs.map(({ txid }) =>
pLimiter.add(
() => blockfrostAPI.txsUtxos(txid).then(data => transformTransactionUtxo(data)),
{ throwOnTimeout: true },
),
),
);
const txsCbors = await Promise.all(
txs.map(({ txid, cbor }) =>
cbor
? pLimiter.add(() => blockfrostAPI.txsCbor(txid).then(data => data.cbor), {
throwOnTimeout: true,
})
: // eslint-disable-next-line unicorn/no-useless-undefined
Promise.resolve<undefined>(undefined),
),
);

return txids.map((_txid, index) => ({
return txs.map((_tx, index) => ({
txData: txsData[index],
txUtxos: txsUtxo[index],
txCbor: txsCbors[index],
}));
};

Expand Down
Loading

0 comments on commit bd7ebfd

Please sign in to comment.