diff --git a/lib/client.d.ts b/lib/client.d.ts new file mode 100644 index 00000000..bd8ed9e8 --- /dev/null +++ b/lib/client.d.ts @@ -0,0 +1,26 @@ +/** + * + * GraphQL over WebSocket Protocol + * + * Check out the `PROTOCOL.md` document for the transport specification. + * + */ +import { Sink, Disposable } from './types'; +import { SubscribePayload } from './message'; +/** Configuration used for the `create` client function. */ +export interface ClientOptions { + /** URL of the GraphQL server to connect. */ + url: string; + /** Optional parameters that the client specifies when establishing a connection with the server. */ + connectionParams?: Record | (() => Record); +} +export interface Client extends Disposable { + /** + * Subscribes through the WebSocket following the config parameters. It + * uses the `sink` to emit received data or errors. Returns a _cleanup_ + * function used for dropping the subscription and cleaning stuff up. + */ + subscribe(payload: SubscribePayload, sink: Sink): () => void; +} +/** Creates a disposable GQL subscriptions client. */ +export declare function createClient(options: ClientOptions): Client; diff --git a/lib/client.js b/lib/client.js new file mode 100644 index 00000000..1035a14c --- /dev/null +++ b/lib/client.js @@ -0,0 +1,239 @@ +"use strict"; +/** + * + * GraphQL over WebSocket Protocol + * + * Check out the `PROTOCOL.md` document for the transport specification. + * + */ +Object.defineProperty(exports, "__esModule", { value: true }); +exports.createClient = void 0; +const protocol_1 = require("./protocol"); +const message_1 = require("./message"); +const utils_1 = require("./utils"); +/** Creates a disposable GQL subscriptions client. */ +function createClient(options) { + const { url, connectionParams } = options; + // holds all currently subscribed sinks, will use this map + // to dispatch messages to the correct destination + const subscribedSinks = {}; + function errorAllSinks(err) { + Object.entries(subscribedSinks).forEach(([, sink]) => sink.error(err)); + } + function completeAllSinks() { + Object.entries(subscribedSinks).forEach(([, sink]) => sink.complete()); + } + // Lazily uses the socket singleton to establishes a connection described by the protocol. + let socket = null, connected = false, connecting = false; + async function connect() { + if (connected) { + return; + } + if (connecting) { + let waitedTimes = 0; + while (!connected) { + await new Promise((resolve) => setTimeout(resolve, 100)); + // 100ms * 50 = 5sec + if (waitedTimes >= 50) { + throw new Error('Waited 10 seconds but socket never connected'); + } + waitedTimes++; + } + // connected === true + return; + } + connected = false; + connecting = true; + return new Promise((resolve, reject) => { + let done = false; // used to avoid resolving/rejecting the promise multiple times + socket = new WebSocket(url, protocol_1.GRAPHQL_TRANSPORT_WS_PROTOCOL); + /** + * `onerror` handler is unnecessary because even if an error occurs, the `onclose` handler will be called + * + * From: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications + * > If an error occurs while attempting to connect, first a simple event with the name error is sent to the + * > WebSocket object (thereby invoking its onerror handler), and then the CloseEvent is sent to the WebSocket + * > object (thereby invoking its onclose handler) to indicate the reason for the connection's closing. + */ + socket.onclose = ({ code, reason }) => { + const err = new Error(`Socket closed with event ${code}` + !reason ? '' : `: ${reason}`); + if (code === 1000 || code === 1001) { + // close event `1000: Normal Closure` is ok and so is `1001: Going Away` (maybe the server is restarting) + completeAllSinks(); + } + else { + // all other close events are considered erroneous + errorAllSinks(err); + } + if (!done) { + done = true; + connecting = false; + connected = false; // the connection is lost + socket = null; + reject(err); // we reject here bacause the close is not supposed to be called during the connect phase + } + }; + socket.onopen = () => { + try { + if (!socket) { + throw new Error('Opened a socket on nothing'); + } + socket.send(message_1.stringifyMessage({ + type: message_1.MessageType.ConnectionInit, + payload: typeof connectionParams === 'function' + ? connectionParams() + : connectionParams, + })); + } + catch (err) { + errorAllSinks(err); + if (!done) { + done = true; + connecting = false; + if (socket) { + socket.close(); + socket = null; + } + reject(err); + } + } + }; + function handleMessage({ data }) { + try { + if (!socket) { + throw new Error('Received a message on nothing'); + } + const message = message_1.parseMessage(data); + if (message.type !== message_1.MessageType.ConnectionAck) { + throw new Error(`First message cannot be of type ${message.type}`); + } + // message.type === MessageType.ConnectionAck + if (!done) { + done = true; + connecting = false; + connected = true; // only now is the connection ready + resolve(); + } + } + catch (err) { + errorAllSinks(err); + if (!done) { + done = true; + connecting = false; + if (socket) { + socket.close(); + socket = null; + } + reject(err); + } + } + finally { + if (socket) { + // this listener is not necessary anymore + socket.removeEventListener('message', handleMessage); + } + } + } + socket.addEventListener('message', handleMessage); + }); + } + return { + subscribe: (payload, sink) => { + const uuid = generateUUID(); + if (subscribedSinks[uuid]) { + sink.error(new Error(`Sink with ID ${uuid} already registered`)); + return utils_1.noop; + } + subscribedSinks[uuid] = sink; + function handleMessage({ data }) { + const message = message_1.parseMessage(data); + switch (message.type) { + case message_1.MessageType.Next: { + if (message.id === uuid) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sink.next(message.payload); + } + break; + } + case message_1.MessageType.Error: { + if (message.id === uuid) { + sink.error(message.payload); + } + break; + } + case message_1.MessageType.Complete: { + if (message.id === uuid) { + sink.complete(); + } + break; + } + } + } + (async () => { + try { + await connect(); + if (!socket) { + throw new Error('Socket connected but empty'); + } + socket.addEventListener('message', handleMessage); + socket.send(message_1.stringifyMessage({ + id: uuid, + type: message_1.MessageType.Subscribe, + payload, + })); + } + catch (err) { + sink.error(err); + } + })(); + return () => { + if (socket) { + socket.send(message_1.stringifyMessage({ + id: uuid, + type: message_1.MessageType.Complete, + })); + socket.removeEventListener('message', handleMessage); + // equal to 1 because this sink is the last one. + // the deletion from the map happens afterwards + if (Object.entries(subscribedSinks).length === 1) { + socket.close(1000, 'Normal Closure'); + socket = null; + } + } + sink.complete(); + delete subscribedSinks[uuid]; + }; + }, + dispose: async () => { + // complete all sinks + // TODO-db-200817 complete or error? the sinks should be completed BEFORE the client gets disposed + completeAllSinks(); + // delete all sinks + Object.keys(subscribedSinks).forEach((uuid) => { + delete subscribedSinks[uuid]; + }); + // if there is an active socket, close it with a normal closure + if (socket && socket.readyState === WebSocket.OPEN) { + // TODO-db-200817 decide if `1001: Going Away` should be used instead + socket.close(1000, 'Normal Closure'); + socket = null; + } + }, + }; +} +exports.createClient = createClient; +/** Generates a new v4 UUID. Reference: https://stackoverflow.com/a/2117523/709884 */ +function generateUUID() { + if (!window.crypto) { + // fallback to Math.random when crypto is not available + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) { + const r = (Math.random() * 16) | 0, v = c == 'x' ? r : (r & 0x3) | 0x8; + return v.toString(16); + }); + } + return '10000000-1000-4000-8000-100000000000'.replace(/[018]/g, (s) => { + const c = Number.parseInt(s, 10); + return (c ^ + (window.crypto.getRandomValues(new Uint8Array(1))[0] & (15 >> (c / 4)))).toString(16); + }); +} diff --git a/lib/index.d.ts b/lib/index.d.ts new file mode 100644 index 00000000..4f1cce44 --- /dev/null +++ b/lib/index.d.ts @@ -0,0 +1 @@ +export * from './client'; diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 00000000..196aeee4 --- /dev/null +++ b/lib/index.js @@ -0,0 +1,13 @@ +"use strict"; +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __exportStar = (this && this.__exportStar) || function(m, exports) { + for (var p in m) if (p !== "default" && !exports.hasOwnProperty(p)) __createBinding(exports, m, p); +}; +Object.defineProperty(exports, "__esModule", { value: true }); +__exportStar(require("./client"), exports); diff --git a/lib/message.d.ts b/lib/message.d.ts new file mode 100644 index 00000000..7c9981c9 --- /dev/null +++ b/lib/message.d.ts @@ -0,0 +1,56 @@ +/** + * + * message + * + */ +import { GraphQLError, ExecutionResult, DocumentNode } from 'graphql'; +/** Types of messages allowed to be sent by the client/server over the WS protocol. */ +export declare enum MessageType { + ConnectionInit = "connection_init", + ConnectionAck = "connection_ack", + Subscribe = "subscribe", + Next = "next", + Error = "error", + Complete = "complete" +} +export interface ConnectionInitMessage { + readonly type: MessageType.ConnectionInit; + readonly payload?: Record; +} +export interface ConnectionAckMessage { + readonly type: MessageType.ConnectionAck; +} +export interface SubscribeMessage { + readonly id: string; + readonly type: MessageType.Subscribe; + readonly payload: SubscribePayload; +} +export interface SubscribePayload { + readonly operationName: string; + readonly query: string | DocumentNode; + readonly variables: Record; +} +export interface NextMessage { + readonly id: string; + readonly type: MessageType.Next; + readonly payload: ExecutionResult; +} +export interface ErrorMessage { + readonly id: string; + readonly type: MessageType.Error; + readonly payload: readonly GraphQLError[]; +} +export interface CompleteMessage { + readonly id: string; + readonly type: MessageType.Complete; +} +export declare type Message = T extends MessageType.ConnectionAck ? ConnectionAckMessage : T extends MessageType.ConnectionInit ? ConnectionInitMessage : T extends MessageType.Subscribe ? SubscribeMessage : T extends MessageType.Next ? NextMessage : T extends MessageType.Error ? ErrorMessage : T extends MessageType.Complete ? CompleteMessage : never; +/** @ignore */ +export declare function isMessage(val: unknown): val is Message; +/** @ignore */ +export declare function parseMessage(data: unknown): Message; +/** + * @ignore + * Helps stringifying a valid message ready to be sent through the socket. + */ +export declare function stringifyMessage(msg: Message): string; diff --git a/lib/message.js b/lib/message.js new file mode 100644 index 00000000..e0150a7c --- /dev/null +++ b/lib/message.js @@ -0,0 +1,89 @@ +"use strict"; +/** + * + * message + * + */ +Object.defineProperty(exports, "__esModule", { value: true }); +exports.stringifyMessage = exports.parseMessage = exports.isMessage = exports.MessageType = void 0; +const utils_1 = require("./utils"); +/** Types of messages allowed to be sent by the client/server over the WS protocol. */ +var MessageType; +(function (MessageType) { + MessageType["ConnectionInit"] = "connection_init"; + MessageType["ConnectionAck"] = "connection_ack"; + MessageType["Subscribe"] = "subscribe"; + MessageType["Next"] = "next"; + MessageType["Error"] = "error"; + MessageType["Complete"] = "complete"; +})(MessageType = exports.MessageType || (exports.MessageType = {})); +/** @ignore */ +function isMessage(val) { + if (utils_1.isObject(val)) { + // all messages must have the `type` prop + if (!utils_1.hasOwnProperty(val, 'type')) { + return false; + } + // validate other properties depending on the `type` + switch (val.type) { + case MessageType.ConnectionInit: + // the connection init message can have optional object `connectionParams` in the payload + return (!utils_1.hasOwnProperty(val, 'payload') || + val.payload === undefined || + utils_1.isObject(val.payload)); + case MessageType.ConnectionAck: + return true; + case MessageType.Subscribe: + return (utils_1.hasOwnStringProperty(val, 'id') && + utils_1.hasOwnObjectProperty(val, 'payload') && + utils_1.hasOwnStringProperty(val.payload, 'operationName') && + (utils_1.hasOwnStringProperty(val.payload, 'query') || // string query + utils_1.hasOwnObjectProperty(val.payload, 'query')) && // document node query + utils_1.hasOwnObjectProperty(val.payload, 'variables')); + case MessageType.Next: + return (utils_1.hasOwnStringProperty(val, 'id') && + utils_1.hasOwnObjectProperty(val, 'payload') && + // ExecutionResult + (utils_1.hasOwnObjectProperty(val.payload, 'data') || + utils_1.hasOwnObjectProperty(val.payload, 'errors'))); + case MessageType.Error: + return (utils_1.hasOwnStringProperty(val, 'id') && + // GraphQLError + utils_1.hasOwnArrayProperty(val, 'payload') && + val.payload.length > 0 // must be at least one error + ); + case MessageType.Complete: + return utils_1.hasOwnStringProperty(val, 'id'); + default: + return false; + } + } + return false; +} +exports.isMessage = isMessage; +/** @ignore */ +function parseMessage(data) { + if (isMessage(data)) { + return data; + } + if (typeof data === 'string') { + const message = JSON.parse(data); + if (!isMessage(message)) { + throw new Error('Invalid message'); + } + return message; + } + throw new Error('Message not parsable'); +} +exports.parseMessage = parseMessage; +/** + * @ignore + * Helps stringifying a valid message ready to be sent through the socket. + */ +function stringifyMessage(msg) { + if (!isMessage(msg)) { + throw new Error('Cannot stringify invalid message'); + } + return JSON.stringify(msg); +} +exports.stringifyMessage = stringifyMessage; diff --git a/lib/protocol.d.ts b/lib/protocol.d.ts new file mode 100644 index 00000000..68372314 --- /dev/null +++ b/lib/protocol.d.ts @@ -0,0 +1,7 @@ +/** + * + * protocol + * + */ +/** The WebSocket sub-protocol used for the [GraphQL over WebSocket Protocol](/PROTOCOL.md). */ +export declare const GRAPHQL_TRANSPORT_WS_PROTOCOL = "graphql-transport-ws"; diff --git a/lib/protocol.js b/lib/protocol.js new file mode 100644 index 00000000..686ff1dd --- /dev/null +++ b/lib/protocol.js @@ -0,0 +1,10 @@ +"use strict"; +/** + * + * protocol + * + */ +Object.defineProperty(exports, "__esModule", { value: true }); +exports.GRAPHQL_TRANSPORT_WS_PROTOCOL = void 0; +/** The WebSocket sub-protocol used for the [GraphQL over WebSocket Protocol](/PROTOCOL.md). */ +exports.GRAPHQL_TRANSPORT_WS_PROTOCOL = 'graphql-transport-ws'; diff --git a/lib/server.d.ts b/lib/server.d.ts new file mode 100644 index 00000000..f6ddba66 --- /dev/null +++ b/lib/server.d.ts @@ -0,0 +1,130 @@ +/** + * + * server + * + */ +import WebSocket from 'ws'; +import { GraphQLSchema, ValidationRule, ExecutionResult, ExecutionArgs } from 'graphql'; +import { Disposable } from './types'; +import { SubscribeMessage, CompleteMessage } from './message'; +import { Optional } from './utils'; +import { UUID } from './types'; +export interface ServerOptions { + /** + * The GraphQL schema on which the operations + * will be executed and validated against. If + * the schema is left undefined, one must be + * provided by in the resulting `ExecutionArgs` + * from the `onSubscribe` callback. + */ + schema?: GraphQLSchema; + /** + * Is the `subscribe` function + * from GraphQL which is used to + * execute the subscription operation + * upon. + */ + execute: (args: ExecutionArgs) => Promise | ExecutionResult; + /** + * Is the `subscribe` function + * from GraphQL which is used to + * execute the subscription operation + * upon. + */ + subscribe: (args: ExecutionArgs) => Promise | ExecutionResult>; + /** + * Is the connection callback called when the + * client requests the connection initialisation + * through the message `ConnectionInit`. The message + * payload (`connectionParams` on the client) is + * present in the `Context.connectionParams`. + * + * - Returning `true` from the callback will + * allow the client to connect. + * + * - Returning `false` from the callback will + * terminate the socket by dispatching the + * close event `4403: Forbidden`. + * + * - Throwing an error from the callback will + * terminate the socket by dispatching the + * close event `4400: `, where + * the `` is the message of the + * thrown `Error`. + */ + onConnect?: (ctx: Context) => Promise | boolean; + /** + * @default 3 * 1000 (3 seconds) + * + * The amount of time for which the + * server will wait for `ConnectionInit` message. + * + * Set the value to `Infinity` to skip waiting. + * + * If the wait timeout has passed and the client + * has not sent the `ConnectionInit` message, + * the server will terminate the socket by + * dispatching a close event `4408: Connection initialisation timeout` + */ + connectionInitWaitTimeout?: number; + /** + * Custom validation rules overriding all + * validation rules defined by the GraphQL spec. + */ + validationRules?: readonly ValidationRule[]; + /** + * Format the operation execution results + * if the implementation requires an adjusted + * result. + */ + formatExecutionResult?: (ctx: Context, result: ExecutionResult) => Promise | ExecutionResult; + /** + * The subscribe callback executed before + * the actual operation execution. Useful + * for manipulating the execution arguments + * before the doing the operation. + */ + onSubscribe?: (ctx: Context, message: SubscribeMessage, args: Optional) => Promise | ExecutionArgs; + /** + * The complete callback is executed after the + * operation has completed or the subscription + * has been closed. + */ + onComplete?: (ctx: Context, message: CompleteMessage) => void; +} +export interface Context { + readonly socket: WebSocket; + /** + * Indicates that the `ConnectionInit` message + * has been received by the server. If this is + * `true`, the client wont be kicked off after + * the wait timeout has passed. + */ + connectionInitReceived: boolean; + /** + * Indicates that the connection was acknowledged + * by having dispatched the `ConnectionAck` message + * to the related client. + */ + acknowledged: boolean; + /** The parameters passed during the connection initialisation. */ + connectionParams?: Readonly>; + /** + * Holds the active subscriptions for this context. + * Subscriptions are for `subscription` operations **only**, + * other operations (`query`/`mutation`) are resolved immediately. + */ + subscriptions: Record>; +} +export interface Server extends Disposable { + webSocketServer: WebSocket.Server; +} +declare type WebSocketServerOptions = WebSocket.ServerOptions; +declare type WebSocketServer = WebSocket.Server; +/** + * Creates a protocol complient WebSocket GraphQL + * subscription server. Read more about the protocol + * in the PROTOCOL.md documentation file. + */ +export declare function createServer(options: ServerOptions, websocketOptionsOrServer: WebSocketServerOptions | WebSocketServer): Server; +export {}; diff --git a/lib/server.js b/lib/server.js new file mode 100644 index 00000000..c19e0e29 --- /dev/null +++ b/lib/server.js @@ -0,0 +1,293 @@ +"use strict"; +/** + * + * server + * + */ +var __asyncValues = (this && this.__asyncValues) || function (o) { + if (!Symbol.asyncIterator) throw new TypeError("Symbol.asyncIterator is not defined."); + var m = o[Symbol.asyncIterator], i; + return m ? m.call(o) : (o = typeof __values === "function" ? __values(o) : o[Symbol.iterator](), i = {}, verb("next"), verb("throw"), verb("return"), i[Symbol.asyncIterator] = function () { return this; }, i); + function verb(n) { i[n] = o[n] && function (v) { return new Promise(function (resolve, reject) { v = o[n](v), settle(resolve, reject, v.done, v.value); }); }; } + function settle(resolve, reject, d, v) { Promise.resolve(v).then(function(v) { resolve({ value: v, done: d }); }, reject); } +}; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.createServer = void 0; +const ws_1 = __importDefault(require("ws")); +const graphql_1 = require("graphql"); +const protocol_1 = require("./protocol"); +const message_1 = require("./message"); +const utils_1 = require("./utils"); +/** + * Creates a protocol complient WebSocket GraphQL + * subscription server. Read more about the protocol + * in the PROTOCOL.md documentation file. + */ +function createServer(options, websocketOptionsOrServer) { + const { schema, execute, onConnect, connectionInitWaitTimeout = 3 * 1000, // 3 seconds + validationRules, formatExecutionResult, onSubscribe, onComplete, } = options; + const webSocketServer = websocketOptionsOrServer instanceof ws_1.default.Server + ? websocketOptionsOrServer + : new ws_1.default.Server(websocketOptionsOrServer); + function handleConnection(socket, _request) { + if (socket.protocol === undefined || + socket.protocol !== protocol_1.GRAPHQL_TRANSPORT_WS_PROTOCOL || + (Array.isArray(socket.protocol) && + socket.protocol.indexOf(protocol_1.GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1)) { + // 1002: Protocol Error + socket.close(1002, 'Protocol Error'); + return; + } + const ctxRef = { + current: { + socket, + connectionInitReceived: false, + acknowledged: false, + subscriptions: {}, + }, + }; + // kick the client off (close socket) if the connection has + // not been initialised after the specified wait timeout + const connectionInitWait = connectionInitWaitTimeout !== Infinity && + setTimeout(() => { + if (!ctxRef.current.connectionInitReceived) { + ctxRef.current.socket.close(4408, 'Connection initialisation timeout'); + } + }, connectionInitWaitTimeout); + function errorOrCloseHandler(errorOrClose) { + if (connectionInitWait) { + clearTimeout(connectionInitWait); + } + if (isErrorEvent(errorOrClose)) { + // TODO-db-200805 leaking sensitive information by sending the error message too? + // 1011: Internal Error + ctxRef.current.socket.close(1011, errorOrClose.message); + } + Object.entries(ctxRef.current.subscriptions).forEach(([, subscription]) => { + (subscription.return || utils_1.noop)(); + }); + } + socket.onerror = errorOrCloseHandler; + socket.onclose = errorOrCloseHandler; + socket.onmessage = makeOnMessage(ctxRef.current); + } + webSocketServer.on('connection', handleConnection); + webSocketServer.on('error', (err) => { + for (const client of webSocketServer.clients) { + // report server errors by erroring out all clients with the same error + client.emit('error', err); + } + }); + // Sends through a message only if the socket is open. + function sendMessage(ctx, message, callback) { + return new Promise((resolve, reject) => { + if (ctx.socket.readyState === ws_1.default.OPEN) { + try { + ctx.socket.send(message_1.stringifyMessage(message), (err) => { + if (callback) + callback(err); + if (err) { + return reject(err); + } + return resolve(); + }); + } + catch (err) { + reject(err); + } + } + else { + if (callback) + callback(); + resolve(); + } + }); + } + function makeOnMessage(ctx) { + return async function (event) { + var e_1, _a; + var _b; + try { + const message = message_1.parseMessage(event.data); + switch (message.type) { + case message_1.MessageType.ConnectionInit: { + ctx.connectionInitReceived = true; + if (utils_1.isObject(message.payload)) { + ctx.connectionParams = message.payload; + } + if (onConnect) { + const permitted = await onConnect(ctx); + if (!permitted) { + return ctx.socket.close(4403, 'Forbidden'); + } + } + await sendMessage(ctx, { + type: message_1.MessageType.ConnectionAck, + }); + ctx.acknowledged = true; + break; + } + case message_1.MessageType.Subscribe: { + if (!ctx.acknowledged) { + return ctx.socket.close(4401, 'Unauthorized'); + } + const operation = message.payload; + let execArgsMaybeSchema = { + schema, + operationName: operation.operationName, + document: typeof operation.query === 'string' + ? graphql_1.parse(operation.query) + : operation.query, + variableValues: operation.variables, + }; + if (onSubscribe) { + execArgsMaybeSchema = await onSubscribe(ctx, message, execArgsMaybeSchema); + } + if (!execArgsMaybeSchema.schema) { + // not providing a schema is a fatal server error + return webSocketServer.emit('error', new Error('The GraphQL schema is not provided')); + } + // the execution arguments should be complete now + const execArgs = execArgsMaybeSchema; + // validate + const validationErrors = graphql_1.validate(execArgs.schema, execArgs.document, validationRules); + if (validationErrors.length > 0) { + return await sendMessage(ctx, { + id: message.id, + type: message_1.MessageType.Error, + payload: validationErrors, + }); + } + // execute + const operationAST = graphql_1.getOperationAST(execArgs.document, execArgs.operationName); + if (!operationAST) { + throw new Error('Unable to get operation AST'); + } + if (operationAST.operation === 'subscription') { + const subscriptionOrResult = await graphql_1.subscribe(execArgs); + if (utils_1.isAsyncIterable(subscriptionOrResult)) { + ctx.subscriptions[message.id] = subscriptionOrResult; + try { + try { + for (var subscriptionOrResult_1 = __asyncValues(subscriptionOrResult), subscriptionOrResult_1_1; subscriptionOrResult_1_1 = await subscriptionOrResult_1.next(), !subscriptionOrResult_1_1.done;) { + let result = subscriptionOrResult_1_1.value; + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: message_1.MessageType.Next, + payload: result, + }); + } + } + catch (e_1_1) { e_1 = { error: e_1_1 }; } + finally { + try { + if (subscriptionOrResult_1_1 && !subscriptionOrResult_1_1.done && (_a = subscriptionOrResult_1.return)) await _a.call(subscriptionOrResult_1); + } + finally { if (e_1) throw e_1.error; } + } + const completeMessage = { + id: message.id, + type: message_1.MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } + catch (err) { + await sendMessage(ctx, { + id: message.id, + type: message_1.MessageType.Error, + payload: [ + new graphql_1.GraphQLError(err instanceof Error + ? err.message + : new Error(err).message), + ], + }); + } + finally { + delete ctx.subscriptions[message.id]; + } + } + else { + let result = subscriptionOrResult; + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: message_1.MessageType.Next, + payload: result, + }); + const completeMessage = { + id: message.id, + type: message_1.MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } + } + else { + // operationAST.operation === 'query' || 'mutation' + let result = await execute(execArgs); + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: message_1.MessageType.Next, + payload: result, + }); + const completeMessage = { + id: message.id, + type: message_1.MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } + break; + } + case message_1.MessageType.Complete: { + if (ctx.subscriptions[message.id]) { + await ((_b = ctx.subscriptions[message.id].return) !== null && _b !== void 0 ? _b : utils_1.noop)(); + } + break; + } + default: + throw new Error(`Unexpected message of type ${message.type} received`); + } + } + catch (err) { + ctx.socket.close(4400, err.message); + } + }; + } + return { + webSocketServer, + dispose: async () => { + for (const client of webSocketServer.clients) { + // 1001: Going away + client.close(1001, 'Going away'); + } + webSocketServer.removeAllListeners(); + await new Promise((resolve, reject) => webSocketServer.close((err) => (err ? reject(err) : resolve()))); + }, + }; +} +exports.createServer = createServer; +function isErrorEvent(obj) { + return (utils_1.isObject(obj) && + utils_1.hasOwnObjectProperty(obj, 'error') && + utils_1.hasOwnStringProperty(obj, 'message') && + utils_1.hasOwnStringProperty(obj, 'type')); +} diff --git a/lib/utils.d.ts b/lib/utils.d.ts new file mode 100644 index 00000000..49585435 --- /dev/null +++ b/lib/utils.d.ts @@ -0,0 +1,14 @@ +/** + * + * utils + * + */ +export declare type Optional = Pick> & Partial>; +export declare function isObject(val: unknown): val is Record; +export declare function isArray(val: unknown): val is unknown[]; +export declare function isAsyncIterable(val: unknown): val is AsyncIterableIterator; +export declare function hasOwnProperty, P extends PropertyKey>(obj: O, prop: P): obj is O & Record; +export declare function hasOwnObjectProperty, P extends PropertyKey>(obj: O, prop: P): obj is O & Record>; +export declare function hasOwnArrayProperty, P extends PropertyKey>(obj: O, prop: P): obj is O & Record; +export declare function hasOwnStringProperty, P extends PropertyKey>(obj: O, prop: P): obj is O & Record; +export declare function noop(): void; diff --git a/lib/utils.js b/lib/utils.js new file mode 100644 index 00000000..70bea24b --- /dev/null +++ b/lib/utils.js @@ -0,0 +1,41 @@ +"use strict"; +/** + * + * utils + * + */ +Object.defineProperty(exports, "__esModule", { value: true }); +exports.noop = exports.hasOwnStringProperty = exports.hasOwnArrayProperty = exports.hasOwnObjectProperty = exports.hasOwnProperty = exports.isAsyncIterable = exports.isArray = exports.isObject = void 0; +function isObject(val) { + return typeof val === 'object' && val !== null; +} +exports.isObject = isObject; +function isArray(val) { + return typeof val === 'object' && val !== null && Array.isArray(val); +} +exports.isArray = isArray; +function isAsyncIterable(val) { + return typeof Object(val)[Symbol.asyncIterator] === 'function'; +} +exports.isAsyncIterable = isAsyncIterable; +function hasOwnProperty(obj, prop) { + return Object.prototype.hasOwnProperty.call(obj, prop); +} +exports.hasOwnProperty = hasOwnProperty; +function hasOwnObjectProperty(obj, prop) { + return Object.prototype.hasOwnProperty.call(obj, prop) && isObject(obj[prop]); +} +exports.hasOwnObjectProperty = hasOwnObjectProperty; +function hasOwnArrayProperty(obj, prop) { + return Object.prototype.hasOwnProperty.call(obj, prop) && isArray(obj[prop]); +} +exports.hasOwnArrayProperty = hasOwnArrayProperty; +function hasOwnStringProperty(obj, prop) { + return (Object.prototype.hasOwnProperty.call(obj, prop) && + typeof obj[prop] === 'string'); +} +exports.hasOwnStringProperty = hasOwnStringProperty; +function noop() { + /**/ +} +exports.noop = noop;