diff --git a/.changeset/green-poets-teach.md b/.changeset/green-poets-teach.md new file mode 100644 index 0000000000000..80f4ccff26290 --- /dev/null +++ b/.changeset/green-poets-teach.md @@ -0,0 +1,6 @@ +--- +"@medusajs/event-bus-local": patch +"@medusajs/utils": patch +--- + +fix(event): Subscriber ID missusage diff --git a/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts b/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts new file mode 100644 index 0000000000000..44db3c2c2f8b8 --- /dev/null +++ b/packages/core/utils/src/event-bus/__tests__/abstract-event-bus-module.spec.ts @@ -0,0 +1,52 @@ +import { EventBusTypes } from "@medusajs/types" +import { AbstractEventBusModuleService } from ".." + +class MockEventBusModuleService extends AbstractEventBusModuleService { + constructor() { + super({}, {}, {} as any) + } + + async emit( + data: EventBusTypes.Message | EventBusTypes.Message[], + options: Record + ): Promise { + return Promise.resolve() + } + + async releaseGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } + + async clearGroupedEvents(eventGroupId: string): Promise { + return Promise.resolve() + } +} + +describe("AbstractEventBusModuleService", () => { + it("should be able to subscribe to an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + expect(eventBus.eventToSubscribersMap.get("test")).toEqual([ + { id: (subscriber as any).subscriberId, subscriber }, + ]) + }) + + it("should throw an error if a subscriber with the same id is already subscribed to an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + const subscriberId = "test" + eventBus.subscribe("test", subscriber, { subscriberId }) + expect(() => + eventBus.subscribe("test", subscriber, { subscriberId }) + ).toThrow() + }) + + it("should be able to unsubscribe from an event", () => { + const eventBus = new MockEventBusModuleService() + const subscriber = jest.fn() + eventBus.subscribe("test", subscriber) + eventBus.unsubscribe("test", subscriber) + expect(eventBus.eventToSubscribersMap.get("test")).toEqual([]) + }) +}) diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index c10a4de9d0989..63a99744970d4 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -85,12 +85,14 @@ export abstract class AbstractEventBusModuleService * otherwise we generate a random using a ulid */ - const randId = ulid() const event = eventName.toString() + const subscriberId = context?.subscriberId ?? `${event}-${ulid()}` + + ;(subscriber as any).subscriberId = subscriberId this.storeSubscribers({ event, - subscriberId: context?.subscriberId ?? `${event}-${randId}`, + subscriberId, subscriber, }) @@ -100,21 +102,19 @@ export abstract class AbstractEventBusModuleService unsubscribe( eventName: string | symbol, subscriber: EventBusTypes.Subscriber, - context: EventBusTypes.SubscriberContext + context?: EventBusTypes.SubscriberContext ): this { if (!this.isWorkerMode) { return this } - if (typeof subscriber !== `function`) { - throw new Error("Subscriber must be a function") - } - const existingSubscribers = this.eventToSubscribersMap_.get(eventName) + const subscriberId = + context?.subscriberId ?? (subscriber as any).subscriberId if (existingSubscribers?.length) { const subIndex = existingSubscribers?.findIndex( - (sub) => sub.id === context?.subscriberId + (sub) => sub.id === subscriberId ) if (subIndex !== -1) { diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 1bc97179e3f9a..eb537c672bfd9 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -10,7 +10,6 @@ import { import { AbstractEventBusModuleService } from "@medusajs/framework/utils" import { EventEmitter } from "events" import { setTimeout } from "timers/promises" -import { ulid } from "ulid" type InjectedDependencies = { logger: Logger @@ -133,13 +132,13 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.groupedEventsMap_.delete(eventGroupId) } - subscribe(event: string | symbol, subscriber: Subscriber): this { - if (!this.isWorkerMode) { - return this - } + subscribe( + event: string | symbol, + subscriber: Subscriber, + context?: EventBusTypes.SubscriberContext + ): this { + super.subscribe(event, subscriber, context) - const randId = ulid() - this.storeSubscribers({ event, subscriberId: randId, subscriber }) this.eventEmitter_.on(event, async (data: Event) => { try { await subscriber(data) @@ -150,29 +149,16 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.logger_?.error(err) } }) + return this } unsubscribe( event: string | symbol, subscriber: Subscriber, - context?: EventBusTypes.SubscriberContext + context: EventBusTypes.SubscriberContext ): this { - if (!this.isWorkerMode) { - return this - } - - const existingSubscribers = this.eventToSubscribersMap_.get(event) - - if (existingSubscribers?.length) { - const subIndex = existingSubscribers?.findIndex( - (sub) => sub.id === context?.subscriberId - ) - - if (subIndex !== -1) { - this.eventToSubscribersMap_.get(event)?.splice(subIndex as number, 1) - } - } + super.unsubscribe(event, subscriber, context) this.eventEmitter_.off(event, subscriber) return this