Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/connection limiter #226

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18,378 changes: 0 additions & 18,378 deletions .pnp.cjs

This file was deleted.

2,047 changes: 0 additions & 2,047 deletions .pnp.loader.mjs

This file was deleted.

2 changes: 1 addition & 1 deletion .yarnrc.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
nodeLinker: pnp
nodeLinker: node-modules

plugins:
- path: .yarn/plugins/yarn-plugin-nixify.cjs
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Remove client from the list of clients on missed heartbeat

### Changed

- process exits after health check fails for more than `HEALTHCHECK_FAIL_THRESHOLD_MS` (default 60s)
Expand Down
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rec {
mkdir -p $out/bin
cat <<EOF > $out/bin/${name}
#!${pkgs.runtimeShell}
${pkgs.nodejs-18_x}/bin/node --require "$out/libexec/source/.pnp.cjs" $out/libexec/source/dist/server.js
${pkgs.nodejs-18_x}/bin/node $out/libexec/source/dist/server.js
EOF
chmod +x $out/bin/${name}
'';
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"lint": "eslint ./src/**/*.ts",
"start": "yarn build && yarn node ./dist/server.js",
"start:profiler": "yarn build && NODE_ENV=production yarn node --prof ./dist/server.js",
"test-performance": "yarn build && yarn node ./dist/scripts/performance/index.ts",
"test-performance": "yarn build && yarn node ./dist/scripts/performance/index.js",
"test": "BLOCKFROST_PROJECT_ID='none' vitest",
"coverage": "vitest run --coverage",
"type-check": "tsc --project tsconfig.json"
Expand Down
10 changes: 9 additions & 1 deletion src/constants/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ export const FIAT_RATES_ENABLE_ON_TESTNET = false;
export const BLOCKFROST_REQUEST_CONCURRENCY = 500;

// How often should metrics be updated
export const METRICS_COLLECTOR_INTERVAL_MS = 10_000;
export const METRICS_COLLECTOR_INTERVAL_MS = 20_000;

// If healthcheck repeatedly fails for duration longer than this constant the process exits
export const HEALTHCHECK_FAIL_THRESHOLD_MS = 6 * METRICS_COLLECTOR_INTERVAL_MS; // 6 health checks

// Timeout for requests dispatched by blockfrost sdk
export const BLOCKFROST_REQUEST_TIMEOUT = 35_000;

export const CONNECTION_LIMITER = {
WINDOW_MS: 5000,
CONNECTIONS: 20,
};
4 changes: 2 additions & 2 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export const onBlock = async (
const activeBlockSub = activeSubscriptions?.find(index => index.type === 'block');

if (activeBlockSub) {
const message = prepareMessage(activeBlockSub.id, latestBlock);
const message = prepareMessage(activeBlockSub.id, clientId, latestBlock);

ws.send(message);
}
Expand Down Expand Up @@ -155,7 +155,7 @@ export const onBlock = async (
}

logger.debug(`Sent tx notification to client ${clientId}`);
const message = prepareMessage(activeAddressSub.id, notifications);
const message = prepareMessage(activeAddressSub.id, clientId, notifications);

ws.send(message);
}
Expand Down
6 changes: 3 additions & 3 deletions src/methods/estimate-fee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { prepareMessage, prepareErrorMessage } from '../utils/message.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { logger } from '../utils/logger.js';

export default async (id: number): Promise<string> => {
export default async (id: number, clientId: string): Promise<string> => {
try {
const epochsLatest = await blockfrostAPI.epochsLatest();
const epochsParameters = await blockfrostAPI.epochsParameters(epochsLatest.epoch);
Expand All @@ -11,12 +11,12 @@ export default async (id: number): Promise<string> => {
lovelacePerByte: epochsParameters.min_fee_a,
};

const message = prepareMessage(id, result);
const message = prepareMessage(id, clientId, result);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(id, clientId, error);

return message;
}
Expand Down
9 changes: 5 additions & 4 deletions src/methods/get-account-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,29 @@ export const getAccountInfo = async (

if (duration > 7) {
logger.warn(
`Warning: getAccountInfo-${details} took ${duration}s. Transactions: ${txCount} Addresses: ${_addressesCount} Tokens: ${tokensBalances.length} `,
`Warning: getAccountInfo-${details} ${publicKey} took ${duration}s. Transactions: ${txCount} Addresses: ${_addressesCount} Tokens: ${tokensBalances.length} `,
);
}

return accountInfo;
};

export default async (
id: number,
msgId: number,
clientId: string,
publicKey: string,
details: Messages.Details,
page = 1,
pageSize = 25,
): Promise<string> => {
try {
const accountInfo = await getAccountInfo(publicKey, details, page, pageSize);
const message = prepareMessage(id, accountInfo);
const message = prepareMessage(msgId, clientId, accountInfo);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(msgId, clientId, error);

return message;
}
Expand Down
8 changes: 4 additions & 4 deletions src/methods/get-account-utxo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { prepareMessage, prepareErrorMessage } from '../utils/message.js';
import { discoverAddresses, addressesToUtxos, utxosWithBlocks } from '../utils/address.js';
import { logger } from '../utils/logger.js';

export default async (id: number, publicKey: string): Promise<string> => {
export default async (msgId: number, clientId: string, publicKey: string): Promise<string> => {
if (!publicKey) {
const message = prepareMessage(id, 'Missing parameter descriptor');
const message = prepareMessage(msgId, 'Missing parameter descriptor', clientId);

return message;
}
Expand All @@ -17,12 +17,12 @@ export default async (id: number, publicKey: string): Promise<string> => {
const utxosResult = await addressesToUtxos(addresses);
const utxosBlocks = await utxosWithBlocks(utxosResult);

const message = prepareMessage(id, utxosBlocks);
const message = prepareMessage(msgId, clientId, utxosBlocks);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(msgId, clientId, error);

return message;
}
Expand Down
14 changes: 6 additions & 8 deletions src/methods/get-balance-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ export const getAccountBalanceHistory = async (
const { txIds, addresses } = await getAccountTransactionHistory({ accountPublicKey: publicKey });

// fetch all transactions and filter only those that are from within from-to interval
logger.debug('getAccountBalanceHistory', `Fetching ${txIds.length} txs`);
const txs = (
await txIdsToTransactions(
txIds.map(tx => ({
Expand All @@ -148,7 +147,6 @@ export const getAccountBalanceHistory = async (
// txs are sorted from newest to oldest, we need exact opposite
.reverse();

logger.debug('getAccountBalanceHistory', `aggregating ${txs.length} txs`);
const bins = await aggregateTransactions(txs, addresses, groupBy);

if (blockfrostAPI.options.network !== 'mainnet' && !FIAT_RATES_ENABLE_ON_TESTNET) {
Expand All @@ -157,7 +155,6 @@ export const getAccountBalanceHistory = async (
}

// fetch fiat rate for each bin
logger.debug('getAccountBalanceHistory', `fetching fiat rates for ${bins.length} time intervals`);
const binRatesPromises = bins.map(bin => getRatesForDate(bin.time));
const binRates = await Promise.allSettled(binRatesPromises);

Expand All @@ -174,14 +171,15 @@ export const getAccountBalanceHistory = async (
};

export default async (
id: number,
msgId: number,
clientId: string,
publicKey: string,
groupBy: number,
from?: number,
to?: number,
): Promise<string> => {
if (!publicKey) {
const message = prepareMessage(id, 'Missing parameter descriptor');
const message = prepareMessage(msgId, clientId, 'Missing parameter descriptor');

return message;
}
Expand All @@ -190,18 +188,18 @@ export default async (

try {
const data = await getAccountBalanceHistory(publicKey, groupBy, from, to);
const message = prepareMessage(id, data);
const message = prepareMessage(msgId, clientId, data);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(msgId, clientId, error);

return message;
} finally {
const t2 = Date.now();
const diff = t2 - t1;

logger.debug(`getBalanceHistory for public key ${publicKey} took ${diff} ms`);
logger.debug(`[${clientId}] getBalanceHistory for public key ${publicKey} took ${diff} ms`);
}
};
10 changes: 7 additions & 3 deletions src/methods/get-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ import { prepareMessage, prepareErrorMessage } from '../utils/message.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { logger } from '../utils/logger.js';

export default async (id: number, hashOrNumber: string | number): Promise<string> => {
export default async (
msgId: number,
clientId: string,
hashOrNumber: string | number,
): Promise<string> => {
try {
const block = await blockfrostAPI.blocks(hashOrNumber);
const message = prepareMessage(id, block);
const message = prepareMessage(msgId, clientId, block);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(msgId, clientId, error);

return message;
}
Expand Down
6 changes: 3 additions & 3 deletions src/methods/get-latest-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import { prepareMessage, prepareErrorMessage } from '../utils/message.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { logger } from '../utils/logger.js';

export default async (id: number): Promise<string> => {
export default async (id: number, clientId: string): Promise<string> => {
try {
const block = await blockfrostAPI.blocksLatest();
const message = prepareMessage(id, block);
const message = prepareMessage(id, clientId, block);

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(id, clientId, error);

return message;
}
Expand Down
6 changes: 3 additions & 3 deletions src/methods/get-server-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ export const getServerInfo = async () => {
return serverInfo;
};

export default async (id: number): Promise<string> => {
export default async (id: number, clientId: string): Promise<string> => {
try {
const serverInfo = await getServerInfo();
const message = prepareMessage(id, serverInfo);
const message = prepareMessage(id, clientId, serverInfo);

return message;
} catch (error) {
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(id, clientId, error);

return message;
}
Expand Down
6 changes: 3 additions & 3 deletions src/methods/get-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { transformTransactionData } from '../utils/transaction.js';
import { logger } from '../utils/logger.js';

export default async (id: number, txId: string): Promise<string> => {
export default async (msgId: number, clientId: string, txId: string): Promise<string> => {
try {
const tx = await blockfrostAPI.txs(txId);
const message = prepareMessage(id, await transformTransactionData(tx));
const message = prepareMessage(msgId, clientId, await transformTransactionData(tx));

return message;
} catch (error) {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(msgId, clientId, error);

return message;
}
Expand Down
14 changes: 9 additions & 5 deletions src/methods/push-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ import { prepareErrorMessage, prepareMessage } from '../utils/message.js';
import { txClient } from '../utils/blockfrost-api.js';
import { logger } from '../utils/logger.js';

export default async (id: number, transaction: Uint8Array | string): Promise<string> => {
export default async (
id: number,
clientId: string,
transaction: Uint8Array | string,
): Promise<string> => {
try {
const submitTransactionResult = await txClient.txSubmit(transaction);
const message = prepareMessage(id, submitTransactionResult);
const message = prepareMessage(id, clientId, submitTransactionResult);

return message;
} catch (error) {
if (error instanceof BlockfrostClientError && error.code === 'ETIMEDOUT') {
// Request timed out. Most likely mempool is full since that's the only reason why submit api should get stuck
const errorMessage = 'Mempool is full, please try resubmitting again later.';
const message = prepareErrorMessage(id, errorMessage);
const message = prepareErrorMessage(id, clientId, errorMessage);

logger.error(error);
return message;
Expand All @@ -24,7 +28,7 @@ export default async (id: number, transaction: Uint8Array | string): Promise<str
message: error.body ?? error.message,
body: undefined,
};
const message = prepareErrorMessage(id, formattedError);
const message = prepareErrorMessage(id, clientId, formattedError);

logger.error(
new BlockfrostServerError({
Expand All @@ -39,7 +43,7 @@ export default async (id: number, transaction: Uint8Array | string): Promise<str
return message;
} else {
logger.error(error);
const message = prepareErrorMessage(id, error);
const message = prepareErrorMessage(id, clientId, error);

return message;
}
Expand Down
Loading
Loading