Skip to content

Commit

Permalink
fix(WebSocketShard): proper error bubbling (#9119)
Browse files Browse the repository at this point in the history
* fix(WebSocketShard): proper error bubbling

* fix(WebSocketShard): proper success signaling from waitForEvent

* refactor(waitForEvent): better error bubbling behavior

* fix(WebSocketShard): still allow the first connect call to reject

* fix(WebSocketShard): handle potential once error in #send

* refactor(WebSocketShard): waitForEvent & bubbleWaitForEventError

* refactor: success signaling

* chore: bump async EE to allow overwriting the error event
  • Loading branch information
didinele authored Feb 19, 2023
1 parent 7f2ef96 commit 9681f34
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 22 deletions.
2 changes: 1 addition & 1 deletion packages/brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"homepage": "https://discord.js.org",
"dependencies": {
"@msgpack/msgpack": "^2.8.0",
"@vladfrangu/async_event_emitter": "^2.1.3",
"@vladfrangu/async_event_emitter": "^2.1.4",
"ioredis": "^5.2.4"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"@discordjs/util": "workspace:^",
"@discordjs/ws": "workspace:^",
"@sapphire/snowflake": "^3.4.0",
"@vladfrangu/async_event_emitter": "^2.1.3",
"@vladfrangu/async_event_emitter": "^2.1.4",
"discord-api-types": "^0.37.35"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"@discordjs/util": "workspace:^",
"@sapphire/async-queue": "^1.5.0",
"@types/ws": "^8.5.4",
"@vladfrangu/async_event_emitter": "^2.1.3",
"@vladfrangu/async_event_emitter": "^2.1.4",
"discord-api-types": "^0.37.35",
"tslib": "^2.4.1",
"ws": "^8.12.0"
Expand Down
90 changes: 78 additions & 12 deletions packages/ws/src/ws/WebSocketShard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export enum WebSocketShardEvents {
Closed = 'closed',
Debug = 'debug',
Dispatch = 'dispatch',
Error = 'error',
HeartbeatComplete = 'heartbeat',
Hello = 'hello',
Ready = 'ready',
Expand All @@ -56,6 +57,7 @@ export type WebSocketShardEventsMap = {
[WebSocketShardEvents.Closed]: [{ code: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string }];
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
[WebSocketShardEvents.Error]: [payload: { error: Error }];
[WebSocketShardEvents.Hello]: [];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
[WebSocketShardEvents.Resumed]: [];
Expand Down Expand Up @@ -99,6 +101,9 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

private session: SessionInfo | null = null;

// Indicates whether the shard has already resolved its original connect() call
private initialConnectResolved = false;

private readonly sendQueue = new AsyncQueue();

private readonly timeouts = new Collection<WebSocketShardEvents, NodeJS.Timeout>();
Expand Down Expand Up @@ -158,14 +163,21 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

this.sendRateLimitState = getInitialSendRateLimitState();

await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout);
const { ok } = await this.bubbleWaitForEventError(
this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout),
);
if (!ok) {
return;
}

if (session?.shardCount === this.strategy.options.shardCount) {
this.session = session;
await this.resume(session);
} else {
await this.identify();
}

this.initialConnectResolved = true;
}

public async destroy(options: WebSocketShardDestroyOptions = {}) {
Expand Down Expand Up @@ -234,18 +246,59 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}

private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null) {
this.debug([`Waiting for event ${event} for ${timeoutDuration ? `${timeoutDuration}ms` : 'indefinitely'}`]);
private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise<void> {
this.debug([`Waiting for event ${event} ${timeoutDuration ? `for ${timeoutDuration}ms` : 'indefinitely'}`]);
const controller = new AbortController();
const timeout = timeoutDuration ? setTimeout(() => controller.abort(), timeoutDuration).unref() : null;
if (timeout) {
this.timeouts.set(event, timeout);
}

await once(this, event, { signal: controller.signal });
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(event);
await once(this, event, { signal: controller.signal }).finally(() => {
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(event);
}
});
}

/**
* Does special error handling for waitForEvent calls, depending on the current state of the connection lifecycle
* (i.e. whether or not the original connect() call has resolved or if the user has an error listener)
*/
private async bubbleWaitForEventError(
promise: Promise<unknown>,
): Promise<{ error: unknown; ok: false } | { ok: true }> {
try {
await promise;
return { ok: true };
} catch (error) {
// Any error that isn't an abort error would have been caused by us emitting an error event in the first place
// See https://nodejs.org/api/events.html#eventsonceemitter-name-options for `once()` behavior
if (error instanceof Error && error.name === 'AbortError') {
this.emit(WebSocketShardEvents.Error, { error });
}

// As stated previously, any other error would have been caused by us emitting the error event, which looks
// like { error: unknown }
// eslint-disable-next-line no-ex-assign
error = (error as { error: unknown }).error;

// If the user has no handling on their end (error event) simply throw.
// We also want to throw if we're still in the initial `connect()` call, since that's the only time
// the user can catch the error "normally"
if (this.listenerCount(WebSocketShardEvents.Error) === 0 || !this.initialConnectResolved) {
throw error;
}

// If the error is handled, we can just try to reconnect
await this.destroy({
code: CloseCodes.Normal,
reason: 'Something timed out',
recover: WebSocketShardDestroyRecovery.Reconnect,
});

return { ok: false, error };
}
}

Expand All @@ -256,7 +309,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
await once(this, WebSocketShardEvents.Ready);
// This will throw if the shard throws an error event in the meantime, just requeue the payload
try {
await once(this, WebSocketShardEvents.Ready);
} catch {
return this.send(payload);
}
}

await this.sendQueue.wait();
Expand Down Expand Up @@ -325,7 +383,13 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
d,
});

await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
const { ok } = await this.bubbleWaitForEventError(
this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout),
);
if (!ok) {
return;
}

this.#status = WebSocketShardStatus.Ready;
}

Expand Down Expand Up @@ -393,7 +457,9 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.inflate.push(Buffer.from(decompressable), flush ? zlib.Z_SYNC_FLUSH : zlib.Z_NO_FLUSH);

if (this.inflate.err) {
this.emit('error', `${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`);
this.emit(WebSocketShardEvents.Error, {
error: new Error(`${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`),
});
}

if (!flush) {
Expand Down Expand Up @@ -521,8 +587,8 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}

private onError(err: Error) {
this.emit('error', err);
private onError(error: Error) {
this.emit(WebSocketShardEvents.Error, { error });
}

private async onClose(code: number) {
Expand Down
14 changes: 7 additions & 7 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2039,7 +2039,7 @@ __metadata:
"@msgpack/msgpack": ^2.8.0
"@types/node": 16.18.11
"@vitest/coverage-c8": ^0.27.1
"@vladfrangu/async_event_emitter": ^2.1.3
"@vladfrangu/async_event_emitter": ^2.1.4
cross-env: ^7.0.3
eslint: ^8.31.0
eslint-config-neon: ^0.1.40
Expand Down Expand Up @@ -2112,7 +2112,7 @@ __metadata:
"@sapphire/snowflake": ^3.4.0
"@types/node": 16.18.11
"@vitest/coverage-c8": ^0.27.1
"@vladfrangu/async_event_emitter": ^2.1.3
"@vladfrangu/async_event_emitter": ^2.1.4
cross-env: ^7.0.3
discord-api-types: ^0.37.35
eslint: ^8.31.0
Expand Down Expand Up @@ -2516,7 +2516,7 @@ __metadata:
"@types/node": 16.18.11
"@types/ws": ^8.5.4
"@vitest/coverage-c8": ^0.27.1
"@vladfrangu/async_event_emitter": ^2.1.3
"@vladfrangu/async_event_emitter": ^2.1.4
cross-env: ^7.0.3
discord-api-types: ^0.37.35
esbuild-plugin-version-injector: ^1.0.2
Expand Down Expand Up @@ -5346,10 +5346,10 @@ __metadata:
languageName: node
linkType: hard

"@vladfrangu/async_event_emitter@npm:^2.1.3":
version: 2.1.3
resolution: "@vladfrangu/async_event_emitter@npm:2.1.3"
checksum: 1541b281550b39446f86ea9d4622be0d74c4d3924b42550db11164b409a82010f396b588a87ffe27f72a96a7f92af0190f4c3b57861249a4038515e0d474b3c6
"@vladfrangu/async_event_emitter@npm:^2.1.4":
version: 2.1.4
resolution: "@vladfrangu/async_event_emitter@npm:2.1.4"
checksum: 604d228a4fa46c0686d4377c2ca63035aa266382133f351f098d85782df4e451ebba2c528a7d54aa955c7fdb824a642a7ec63d5a85cf46f6cbaea46ea56a0959
languageName: node
linkType: hard

Expand Down

2 comments on commit 9681f34

@vercel
Copy link

@vercel vercel bot commented on 9681f34 Feb 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 9681f34 Feb 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.