Skip to content
This repository has been archived by the owner on Jan 18, 2025. It is now read-only.

Commit

Permalink
implement resume_gateway_url
Browse files Browse the repository at this point in the history
Signed-off-by: Ruairi <[email protected]>
  • Loading branch information
splatterxl committed Jun 20, 2023
1 parent d920e9d commit f3b6ec1
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 11 deletions.
4 changes: 0 additions & 4 deletions packages/fuwa/src/ws/DispatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ export async function handleDispatch(

break;
}
case GatewayDispatchEvents.Resumed: {
shard.emit('resumed');
break;
}
//#endregion

//#region Guilds
Expand Down
2 changes: 1 addition & 1 deletion packages/ws/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fuwa/ws",
"version": "0.3.0",
"version": "0.4.0",
"description": "Minimal WebSocket client for Discord's real-time gateway",
"main": "dist/index.js",
"types": "typings/index.d.ts",
Expand Down
19 changes: 13 additions & 6 deletions packages/ws/src/GatewayManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { REST } from '@fuwa/rest';
import { APIGatewayBotInfo, GatewayCloseCodes } from 'discord-api-types/v10';
import {
APIGatewayBotInfo,
GatewayCloseCodes,
GatewayDispatchPayload,
} from 'discord-api-types/v10';
import EventEmitter from 'node:events';
import { setTimeout as sleep } from 'node:timers/promises';
import { GatewayShard, ShardState } from './GatewayShard.js';
Expand Down Expand Up @@ -240,13 +244,14 @@ export class GatewayManager extends EventEmitter {
*
* @returns Whether the shard was successfully respawned.
*/
public async respawn(id: number) {
public async respawn(id: number, url?: string) {
this.event('shardRespawn', id);

return this.spawn({
id: id,
shards: 1,
id,
count: this.count,
url,
})
.then(() => true)
.catch(() => false);
Expand Down Expand Up @@ -279,10 +284,11 @@ export class GatewayManager extends EventEmitter {

/** @internal */
private _registerListeners(shard: GatewayShard, runtime: boolean) {
if (runtime)
if (runtime) {
shard.on('_refresh', () => {
this.respawn(shard.id);
});
}
shard
.on('_throw', e => {
throw new Error(`Shard ${shard.id}: ${e}`);
Expand All @@ -293,8 +299,9 @@ export class GatewayManager extends EventEmitter {
this.emit(p.t, p.d, shard);
}
})
.on('dispatch', d => {
this.emit('dispatch', d, shard);
.on('dispatch', (payload: GatewayDispatchPayload) => {
this.emit('dispatch', payload, shard);
this.emit(payload.t, payload.d);
})
.on('ready', () => {
this.event('shardReady', shard);
Expand Down
23 changes: 23 additions & 0 deletions packages/ws/src/GatewayShard.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncQueue } from '@sapphire/async-queue';
import {
GatewayDispatchEvents,
GatewayIdentify,
GatewayOpcodes,
GatewayReceivePayload,
Expand Down Expand Up @@ -208,6 +209,26 @@ export class GatewayShard extends EventEmitter {
case GatewayOpcodes.Dispatch: {
this.emit('dispatch', data);

switch (data.t) {
case GatewayDispatchEvents.Ready: {
this.session = data.d.session_id;
this._awaitedGuilds = data.d.guilds.map(g => g.id);

this.url = data.d.resume_gateway_url;

break;
}
case GatewayDispatchEvents.Resumed: {
this.emit('resume');
this.emit('ready');

break;
}
default: {
// this is handled by the client
}
}

break;
}
}
Expand All @@ -224,6 +245,8 @@ export class GatewayShard extends EventEmitter {

this.state = ShardState.Available;
this.emit('ready');

return true;
}

/**
Expand Down
176 changes: 176 additions & 0 deletions packages/ws/src/ShardingManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import { consumeJSON, resolveRequest, RESTClient } from '@fuwa/rest';
import { ChildProcess, fork } from 'child_process';
import { APIGatewayBotInfo, Routes } from 'discord-api-types/v10';
import { STATUS_CODES } from 'http';
import { isMainThread, Worker } from 'worker_threads';

export class ShardingManager {
gatewayInfo!: APIGatewayBotInfo;
client: RESTClient;
workers: Map<number, ChildProcess | Worker> = new Map();

constructor(public token: string, public options: ShardingManagerOptions) {
this.client = new RESTClient(RESTClient.getDefaultOptions(token));
}

public async spawn() {
const token = this.token;

if (!token || typeof token !== 'string' || token.trim() == '')
throw new Error('Invalid token provided. Must be a non-empty string.');

this.gatewayInfo = await this.client
.execute(resolveRequest({ route: Routes.gatewayBot() }))
.then(res => {
if (res.statusCode !== 200) {
throw new Error(
`[ShardingManager] Failed to get gateway info: ${res.statusCode} ${
STATUS_CODES[res.statusCode]
}`,
);
}

return res;
})
.then(consumeJSON);

const { totalShards = this.gatewayInfo.shards } = this.options;

const shards = Array.isArray(this.options.shards)
? this.options.shards.length === 2
? range(this.options.shards[0], this.options.shards[1])
: this.options.shards
: this.options.shards === 'auto'
? range(this.options.increment ?? 0, totalShards - 1)
: typeof this.options.shards === 'number'
? range(this.options.increment ?? 0, this.options.shards - 1)
: null;

if (!shards) throw new Error('Invalid shards option');

const _ = [...new Set(shards)];

switch (this.options.mode) {
case 'process':
for (const i of _) {
console.debug(`[ShardingManager] Spawning process for shard ${i}`);
this.workers.set(i, this.spawnProcess(i));
}
break;
case 'worker':
for (const i of _) {
console.debug(`[ShardingManager] Spawning worker for shard ${i}`);
this.workers.set(i, this.spawnWorker(i));
}
break;
default:
throw new Error('Invalid mode option');
}

process.on('exit', () => {
console.log('[ShardingManager] Shutting down');
for (const [id, worker] of this.workers) {
if (worker instanceof ChildProcess) {
worker.kill();
} else {
worker.terminate();
}
console.log(`[ShardingManager] Killed worker ${id}`);
}
});
}

private spawnProcess(id: number) {
const worker = fork(this.options.file, this.options.shardArgs ?? [], {
env: {
...process.env,
__FUWA_SHARD_ID: id.toString(),
__FUWA_SHARD_COUNT: this.gatewayInfo.shards.toString(),
__FUWA_SHARDING_MANAGER: 'true',
},
});

worker.on('exit', code => {
if (code !== 0) {
console.error(
`[ShardingManager] Worker ${id} exited with code ${code}, respawning`,
);
this.respawn(id);
} else {
console.log(
`[ShardingManager] Worker ${id} exited with success code ${code}`,
);
}
});

return worker;
}

private spawnWorker(id: number) {
if (!isMainThread) throw new Error('Cannot spawn worker in worker thread');

const worker = new Worker(this.options.file, {
workerData: {
id,
count: this.gatewayInfo.shards,
__fuwa_sharding_manager: true,
},
});

worker.on('exit', code => {
if (code !== 0) {
console.error(
`[ShardingManager] Worker ${id} exited with code ${code}, respawning`,
);
this.respawn(id);
} else {
console.log(
`[ShardingManager] Worker ${id} exited with success code ${code}`,
);
}
});

return worker;
}

respawn(id: number) {
switch (this.options.mode) {
case 'process':
this.workers.set(id, this.spawnProcess(id));
break;
case 'worker':
this.workers.set(id, this.spawnWorker(id));
break;
}
}

handleMessage(d: any, id: number) {
console.log(`[ShardingManager] Received message from worker ${id}:`, d);

const buf = Buffer.from(d.toString());

switch (buf.at(0)) {
case 1:
this.respawn(id);
break;
}
}
}

export interface ShardingManagerOptions {
token: string;
shards: number | 'auto' | [number, number] | number[];
fetchInfo?: boolean;
totalShards?: number;
limitPerWorker?: number;
increment?: number;
mode: 'process' | 'worker' | 'range';
file: string;
respawn?: boolean;
autoSpawn?: boolean;
shardArgs?: string[];
}

function range(start: number, end: number = start) {
return Array.from(new Array(end - start + 1), (_, i) => i + start);
}

0 comments on commit f3b6ec1

Please sign in to comment.