Skip to content

Commit

Permalink
refactor: remove pLimiter code repetition
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Nov 19, 2024
1 parent c94d421 commit f46a862
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 76 deletions.
112 changes: 51 additions & 61 deletions src/utils/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from '@blockfrost/blockfrost-js';
import memoizee from 'memoizee';
import { getAssetData, transformAsset } from './asset.js';
import { assetMetadataLimiter, pLimiter } from './limiter.js';
import { assetMetadataLimiter, limiter } from './limiter.js';

export const deriveAddress = (
publicKey: string,
Expand Down Expand Up @@ -69,9 +69,7 @@ export const discoverAddresses = async (
);

addressCount++;
const promise = pLimiter.add(() => blockfrostAPI.addresses(address), {
throwOnTimeout: true,
});
const promise = limiter(() => blockfrostAPI.addresses(address));

promisesBundle.push({ address, promise, path });
}
Expand Down Expand Up @@ -113,17 +111,15 @@ export const addressesToUtxos = async (
const promises = addresses.map(item =>
item.data === 'empty'
? []
: pLimiter.add(
() =>
// change batchSize to fetch only 1 page at a time (each page has 100 utxos)
blockfrostAPI.addressesUtxosAll(item.address, { batchSize: 1 }).catch(error => {
if (error instanceof BlockfrostServerError && error.status_code === 404) {
return [];
} else {
throw error;
}
}),
{ throwOnTimeout: true },
: limiter(() =>
// change batchSize to fetch only 1 page at a time (each page has 100 utxos)
blockfrostAPI.addressesUtxosAll(item.address, { batchSize: 1 }).catch(error => {
if (error instanceof BlockfrostServerError && error.status_code === 404) {
return [];
} else {
throw error;
}
}),
),
);

Expand Down Expand Up @@ -167,15 +163,13 @@ export const utxosWithBlocks = async (
if (utxo.data === 'empty') continue;

for (const utxoData of utxo.data) {
const promise = pLimiter.add(
() =>
blockfrostAPI.blocks(utxoData.block).then(blockData => ({
address: utxo.address,
path: utxo.path,
utxoData: utxoData,
blockInfo: blockData,
})),
{ throwOnTimeout: true },
const promise = limiter(() =>
blockfrostAPI.blocks(utxoData.block).then(blockData => ({
address: utxo.address,
path: utxo.path,
utxoData: utxoData,
blockInfo: blockData,
})),
);

promisesBundle.push(promise);
Expand All @@ -198,27 +192,25 @@ export const addressesToTxIds = async (
for (const item of addresses) {
if (item.data === 'empty') continue;

const promise = pLimiter.add(
() =>
// 1 page (100 txs) per address at a time should be more efficient default value
// compared to fetching 10 pages (1000 txs) per address
blockfrostAPI
.addressesTransactionsAll(item.address, { batchSize: 1 })
.then(data => ({
address: item.address,
data,
}))
.catch(error => {
if (error instanceof BlockfrostServerError && error.status_code === 404) {
return {
address: item.address,
data: [],
};
} else {
throw error;
}
}),
{ throwOnTimeout: true },
const promise = limiter(() =>
// 1 page (100 txs) per address at a time should be more efficient default value
// compared to fetching 10 pages (1000 txs) per address
blockfrostAPI
.addressesTransactionsAll(item.address, { batchSize: 1 })
.then(data => ({
address: item.address,
data,
}))
.catch(error => {
if (error instanceof BlockfrostServerError && error.status_code === 404) {
return {
address: item.address,
data: [],
};
} else {
throw error;
}
}),
);

promisesBundle.push(promise);
Expand All @@ -244,22 +236,20 @@ export const getAddressesData = async (
}

const promises = addresses.map(addr =>
pLimiter.add(
() =>
blockfrostAPI.addressesTotal(addr.address).catch(error => {
if (error.status_code === 404) {
return {
address: addr.address,
path: addr.path,
tx_count: 0,
received_sum: [{ unit: 'lovelace', quantity: '0' }],
sent_sum: [{ unit: 'lovelace', quantity: '0' }],
};
} else {
throw new Error(error);
}
}),
{ throwOnTimeout: true },
limiter(() =>
blockfrostAPI.addressesTotal(addr.address).catch(error => {
if (error.status_code === 404) {
return {
address: addr.address,
path: addr.path,
tx_count: 0,
received_sum: [{ unit: 'lovelace', quantity: '0' }],
sent_sum: [{ unit: 'lovelace', quantity: '0' }],
};
} else {
throw new Error(error);
}
}),
),
);
const responses = await Promise.all(promises);
Expand Down
7 changes: 6 additions & 1 deletion src/utils/limiter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import PQueue from 'p-queue';
import PQueue, { QueueAddOptions } from 'p-queue';
import {
FIAT_RATES_REQUESTS_PER_SEC,
BLOCKFROST_REQUEST_CONCURRENCY,
Expand Down Expand Up @@ -28,3 +28,8 @@ pLimiter.on('error', error => {
ratesLimiter.on('error', error => {
logger.warn(`ratesLimiter error`, error);
});

export const limiter = <T>(
task: () => PromiseLike<T>,
options?: Exclude<QueueAddOptions, 'throwOnTimeout'>,
) => pLimiter.add<T>(task, { ...options, throwOnTimeout: true });
19 changes: 5 additions & 14 deletions src/utils/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Types from '../types/transactions.js';
import { TxIdsToTransactionsResponse } from '../types/transactions.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { getAssetData, transformAsset } from './asset.js';
import { assetMetadataLimiter, pLimiter } from './limiter.js';
import { assetMetadataLimiter, limiter } from './limiter.js';
import { logger } from './logger.js';

export const sortTransactionsCmp = <
Expand Down Expand Up @@ -47,9 +47,7 @@ export const txIdsToTransactions = async (

for (const item of txIdsPerAddress) {
for (const txId of item.txIds) {
promises.push(
pLimiter.add(() => fetchTxWithUtxo(txId, item.address), { throwOnTimeout: true }),
);
promises.push(limiter(() => fetchTxWithUtxo(txId, item.address)));
}
}

Expand All @@ -73,25 +71,18 @@ export const getTransactionsWithDetails = async (
): Promise<Pick<TxIdsToTransactionsResponse, 'txData' | 'txUtxos' | 'txCbor'>[]> => {
const txsData = await Promise.all(
txs.map(({ txId }) =>
pLimiter.add(() => blockfrostAPI.txs(txId).then(data => transformTransactionData(data)), {
throwOnTimeout: true,
}),
limiter(() => blockfrostAPI.txs(txId).then(data => transformTransactionData(data))),
),
);
const txsUtxo = await Promise.all(
txs.map(({ txId }) =>
pLimiter.add(
() => blockfrostAPI.txsUtxos(txId).then(data => transformTransactionUtxo(data)),
{ throwOnTimeout: true },
),
limiter(() => blockfrostAPI.txsUtxos(txId).then(data => transformTransactionUtxo(data))),
),
);
const txsCbors = await Promise.all(
txs.map(({ txId, cbor }) =>
cbor
? pLimiter.add(() => blockfrostAPI.txsCbor(txId).then(data => data.cbor), {
throwOnTimeout: true,
})
? limiter(() => blockfrostAPI.txsCbor(txId).then(data => data.cbor))
: // eslint-disable-next-line unicorn/no-useless-undefined
Promise.resolve<undefined>(undefined),
),
Expand Down

0 comments on commit f46a862

Please sign in to comment.