Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(WebSocketManager): use /ws package internally #9099

Merged
merged 20 commits into from
May 1, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(WebSocketManager): use /ws package internally
Qjuh committed Apr 25, 2023
commit 1d63ba9b570182d9213fed05bef5ca75458acac1
1 change: 1 addition & 0 deletions packages/discord.js/package.json
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@
"@discordjs/formatters": "workspace:^",
"@discordjs/rest": "workspace:^",
"@discordjs/util": "workspace:^",
"@discordjs/ws": "workspace:^",
"@sapphire/snowflake": "^3.4.2",
"@types/ws": "^8.5.4",
"discord-api-types": "^0.37.40",
279 changes: 105 additions & 174 deletions packages/discord.js/src/client/websocket/WebSocketManager.js
Original file line number Diff line number Diff line change
@@ -2,9 +2,13 @@

const EventEmitter = require('node:events');
const { setImmediate } = require('node:timers');
const { setTimeout: sleep } = require('node:timers/promises');
const { Collection } = require('@discordjs/collection');
const { GatewayCloseCodes, GatewayDispatchEvents, Routes } = require('discord-api-types/v10');
const {
WebSocketManager: WSWebSocketManager,
WebSocketShardEvents: WSWebSocketShardEvents,
CloseCodes,
} = require('@discordjs/ws');
const { GatewayCloseCodes, GatewayDispatchEvents } = require('discord-api-types/v10');
const WebSocketShard = require('./WebSocketShard');
const PacketHandlers = require('./handlers');
const { DiscordjsError, ErrorCodes } = require('../../errors');
@@ -22,16 +26,6 @@ const BeforeReadyWhitelist = [
GatewayDispatchEvents.GuildMemberRemove,
];

const unrecoverableErrorCodeMap = {
[GatewayCloseCodes.AuthenticationFailed]: ErrorCodes.TokenInvalid,
[GatewayCloseCodes.InvalidShard]: ErrorCodes.ShardingInvalid,
[GatewayCloseCodes.ShardingRequired]: ErrorCodes.ShardingRequired,
[GatewayCloseCodes.InvalidIntents]: ErrorCodes.InvalidIntents,
[GatewayCloseCodes.DisallowedIntents]: ErrorCodes.DisallowedIntents,
};

const UNRESUMABLE_CLOSE_CODES = [1000, GatewayCloseCodes.AlreadyAuthenticated, GatewayCloseCodes.InvalidSeq];
Qjuh marked this conversation as resolved.
Show resolved Hide resolved

/**
* The WebSocket manager for this client.
* <info>This class forwards raw dispatch events,
@@ -56,27 +50,12 @@ class WebSocketManager extends EventEmitter {
*/
this.gateway = null;

/**
* The amount of shards this manager handles
* @private
* @type {number}
*/
this.totalShards = this.client.options.shards.length;

/**
* A collection of all shards this manager handles
* @type {Collection<number, WebSocketShard>}
*/
this.shards = new Collection();

/**
* An array of shards to be connected or that need to reconnect
* @type {Set<WebSocketShard>}
* @private
* @name WebSocketManager#shardQueue
*/
Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true });

/**
* An array of queued events before this WebSocketManager became ready
* @type {Object[]}
@@ -98,12 +77,7 @@ class WebSocketManager extends EventEmitter {
*/
this.destroyed = false;

/**
* If this manager is currently reconnecting one or multiple shards
* @type {boolean}
* @private
*/
this.reconnecting = false;
this._ws = null;
}

/**
@@ -119,11 +93,11 @@ class WebSocketManager extends EventEmitter {
/**
* Emits a debug message.
* @param {string} message The debug message
* @param {?WebSocketShard} [shard] The shard that emitted this message, if any
* @param {?number} [shardId] The id of the shard that emitted this message, if any
* @private
*/
debug(message, shard) {
this.client.emit(Events.Debug, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`);
debug(message, shardId) {
this.client.emit(Events.Debug, `[WS => ${shardId ? `Shard ${shardId}` : 'Manager'}] ${message}`);
Qjuh marked this conversation as resolved.
Show resolved Hide resolved
}

/**
@@ -132,11 +106,30 @@ class WebSocketManager extends EventEmitter {
*/
async connect() {
const invalidToken = new DiscordjsError(ErrorCodes.TokenInvalid);
const { shards, shardCount, intents, ws } = this.client.options;
if (this._ws && this._ws.options.token !== this.client.token) {
await this._ws.destroy({ code: 1000, reason: 'Login with differing token requested' });
Qjuh marked this conversation as resolved.
Show resolved Hide resolved
this._ws = null;
}
if (!this._ws) {
this._ws = new WSWebSocketManager({
intents: intents.bitfield,
rest: this.client.rest,
token: this.client.token,
largeThreshold: ws.large_threshold,
version: ws.version,
shardIds: shards === 'auto' ? null : shards,
shardCount: shards === 'auto' ? null : shardCount,
initialPresence: ws.presence,
});
this.attachEvents();
}

const {
url: gatewayURL,
shards: recommendedShards,
session_start_limit: sessionStartLimit,
} = await this.client.rest.get(Routes.gatewayBot()).catch(error => {
} = await this._ws.fetchGatewayInformation().catch(error => {
throw error.status === 401 ? invalidToken : error;
});

@@ -152,156 +145,95 @@ class WebSocketManager extends EventEmitter {

this.gateway = `${gatewayURL}/`;

let { shards } = this.client.options;

if (shards === 'auto') {
this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`);
this.totalShards = this.client.options.shardCount = recommendedShards;
shards = this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i);
}

this.totalShards = shards.length;
this.debug(`Spawning shards: ${shards.join(', ')}`);
this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id)));

return this.createShards();
}

/**
* Handles the creation of a shard.
* @returns {Promise<boolean>}
* @private
*/
async createShards() {
// If we don't have any shards to handle, return
if (!this.shardQueue.size) return false;

const [shard] = this.shardQueue;

this.shardQueue.delete(shard);
await this._ws.connect();

if (!shard.eventsAttached) {
shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => {
/**
* Emitted when a shard turns ready.
* @event Client#shardReady
* @param {number} id The shard id that turned ready
* @param {?Set<Snowflake>} unavailableGuilds Set of unavailable guild ids, if any
*/
this.client.emit(Events.ShardReady, shard.id, unavailableGuilds);

if (!this.shardQueue.size) this.reconnecting = false;
this.checkShardsReady();
});
this.totalShards = this.client.options.shardCount = await this._ws.getShardCount();
this.client.options.shards = await this._ws.getShardIds();
for (const id of this.client.options.shards) {
if (!this.shards.has(id)) {
const shard = new WebSocketShard(this, id);
this.shards.set(id, shard);

shard.on(WebSocketShardEvents.Close, event => {
if (event.code === 1_000 ? this.destroyed : event.code in unrecoverableErrorCodeMap) {
shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => {
/**
* Emitted when a shard's WebSocket disconnects and will no longer reconnect.
* @event Client#shardDisconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} id The shard id that disconnected
* Emitted when a shard turns ready.
* @event Client#shardReady
* @param {number} id The shard id that turned ready
* @param {?Set<Snowflake>} unavailableGuilds Set of unavailable guild ids, if any
*/
this.client.emit(Events.ShardDisconnect, event, shard.id);
this.debug(GatewayCloseCodes[event.code], shard);
return;
}
this.client.emit(Events.ShardReady, shard.id, unavailableGuilds);

if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) {
// These event codes cannot be resumed
shard.sessionId = null;
}

/**
* Emitted when a shard is attempting to reconnect or re-identify.
* @event Client#shardReconnecting
* @param {number} id The shard id that is attempting to reconnect
*/
this.client.emit(Events.ShardReconnecting, shard.id);

this.shardQueue.add(shard);

if (shard.sessionId) this.debug(`Session id is present, attempting an immediate reconnect...`, shard);
this.reconnect();
});

shard.on(WebSocketShardEvents.InvalidSession, () => {
this.client.emit(Events.ShardReconnecting, shard.id);
});

shard.on(WebSocketShardEvents.Destroyed, () => {
this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard);

this.client.emit(Events.ShardReconnecting, shard.id);

this.shardQueue.add(shard);
this.reconnect();
});

shard.eventsAttached = true;
}

this.shards.set(shard.id, shard);

try {
await shard.connect();
} catch (error) {
if (error?.code && error.code in unrecoverableErrorCodeMap) {
throw new DiscordjsError(unrecoverableErrorCodeMap[error.code]);
// Undefined if session is invalid, error event for regular closes
} else if (!error || error.code) {
this.debug('Failed to connect to the gateway, requeueing...', shard);
this.shardQueue.add(shard);
} else {
throw error;
this.checkShardsReady();
});
}
}
// If we have more shards, add a 5s delay
if (this.shardQueue.size) {
this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`);
await sleep(5_000);
return this.createShards();
}

return true;
}

/**
* Handles reconnects for this manager.
* Attaches event handlers to the internal WebSocketShardManager from `@discordjs/ws`.
* @private
* @returns {Promise<boolean>}
*/
async reconnect() {
if (this.reconnecting || this.status !== Status.Ready) return false;
this.reconnecting = true;
try {
await this.createShards();
} catch (error) {
this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`);
if (error.httpStatus !== 401) {
this.debug(`Possible network error occurred. Retrying in 5s...`);
await sleep(5_000);
this.reconnecting = false;
return this.reconnect();
attachEvents() {
this._ws.on(WSWebSocketShardEvents.Debug, ({ message, shardId }) => this.debug(message, shardId));
this._ws.on(WSWebSocketShardEvents.Dispatch, ({ packet, shardId }) => {
this.client.emit(Events.Raw, packet, shardId);
const shard = this.shards.get(shardId);
this.handlePacket(packet, shard);
if (shard.status === Status.WaitingForGuilds && packet.t === GatewayDispatchEvents.GuildCreate) {
shard.onGuildPacket(packet);
}
// If we get an error at this point, it means we cannot reconnect anymore
if (this.client.listenerCount(Events.Invalidated)) {
});

this._ws.on(WSWebSocketShardEvents.Ready, ({ data, shardId }) => {
this.shards.get(shardId).onReadyPacket(data);
});

this._ws.on(WSWebSocketShardEvents.Closed, ({ code, reason = '', shardId }) => {
this.shards.get(shardId).status = code === CloseCodes.Resuming ? Status.Resuming : Status.Disconnected;
if (code === CloseCodes.Normal && this.destroyed) {
/**
* Emitted when the client's session becomes invalidated.
* You are expected to handle closing the process gracefully and preventing a boot loop
* if you are listening to this event.
* @event Client#invalidated
* Emitted when a shard's WebSocket disconnects and will no longer reconnect.
* @event Client#shardDisconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} id The shard id that disconnected
*/
this.client.emit(Events.Invalidated);
// Destroy just the shards. This means you have to handle the cleanup yourself
this.destroy();
} else {
this.client.destroy();
this.client.emit(Events.ShardDisconnect, { code, reason, wasClean: true }, shardId);
Qjuh marked this conversation as resolved.
Show resolved Hide resolved
this.debug(GatewayCloseCodes[code], shardId);
return;
}
} finally {
this.reconnecting = false;
}
return true;

/**
* Emitted when a shard is attempting to reconnect or re-identify.
* @event Client#shardReconnecting
* @param {number} id The shard id that is attempting to reconnect
*/
this.client.emit(Events.ShardReconnecting, shardId);
});

this._ws.on(WSWebSocketShardEvents.Resumed, ({ shardId }) => {
/**
* Emitted when the shard resumes successfully
* @event WebSocketShard#resumed
*/
this.shards.get(shardId).emit(WebSocketShardEvents.Resumed);
});

this._ws.on(WSWebSocketShardEvents.HeartbeatComplete, ({ heartbeatAt, latency, shardId }) => {
const shard = this.shards.get(shardId);
shard.lastPingTimestamp = heartbeatAt;
shard.ping = latency;
});

// TODO: refactor once error event gets exposed publicly
this._ws.on('error', ({ err, shardId }) => {
/**
* Emitted whenever a shard's WebSocket encounters a connection error.
* @event Client#shardError
* @param {Error} error The encountered error
* @param {number} shardId The shard that encountered this error
*/
this.client.emit(Events.ShardError, err, shardId);
});
}

/**
@@ -310,7 +242,7 @@ class WebSocketManager extends EventEmitter {
* @private
*/
broadcast(packet) {
for (const shard of this.shards.values()) shard.send(packet);
for (const shardId of this.shards.keys()) this._ws.send(shardId, packet);
}

/**
@@ -322,8 +254,7 @@ class WebSocketManager extends EventEmitter {
// TODO: Make a util for getting a stack
this.debug(`Manager was destroyed. Called by:\n${new Error().stack}`);
this.destroyed = true;
this.shardQueue.clear();
for (const shard of this.shards.values()) shard.destroy({ closeCode: 1_000, reset: true, emit: false, log: false });
this._ws.destroy({ code: 1_000 });
Qjuh marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Loading