Skip to content

Commit

Permalink
Merge branch 'feat/limits' into 'dev'
Browse files Browse the repository at this point in the history
feat: limits

Closes #81

See merge request ergo/rosen-bridge/rosenet!40
  • Loading branch information
vorujack committed Oct 5, 2024
2 parents 61043f7 + 2d87139 commit 7ddde54
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 29 deletions.
5 changes: 5 additions & 0 deletions .changeset/afraid-lizards-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Add pubsub limits
5 changes: 5 additions & 0 deletions .changeset/strange-months-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Add timeout for handling incoming direct messages
21 changes: 13 additions & 8 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000;
export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1';
export const DEFAULT_NODE_PORT = 55123;
export const ACK_BYTE = 1;
export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000;
export const MESSAGE_ROUNDTRIP_TIMEOUT = 5000;
export const MESSAGE_HANDLING_TIMEOUT = 2000;
export const MESSAGE_RETRY_ATTEMPTS = 5;
export const MESSAGE_RETRY_INITIAL_DELAY = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 100;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 200;
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20;
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200;
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400;
export const MAX_INBOUND_PUBSUB_THROUGHPUT = 100;
export const MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200;
export const MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200;
export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400;
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000;
43 changes: 26 additions & 17 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
BulkheadPolicy,
BulkheadRejectedError,
wrap,
timeout,
TimeoutStrategy,
} from 'cockatiel';
import { pipe } from 'it-pipe';

Expand All @@ -19,6 +21,7 @@ import {
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
ROSENET_DIRECT_PROTOCOL_V1,
MESSAGE_HANDLING_TIMEOUT,
} from '../constants';

const messageHandlingBulkhead = bulkhead(
Expand Down Expand Up @@ -61,26 +64,32 @@ const handleIncomingMessageFactory =
try {
await wrappedPolicy.execute(async () => {
try {
await pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
const messageHandlingTimeout = timeout(
MESSAGE_HANDLING_TIMEOUT,
TimeoutStrategy.Aggressive,
);
await messageHandlingTimeout.execute(() =>
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
),
);
} catch (error) {
RoseNetNodeContext.logger.warn(
'An error occurred while reading from stream',
'An error occurred while handling incoming message',
{
error,
},
Expand Down
27 changes: 26 additions & 1 deletion packages/rosenet-node/lib/rosenet-pubsub/publish.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead, isBulkheadRejectedError } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import {
MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
MAX_OUTBOUND_PUBSUB_THROUGHPUT,
} from '../constants';

const textEncoder = new TextEncoder();

const bulkheadPolicy = bulkhead(
MAX_OUTBOUND_PUBSUB_THROUGHPUT,
MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
);

/**
* factory for libp2p publish
*/
const publishFactory =
(node: Libp2p<{ pubsub: PubSub }>) =>
async (topic: string, message: string) => {
node.services.pubsub.publish(topic, textEncoder.encode(message));
try {
await bulkheadPolicy.execute(() =>
node.services.pubsub.publish(topic, textEncoder.encode(message)),
);
} catch (error) {
if (isBulkheadRejectedError(error)) {
RoseNetNodeContext.logger.warn('Maximum publish threshold reached');
} else {
RoseNetNodeContext.logger.warn('Message publish failed', {
message,
});
}
}
};

export default publishFactory;
27 changes: 24 additions & 3 deletions packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import {
MAX_INBOUND_PUBSUB_QUEUE_SIZE,
MAX_INBOUND_PUBSUB_THROUGHPUT,
} from '../constants';

const textDecoder = new TextDecoder();

const bulkheadPolicy = bulkhead(
MAX_INBOUND_PUBSUB_THROUGHPUT,
MAX_INBOUND_PUBSUB_QUEUE_SIZE,
);

/**
* factory for libp2p subscribe
*/
const subscribeFactory =
(node: Libp2p<{ pubsub: PubSub }>) =>
async (topic: string, handler: (message: string) => void) => {
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', (event) => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
node.services.pubsub.addEventListener('message', async (event) => {
try {
await bulkheadPolicy.execute(() => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
}
});
} catch {
RoseNetNodeContext.logger.warn(
'Maximum pubsub message handling threshold reached',
);
}
});
};
Expand Down

0 comments on commit 7ddde54

Please sign in to comment.