Skip to content

Commit

Permalink
fix(event): Subscriber ID miss usage (#11047)
Browse files Browse the repository at this point in the history
PARTIALLY RESOLVES FRMW-2876

**What**
Fix wrong usage of the `subscriberId` in the event bus. It happens that the subscriber id coming from the context was not used at all. This issue lead to duplicated event subscriber with the same subscriber id, it also prevent unsubscribing from event since rand id will be assigned.

**NOTE**
This PR does not handle overide strategy for subscribers with the same id. this still needs to be discussed
  • Loading branch information
adrien2p authored Jan 22, 2025
1 parent 8119d99 commit ecc8efc
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 31 deletions.
6 changes: 6 additions & 0 deletions .changeset/green-poets-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/utils": patch
---

fix(event): Subscriber ID missusage
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { EventBusTypes } from "@medusajs/types"
import { AbstractEventBusModuleService } from ".."

class MockEventBusModuleService extends AbstractEventBusModuleService {
constructor() {
super({}, {}, {} as any)
}

async emit<T>(
data: EventBusTypes.Message<T> | EventBusTypes.Message<T>[],
options: Record<string, unknown>
): Promise<void> {
return Promise.resolve()
}

async releaseGroupedEvents(eventGroupId: string): Promise<void> {
return Promise.resolve()
}

async clearGroupedEvents(eventGroupId: string): Promise<void> {
return Promise.resolve()
}
}

describe("AbstractEventBusModuleService", () => {
it("should be able to subscribe to an event", () => {
const eventBus = new MockEventBusModuleService()
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
expect(eventBus.eventToSubscribersMap.get("test")).toEqual([
{ id: (subscriber as any).subscriberId, subscriber },
])
})

it("should throw an error if a subscriber with the same id is already subscribed to an event", () => {
const eventBus = new MockEventBusModuleService()
const subscriber = jest.fn()
const subscriberId = "test"
eventBus.subscribe("test", subscriber, { subscriberId })
expect(() =>
eventBus.subscribe("test", subscriber, { subscriberId })
).toThrow()
})

it("should be able to unsubscribe from an event", () => {
const eventBus = new MockEventBusModuleService()
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
eventBus.unsubscribe("test", subscriber)
expect(eventBus.eventToSubscribersMap.get("test")).toEqual([])
})
})
16 changes: 8 additions & 8 deletions packages/core/utils/src/event-bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ export abstract class AbstractEventBusModuleService
* otherwise we generate a random using a ulid
*/

const randId = ulid()
const event = eventName.toString()
const subscriberId = context?.subscriberId ?? `${event}-${ulid()}`

;(subscriber as any).subscriberId = subscriberId

this.storeSubscribers({
event,
subscriberId: context?.subscriberId ?? `${event}-${randId}`,
subscriberId,
subscriber,
})

Expand All @@ -100,21 +102,19 @@ export abstract class AbstractEventBusModuleService
unsubscribe(
eventName: string | symbol,
subscriber: EventBusTypes.Subscriber,
context: EventBusTypes.SubscriberContext
context?: EventBusTypes.SubscriberContext
): this {
if (!this.isWorkerMode) {
return this
}

if (typeof subscriber !== `function`) {
throw new Error("Subscriber must be a function")
}

const existingSubscribers = this.eventToSubscribersMap_.get(eventName)
const subscriberId =
context?.subscriberId ?? (subscriber as any).subscriberId

if (existingSubscribers?.length) {
const subIndex = existingSubscribers?.findIndex(
(sub) => sub.id === context?.subscriberId
(sub) => sub.id === subscriberId
)

if (subIndex !== -1) {
Expand Down
32 changes: 9 additions & 23 deletions packages/modules/event-bus-local/src/services/event-bus-local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
import { AbstractEventBusModuleService } from "@medusajs/framework/utils"
import { EventEmitter } from "events"
import { setTimeout } from "timers/promises"
import { ulid } from "ulid"

type InjectedDependencies = {
logger: Logger
Expand Down Expand Up @@ -133,13 +132,13 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
this.groupedEventsMap_.delete(eventGroupId)
}

subscribe(event: string | symbol, subscriber: Subscriber): this {
if (!this.isWorkerMode) {
return this
}
subscribe(
event: string | symbol,
subscriber: Subscriber,
context?: EventBusTypes.SubscriberContext
): this {
super.subscribe(event, subscriber, context)

const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })
this.eventEmitter_.on(event, async (data: Event) => {
try {
await subscriber(data)
Expand All @@ -150,29 +149,16 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
this.logger_?.error(err)
}
})

return this
}

unsubscribe(
event: string | symbol,
subscriber: Subscriber,
context?: EventBusTypes.SubscriberContext
context: EventBusTypes.SubscriberContext
): this {
if (!this.isWorkerMode) {
return this
}

const existingSubscribers = this.eventToSubscribersMap_.get(event)

if (existingSubscribers?.length) {
const subIndex = existingSubscribers?.findIndex(
(sub) => sub.id === context?.subscriberId
)

if (subIndex !== -1) {
this.eventToSubscribersMap_.get(event)?.splice(subIndex as number, 1)
}
}
super.unsubscribe(event, subscriber, context)

this.eventEmitter_.off(event, subscriber)
return this
Expand Down

0 comments on commit ecc8efc

Please sign in to comment.