Skip to content

Commit

Permalink
fix event passed to subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
olivermrbl committed Aug 6, 2024
1 parent 200baf8 commit a36fc26
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 16 deletions.
36 changes: 21 additions & 15 deletions packages/modules/event-bus-redis/src/services/event-bus-redis.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { InternalModuleDeclaration } from "@medusajs/modules-sdk"
import { Event, Logger, Message } from "@medusajs/types"
import { Logger, Message } from "@medusajs/types"
import {
AbstractEventBusModuleService,
isPresent,
Expand All @@ -14,7 +14,9 @@ type InjectedDependencies = {
eventBusRedisConnection: Redis
}

type IORedisEventType<T = unknown> = Event<T> & {
type IORedisEventType<T = unknown> = {
name: string
data: T
opts: BulkJobOptions
}

Expand Down Expand Up @@ -93,15 +95,14 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
}

return eventsData.map((eventData) => {
const { options, ...eventBody } = eventData

return {
...eventBody,
data: eventData.data,
name: eventData.name,
opts: {
// options for event group
...opts,
// options for a particular event
...options,
...eventData.options,
},
}
})
Expand Down Expand Up @@ -216,8 +217,8 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
* @return resolves to the results of the subscriber calls.
*/
worker_ = async <T>(job: BullJob<T>): Promise<unknown> => {
const { data, name: eventName } = job
const eventSubscribers = this.eventToSubscribersMap.get(eventName) || []
const { data, name } = job
const eventSubscribers = this.eventToSubscribersMap.get(name) || []
const wildcardSubscribers = this.eventToSubscribersMap.get("*") || []

const allSubscribers = eventSubscribers.concat(wildcardSubscribers)
Expand All @@ -239,31 +240,36 @@ export default class RedisEventBusService extends AbstractEventBusModuleService

if (isRetry) {
if (isFinalAttempt) {
this.logger_.info(`Final retry attempt for ${eventName}`)
this.logger_.info(`Final retry attempt for ${name}`)
}

this.logger_.info(
`Retrying ${eventName} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)`
`Retrying ${name} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)`
)
} else {
this.logger_.info(
`Processing ${eventName} which has ${eventSubscribers.length} subscribers`
`Processing ${name} which has ${eventSubscribers.length} subscribers`
)
}

const completedSubscribersInCurrentAttempt: string[] = []

const subscribersResult = await Promise.all(
subscribersInCurrentAttempt.map(async ({ id, subscriber }) => {
return await subscriber(data)
const event = {
name,
data,
}

return await subscriber(event)
.then(async (data) => {
// For every subscriber that completes successfully, add their id to the list of completed subscribers
completedSubscribersInCurrentAttempt.push(id)
return data
})
.catch((err) => {
this.logger_.warn(
`An error occurred while processing ${eventName}: ${err}`
`An error occurred while processing ${name}: ${err}`
)
return err
})
Expand Down Expand Up @@ -291,7 +297,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService

await job.updateData(job.data)

const errorMessage = `One or more subscribers of ${eventName} failed. Retrying...`
const errorMessage = `One or more subscribers of ${name} failed. Retrying...`

this.logger_.warn(errorMessage)

Expand All @@ -301,7 +307,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
if (didSubscribersFail && !isFinalAttempt) {
// If retrying is not configured, we log a warning to allow server admins to recover manually
this.logger_.warn(
`One or more subscribers of ${eventName} failed. Retrying is not configured. Use 'attempts' option when emitting events.`
`One or more subscribers of ${name} failed. Retrying is not configured. Use 'attempts' option when emitting events.`
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {
ProductTypes,
} from "@medusajs/types"
import {
Image as ProductImage,
Product,
ProductCategory,
ProductCollection,
Image as ProductImage,
ProductOption,
ProductOptionValue,
ProductTag,
Expand Down

0 comments on commit a36fc26

Please sign in to comment.