diff --git a/packages/core/medusa-test-utils/src/init-modules.ts b/packages/core/medusa-test-utils/src/init-modules.ts index 2ef4d55ade360..69fae43203470 100644 --- a/packages/core/medusa-test-utils/src/init-modules.ts +++ b/packages/core/medusa-test-utils/src/init-modules.ts @@ -56,6 +56,8 @@ export async function initModules({ injectedDependencies, }) + await medusaApp.onApplicationStart() + async function shutdown() { if (shouldDestroyConnectionAutomatically) { await medusaApp.onApplicationPrepareShutdown() diff --git a/packages/core/modules-sdk/src/definitions.ts b/packages/core/modules-sdk/src/definitions.ts index 1c8aae03f5c11..fc88a4119904d 100644 --- a/packages/core/modules-sdk/src/definitions.ts +++ b/packages/core/modules-sdk/src/definitions.ts @@ -145,6 +145,7 @@ export const ModulesDefinition: { isRequired: false, isQueryable: true, dependencies: ["logger"], + __passSharedContainer: true, defaultModuleDeclaration: { scope: MODULE_SCOPE.INTERNAL, resources: MODULE_RESOURCE_TYPE.SHARED, diff --git a/packages/core/modules-sdk/src/loaders/utils/load-internal.ts b/packages/core/modules-sdk/src/loaders/utils/load-internal.ts index df25cbacfe7c0..83fa8ba11779f 100644 --- a/packages/core/modules-sdk/src/loaders/utils/load-internal.ts +++ b/packages/core/modules-sdk/src/loaders/utils/load-internal.ts @@ -139,6 +139,15 @@ export async function loadInternalModule( ) } + if (resolution.definition.__passSharedContainer) { + localContainer.register( + "sharedContainer", + asFunction(() => { + return container + }) + ) + } + const loaders = moduleResources.loaders ?? loadedModule?.loaders ?? [] const error = await runLoaders(loaders, { container, diff --git a/packages/core/modules-sdk/src/medusa-app.ts b/packages/core/modules-sdk/src/medusa-app.ts index 84f42d93cd430..f0663be82eb56 100644 --- a/packages/core/modules-sdk/src/medusa-app.ts +++ b/packages/core/modules-sdk/src/medusa-app.ts @@ -233,6 +233,7 @@ export type MedusaAppOutput = { generateMigrations: GenerateMigrations onApplicationShutdown: () => Promise onApplicationPrepareShutdown: () => Promise + onApplicationStart: () => Promise sharedContainer?: MedusaContainer } @@ -284,6 +285,10 @@ async function MedusaApp_({ await promiseAll([MedusaModule.onApplicationPrepareShutdown()]) } + const onApplicationStart = async () => { + await MedusaModule.onApplicationStart() + } + const modules: MedusaModuleConfig = modulesConfig ?? ( @@ -349,6 +354,7 @@ async function MedusaApp_({ return { onApplicationShutdown, onApplicationPrepareShutdown, + onApplicationStart, modules: allModules, link: undefined, query: async () => { @@ -536,6 +542,7 @@ async function MedusaApp_({ return { onApplicationShutdown, onApplicationPrepareShutdown, + onApplicationStart, modules: allModules, link: remoteLink, query, @@ -551,11 +558,7 @@ async function MedusaApp_({ export async function MedusaApp( options: MedusaAppOptions = {} ): Promise { - try { - return await MedusaApp_(options) - } finally { - MedusaModule.onApplicationStart(options.onApplicationStartCb) - } + return await MedusaApp_(options) } export async function MedusaAppMigrateUp( diff --git a/packages/core/types/src/modules-sdk/index.ts b/packages/core/types/src/modules-sdk/index.ts index d770bad69c1d3..ab3422c976002 100644 --- a/packages/core/types/src/modules-sdk/index.ts +++ b/packages/core/types/src/modules-sdk/index.ts @@ -87,6 +87,8 @@ export type ModuleDefinition = { isRequired?: boolean isQueryable?: boolean // If the module is queryable via Remote Joiner dependencies?: string[] + /** @internal only used in exceptional cases - relying on the shared contrainer breaks encapsulation */ + __passSharedContainer?: boolean defaultModuleDeclaration: | InternalModuleDeclaration | ExternalModuleDeclaration diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index 50989a086558d..ad3217a729959 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -153,14 +153,17 @@ export default async ({ const plugins = getResolvedPlugins(rootDirectory, configModule, true) || [] const pluginLinks = await resolvePluginsLinks(plugins, container) - await registerWorkflows(plugins) - const { onApplicationShutdown, onApplicationPrepareShutdown } = - await loadMedusaApp({ - container, - linkModules: pluginLinks, - }) + const { + onApplicationStart, + onApplicationShutdown, + onApplicationPrepareShutdown, + } = await loadMedusaApp({ + container, + linkModules: pluginLinks, + }) + await registerWorkflows(plugins) const entrypointsShutdown = await loadEntrypoints( plugins, container, @@ -168,6 +171,7 @@ export default async ({ rootDirectory ) await createDefaultsWorkflow(container).run() + await onApplicationStart() const shutdown = async () => { const pgConnection = container.resolve( diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts index 770527d94ded7..81213a48f04c9 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts @@ -6,8 +6,10 @@ import { } from "@medusajs/workflows-sdk" export const createScheduled = (name: string, schedule?: SchedulerOptions) => { - const workflowScheduledStepInvoke = jest.fn((input, context) => { - return new StepResponse({}) + const workflowScheduledStepInvoke = jest.fn((input, { container }) => { + return new StepResponse({ + testValue: container.resolve("test-value"), + }) }) const step = createStep("step_1", workflowScheduledStepInvoke) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 753a152366b56..bffbd61b48fb5 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -21,6 +21,7 @@ import { } from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { WorkflowsModuleService } from "@services" +import { asFunction } from "awilix" jest.setTimeout(100000) @@ -367,6 +368,25 @@ moduleIntegrationTestRunner({ ) }) + it("the scheduled workflow should have access to the shared container", async () => { + const sharedContainer = + workflowOrcModule["workflowOrchestratorService_"]["container_"] + + sharedContainer.register( + "test-value", + asFunction(() => "test") + ) + + const spy = await createScheduled("remove-scheduled", { + cron: "* * * * * *", + }) + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(1) + expect(spy).toHaveReturnedWith( + expect.objectContaining({ output: { testValue: "test" } }) + ) + }) + it("should fetch an idempotent workflow after its completion", async () => { const { transaction: firstRun } = await workflowOrcModule.run( "workflow_idempotent", diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 516f27b9cac45..0ea912032ff2f 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -80,13 +80,17 @@ const AnySubscriber = "any" export class WorkflowOrchestratorService { private subscribers: Subscribers = new Map() + private container_: MedusaContainer constructor({ inMemoryDistributedTransactionStorage, + sharedContainer, }: { inMemoryDistributedTransactionStorage: InMemoryDistributedTransactionStorage workflowOrchestratorService: WorkflowOrchestratorService + sharedContainer: MedusaContainer }) { + this.container_ = sharedContainer inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this) DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage) WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage) @@ -136,7 +140,9 @@ export class WorkflowOrchestratorService { ) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const ret = await flow.run({ input, @@ -191,7 +197,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const transaction = await flow.getRunningTransaction(transactionId, context) @@ -227,7 +235,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const events = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, @@ -287,7 +297,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const events = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index f17cf7603ef0b..170e0c05c84ef 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -2,6 +2,7 @@ import { Context, DAL, InternalModuleDeclaration, + MedusaContainer, ModulesSdkTypes, WorkflowsSdkTypes, } from "@medusajs/types" @@ -31,6 +32,7 @@ export class WorkflowsModuleService< protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService + protected container_: MedusaContainer constructor( { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts index 770527d94ded7..81213a48f04c9 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts @@ -6,8 +6,10 @@ import { } from "@medusajs/workflows-sdk" export const createScheduled = (name: string, schedule?: SchedulerOptions) => { - const workflowScheduledStepInvoke = jest.fn((input, context) => { - return new StepResponse({}) + const workflowScheduledStepInvoke = jest.fn((input, { container }) => { + return new StepResponse({ + testValue: container.resolve("test-value"), + }) }) const step = createStep("step_1", workflowScheduledStepInvoke) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index dddbb8e3067f0..677b58e606a01 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -17,7 +17,7 @@ import { TransactionHandlerType, TransactionStepState, } from "@medusajs/utils" -import { asValue } from "awilix" +import { asFunction, asValue } from "awilix" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -54,6 +54,7 @@ describe("Workflow Orchestrator module", function () { query: remoteQuery, modules, sharedContainer, + onApplicationStart, } = await MedusaApp({ sharedContainer: container, sharedResourcesConfig: { @@ -73,6 +74,8 @@ describe("Workflow Orchestrator module", function () { }, }) + await onApplicationStart() + query = remoteQuery sharedContainer_ = sharedContainer! @@ -381,5 +384,21 @@ describe("Workflow Orchestrator module", function () { "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." ) }) + + it("the scheduled workflow should have access to the shared container", async () => { + sharedContainer_.register( + "test-value", + asFunction(() => "test") + ) + + const spy = await createScheduled("remove-scheduled", { + cron: "* * * * * *", + }) + await setTimeout(1100) + expect(spy).toHaveBeenCalledTimes(1) + expect(spy).toHaveReturnedWith( + expect.objectContaining({ output: { testValue: "test" } }) + ) + }) }) }) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 14e56fe9f3097..c138b9667b938 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -83,6 +83,7 @@ export class WorkflowOrchestratorService { private instanceId = ulid() protected redisPublisher: Redis protected redisSubscriber: Redis + protected container_: MedusaContainer private subscribers: Subscribers = new Map() private activeStepsCount: number = 0 private logger: Logger @@ -95,6 +96,7 @@ export class WorkflowOrchestratorService { redisPublisher, redisSubscriber, logger, + sharedContainer, }: { dataLoaderOnly: boolean redisDistributedTransactionStorage: RedisDistributedTransactionStorage @@ -102,7 +104,9 @@ export class WorkflowOrchestratorService { redisPublisher: Redis redisSubscriber: Redis logger: Logger + sharedContainer: MedusaContainer }) { + this.container_ = sharedContainer this.redisPublisher = redisPublisher this.redisSubscriber = redisSubscriber this.logger = logger @@ -137,6 +141,10 @@ export class WorkflowOrchestratorService { } } + async onApplicationStart() { + await this.redisDistributedTransactionStorage_.onApplicationStart() + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, @@ -175,7 +183,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const ret = await flow.run({ input, @@ -230,7 +240,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const transaction = await flow.getRunningTransaction(transactionId, context) @@ -266,7 +278,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const events = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, @@ -326,7 +340,9 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow(container as MedusaContainer) + const flow = exportedWorkflow( + (container as MedusaContainer) ?? this.container_ + ) const events = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 86419a61f4958..6ff6287b744bc 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -60,6 +60,9 @@ export class WorkflowsModuleService< onApplicationPrepareShutdown: async () => { await this.workflowOrchestratorService_.onApplicationPrepareShutdown() }, + onApplicationStart: async () => { + await this.workflowOrchestratorService_.onApplicationStart() + }, } @InjectSharedContext() diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 0a6b208e49243..37803d67f637f 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -29,6 +29,8 @@ export class RedisDistributedTransactionStorage private workflowOrchestratorService_: WorkflowOrchestratorService private redisClient: Redis + private redisWorkerConnection: Redis + private queueName: string private queue: Queue private worker: Worker @@ -47,12 +49,24 @@ export class RedisDistributedTransactionStorage }) { this.workflowExecutionService_ = workflowExecutionService this.logger_ = logger - this.redisClient = redisConnection - + this.redisWorkerConnection = redisWorkerConnection + this.queueName = redisQueueName this.queue = new Queue(redisQueueName, { connection: this.redisClient }) + } + + async onApplicationPrepareShutdown() { + // Close worker gracefully, i.e. wait for the current jobs to finish + await this.worker.close() + } + + async onApplicationShutdown() { + await this.queue.close() + } + + async onApplicationStart() { this.worker = new Worker( - redisQueueName, + this.queueName, async (job) => { const allJobs = [ JobType.RETRY, @@ -75,19 +89,10 @@ export class RedisDistributedTransactionStorage ) } }, - { connection: redisWorkerConnection } + { connection: this.redisWorkerConnection } ) } - async onApplicationPrepareShutdown() { - // Close worker gracefully, i.e. wait for the current jobs to finish - await this.worker.close() - } - - async onApplicationShutdown() { - await this.queue.close() - } - setWorkflowOrchestratorService(workflowOrchestratorService) { this.workflowOrchestratorService_ = workflowOrchestratorService }