Skip to content

Commit

Permalink
fix: Register workflow worker on application start
Browse files Browse the repository at this point in the history
  • Loading branch information
sradevski committed Jul 17, 2024
1 parent 5f374a4 commit f41dde9
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 27 deletions.
2 changes: 2 additions & 0 deletions packages/core/medusa-test-utils/src/init-modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export async function initModules({
injectedDependencies,
})

await medusaApp.onApplicationStart()

async function shutdown() {
if (shouldDestroyConnectionAutomatically) {
await medusaApp.onApplicationPrepareShutdown()
Expand Down
2 changes: 1 addition & 1 deletion packages/core/modules-sdk/src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export const ModulesDefinition: {
isRequired: false,
isQueryable: true,
dependencies: ["logger"],
passSharedContainer: true,
__passSharedContainer: true,
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export async function loadInternalModule(
)
}

if (resolution.definition.passSharedContainer) {
if (resolution.definition.__passSharedContainer) {
localContainer.register(
"sharedContainer",
asFunction(() => {
Expand Down
13 changes: 8 additions & 5 deletions packages/core/modules-sdk/src/medusa-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ export type MedusaAppOutput = {
generateMigrations: GenerateMigrations
onApplicationShutdown: () => Promise<void>
onApplicationPrepareShutdown: () => Promise<void>
onApplicationStart: () => Promise<void>
sharedContainer?: MedusaContainer
}

Expand Down Expand Up @@ -284,6 +285,10 @@ async function MedusaApp_({
await promiseAll([MedusaModule.onApplicationPrepareShutdown()])
}

const onApplicationStart = async () => {
await MedusaModule.onApplicationStart()
}

const modules: MedusaModuleConfig =
modulesConfig ??
(
Expand Down Expand Up @@ -349,6 +354,7 @@ async function MedusaApp_({
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
onApplicationStart,
modules: allModules,
link: undefined,
query: async () => {
Expand Down Expand Up @@ -536,6 +542,7 @@ async function MedusaApp_({
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
onApplicationStart,
modules: allModules,
link: remoteLink,
query,
Expand All @@ -551,11 +558,7 @@ async function MedusaApp_({
export async function MedusaApp(
options: MedusaAppOptions = {}
): Promise<MedusaAppOutput> {
try {
return await MedusaApp_(options)
} finally {
MedusaModule.onApplicationStart(options.onApplicationStartCb)
}
return await MedusaApp_(options)
}

export async function MedusaAppMigrateUp(
Expand Down
4 changes: 2 additions & 2 deletions packages/core/types/src/modules-sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ export type ModuleDefinition = {
isRequired?: boolean
isQueryable?: boolean // If the module is queryable via Remote Joiner
dependencies?: string[]
// Only to be used in exceptional cases - relying on the shared container will break the isolation of the module
passSharedContainer?: boolean
/** @internal only used in exceptional cases - relying on the shared contrainer breaks encapsulation */
__passSharedContainer?: boolean
defaultModuleDeclaration:
| InternalModuleDeclaration
| ExternalModuleDeclaration
Expand Down
14 changes: 9 additions & 5 deletions packages/medusa/src/loaders/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,14 @@ export default async ({
const plugins = getResolvedPlugins(rootDirectory, configModule, true) || []
const pluginLinks = await resolvePluginsLinks(plugins, container)

const { onApplicationShutdown, onApplicationPrepareShutdown } =
await loadMedusaApp({
container,
linkModules: pluginLinks,
})
const {
onApplicationStart,
onApplicationShutdown,
onApplicationPrepareShutdown,
} = await loadMedusaApp({
container,
linkModules: pluginLinks,
})

await registerWorkflows(plugins)
const entrypointsShutdown = await loadEntrypoints(
Expand All @@ -168,6 +171,7 @@ export default async ({
rootDirectory
)
await createDefaultsWorkflow(container).run()
await onApplicationStart()

const shutdown = async () => {
const pgConnection = container.resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ describe("Workflow Orchestrator module", function () {
query: remoteQuery,
modules,
sharedContainer,
onApplicationStart,
} = await MedusaApp({
sharedContainer: container,
sharedResourcesConfig: {
Expand All @@ -73,6 +74,8 @@ describe("Workflow Orchestrator module", function () {
},
})

await onApplicationStart()

query = remoteQuery
sharedContainer_ = sharedContainer!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ export class WorkflowOrchestratorService {
}
}

async onApplicationStart() {
await this.redisDistributedTransactionStorage_.onApplicationStart()
}

@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ export class WorkflowsModuleService<
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
onApplicationStart: async () => {
await this.workflowOrchestratorService_.onApplicationStart()
},
}

@InjectSharedContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export class RedisDistributedTransactionStorage
private workflowOrchestratorService_: WorkflowOrchestratorService

private redisClient: Redis
private redisWorkerConnection: Redis
private queueName: string
private queue: Queue
private worker: Worker

Expand All @@ -47,12 +49,24 @@ export class RedisDistributedTransactionStorage
}) {
this.workflowExecutionService_ = workflowExecutionService
this.logger_ = logger

this.redisClient = redisConnection

this.redisWorkerConnection = redisWorkerConnection
this.queueName = redisQueueName
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
}

async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}

async onApplicationShutdown() {
await this.queue.close()
}

async onApplicationStart() {
this.worker = new Worker(
redisQueueName,
this.queueName,
async (job) => {
const allJobs = [
JobType.RETRY,
Expand All @@ -75,19 +89,10 @@ export class RedisDistributedTransactionStorage
)
}
},
{ connection: redisWorkerConnection }
{ connection: this.redisWorkerConnection }
)
}

async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}

async onApplicationShutdown() {
await this.queue.close()
}

setWorkflowOrchestratorService(workflowOrchestratorService) {
this.workflowOrchestratorService_ = workflowOrchestratorService
}
Expand Down

0 comments on commit f41dde9

Please sign in to comment.