diff --git a/src/client.ts b/src/client.ts index c4c7780c..fe26cd44 100644 --- a/src/client.ts +++ b/src/client.ts @@ -293,15 +293,30 @@ export function createClient(options: ClientOptions): Client { // websocket status emitter, subscriptions are handled differently const emitter = (() => { + const message = (() => { + const listeners: { [key: string]: EventMessageListener } = {}; + return { + on(id: string, listener: EventMessageListener) { + listeners[id] = listener; + return () => { + delete listeners[id]; + }; + }, + emit(message: Message) { + if ('id' in message) listeners[message.id]?.(message); + }, + }; + })(); const listeners: { [event in Event]: EventListener[] } = { connecting: on?.connecting ? [on.connecting] : [], connected: on?.connected ? [on.connected] : [], - message: on?.message ? [on.message] : [], + message: on?.message ? [message.emit, on.message] : [message.emit], closed: on?.closed ? [on.closed] : [], error: on?.error ? [on.error] : [], }; return { + onMessage: message.on, on(event: E, listener: EventListener) { const l = listeners[event] as EventListener[]; l.push(listener); @@ -518,28 +533,24 @@ export function createClient(options: ClientOptions): Client { // if completed while waiting for connect, release the connection lock right away if (completed) return release(); - const unlisten = emitter.on('message', (message) => { + const unlisten = emitter.onMessage(id, (message) => { switch (message.type) { case MessageType.Next: { // eslint-disable-next-line @typescript-eslint/no-explicit-any - if (message.id === id) sink.next(message.payload as any); + sink.next(message.payload as any); return; } case MessageType.Error: { - if (message.id === id) { - completed = true; - sink.error(message.payload); - releaser(); - // TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be - // called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored - } + completed = true; + sink.error(message.payload); + releaser(); + // TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be + // called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored return; } case MessageType.Complete: { - if (message.id === id) { - completed = true; - releaser(); // release completes the sink - } + completed = true; + releaser(); // release completes the sink return; } }