Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ensure async workflow executions have access to shared container #8157

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/medusa-test-utils/src/init-modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export async function initModules({
injectedDependencies,
})

await medusaApp.onApplicationStart()

async function shutdown() {
if (shouldDestroyConnectionAutomatically) {
await medusaApp.onApplicationPrepareShutdown()
Expand Down
1 change: 1 addition & 0 deletions packages/core/modules-sdk/src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ export const ModulesDefinition: {
isRequired: false,
isQueryable: true,
dependencies: ["logger"],
__passSharedContainer: true,
defaultModuleDeclaration: {
scope: MODULE_SCOPE.INTERNAL,
resources: MODULE_RESOURCE_TYPE.SHARED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ export async function loadInternalModule(
)
}

if (resolution.definition.__passSharedContainer) {
localContainer.register(
"sharedContainer",
asFunction(() => {
return container
})
)
}

const loaders = moduleResources.loaders ?? loadedModule?.loaders ?? []
const error = await runLoaders(loaders, {
container,
Expand Down
13 changes: 8 additions & 5 deletions packages/core/modules-sdk/src/medusa-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ export type MedusaAppOutput = {
generateMigrations: GenerateMigrations
onApplicationShutdown: () => Promise<void>
onApplicationPrepareShutdown: () => Promise<void>
onApplicationStart: () => Promise<void>
sharedContainer?: MedusaContainer
}

Expand Down Expand Up @@ -284,6 +285,10 @@ async function MedusaApp_({
await promiseAll([MedusaModule.onApplicationPrepareShutdown()])
}

const onApplicationStart = async () => {
await MedusaModule.onApplicationStart()
}

const modules: MedusaModuleConfig =
modulesConfig ??
(
Expand Down Expand Up @@ -349,6 +354,7 @@ async function MedusaApp_({
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
onApplicationStart,
modules: allModules,
link: undefined,
query: async () => {
Expand Down Expand Up @@ -536,6 +542,7 @@ async function MedusaApp_({
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
onApplicationStart,
modules: allModules,
link: remoteLink,
query,
Expand All @@ -551,11 +558,7 @@ async function MedusaApp_({
export async function MedusaApp(
options: MedusaAppOptions = {}
): Promise<MedusaAppOutput> {
try {
return await MedusaApp_(options)
} finally {
MedusaModule.onApplicationStart(options.onApplicationStartCb)
}
return await MedusaApp_(options)
}

export async function MedusaAppMigrateUp(
Expand Down
2 changes: 2 additions & 0 deletions packages/core/types/src/modules-sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ export type ModuleDefinition = {
isRequired?: boolean
isQueryable?: boolean // If the module is queryable via Remote Joiner
dependencies?: string[]
/** @internal only used in exceptional cases - relying on the shared contrainer breaks encapsulation */
__passSharedContainer?: boolean
defaultModuleDeclaration:
| InternalModuleDeclaration
| ExternalModuleDeclaration
Expand Down
16 changes: 10 additions & 6 deletions packages/medusa/src/loaders/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,25 @@ export default async ({

const plugins = getResolvedPlugins(rootDirectory, configModule, true) || []
const pluginLinks = await resolvePluginsLinks(plugins, container)
await registerWorkflows(plugins)

const { onApplicationShutdown, onApplicationPrepareShutdown } =
await loadMedusaApp({
container,
linkModules: pluginLinks,
})
const {
onApplicationStart,
onApplicationShutdown,
onApplicationPrepareShutdown,
} = await loadMedusaApp({
container,
linkModules: pluginLinks,
})

await registerWorkflows(plugins)
sradevski marked this conversation as resolved.
Show resolved Hide resolved
const entrypointsShutdown = await loadEntrypoints(
plugins,
container,
expressApp,
rootDirectory
)
await createDefaultsWorkflow(container).run()
await onApplicationStart()

const shutdown = async () => {
const pgConnection = container.resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import {
} from "@medusajs/workflows-sdk"

export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
const workflowScheduledStepInvoke = jest.fn((input, context) => {
return new StepResponse({})
const workflowScheduledStepInvoke = jest.fn((input, { container }) => {
return new StepResponse({
testValue: container.resolve("test-value"),
})
})

const step = createStep("step_1", workflowScheduledStepInvoke)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from "../__fixtures__/workflow_event_group_id"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { WorkflowsModuleService } from "@services"
import { asFunction } from "awilix"

jest.setTimeout(100000)

Expand Down Expand Up @@ -367,6 +368,25 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
)
})

it("the scheduled workflow should have access to the shared container", async () => {
const sharedContainer =
workflowOrcModule["workflowOrchestratorService_"]["container_"]

sharedContainer.register(
"test-value",
asFunction(() => "test")
)

const spy = await createScheduled("remove-scheduled", {
cron: "* * * * * *",
})
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
expect(spy).toHaveReturnedWith(
expect.objectContaining({ output: { testValue: "test" } })
)
})

it("should fetch an idempotent workflow after its completion", async () => {
const { transaction: firstRun } = await workflowOrcModule.run(
"workflow_idempotent",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ const AnySubscriber = "any"

export class WorkflowOrchestratorService {
private subscribers: Subscribers = new Map()
private container_: MedusaContainer

constructor({
inMemoryDistributedTransactionStorage,
sharedContainer,
}: {
inMemoryDistributedTransactionStorage: InMemoryDistributedTransactionStorage
workflowOrchestratorService: WorkflowOrchestratorService
sharedContainer: MedusaContainer
}) {
this.container_ = sharedContainer
inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this)
DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage)
WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage)
Expand Down Expand Up @@ -136,7 +140,9 @@ export class WorkflowOrchestratorService {
)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const ret = await flow.run({
input,
Expand Down Expand Up @@ -191,7 +197,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const transaction = await flow.getRunningTransaction(transactionId, context)

Expand Down Expand Up @@ -227,7 +235,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
Expand Down Expand Up @@ -287,7 +297,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Context,
DAL,
InternalModuleDeclaration,
MedusaContainer,
ModulesSdkTypes,
WorkflowsSdkTypes,
} from "@medusajs/types"
Expand Down Expand Up @@ -31,6 +32,7 @@ export class WorkflowsModuleService<
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected container_: MedusaContainer

constructor(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import {
} from "@medusajs/workflows-sdk"

export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
const workflowScheduledStepInvoke = jest.fn((input, context) => {
return new StepResponse({})
const workflowScheduledStepInvoke = jest.fn((input, { container }) => {
return new StepResponse({
testValue: container.resolve("test-value"),
})
})

const step = createStep("step_1", workflowScheduledStepInvoke)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
TransactionHandlerType,
TransactionStepState,
} from "@medusajs/utils"
import { asValue } from "awilix"
import { asFunction, asValue } from "awilix"
import { knex } from "knex"
import { setTimeout } from "timers/promises"
import "../__fixtures__"
Expand Down Expand Up @@ -54,6 +54,7 @@ describe("Workflow Orchestrator module", function () {
query: remoteQuery,
modules,
sharedContainer,
onApplicationStart,
} = await MedusaApp({
sharedContainer: container,
sharedResourcesConfig: {
Expand All @@ -73,6 +74,8 @@ describe("Workflow Orchestrator module", function () {
},
})

await onApplicationStart()

query = remoteQuery
sharedContainer_ = sharedContainer!

Expand Down Expand Up @@ -381,5 +384,21 @@ describe("Workflow Orchestrator module", function () {
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
)
})

it("the scheduled workflow should have access to the shared container", async () => {
sharedContainer_.register(
"test-value",
asFunction(() => "test")
)

const spy = await createScheduled("remove-scheduled", {
cron: "* * * * * *",
})
await setTimeout(1100)
expect(spy).toHaveBeenCalledTimes(1)
expect(spy).toHaveReturnedWith(
expect.objectContaining({ output: { testValue: "test" } })
)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class WorkflowOrchestratorService {
private instanceId = ulid()
protected redisPublisher: Redis
protected redisSubscriber: Redis
protected container_: MedusaContainer
private subscribers: Subscribers = new Map()
private activeStepsCount: number = 0
private logger: Logger
Expand All @@ -95,14 +96,17 @@ export class WorkflowOrchestratorService {
redisPublisher,
redisSubscriber,
logger,
sharedContainer,
}: {
dataLoaderOnly: boolean
redisDistributedTransactionStorage: RedisDistributedTransactionStorage
workflowOrchestratorService: WorkflowOrchestratorService
redisPublisher: Redis
redisSubscriber: Redis
logger: Logger
sharedContainer: MedusaContainer
}) {
this.container_ = sharedContainer
this.redisPublisher = redisPublisher
this.redisSubscriber = redisSubscriber
this.logger = logger
Expand Down Expand Up @@ -137,6 +141,10 @@ export class WorkflowOrchestratorService {
}
}

async onApplicationStart() {
await this.redisDistributedTransactionStorage_.onApplicationStart()
}

@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
Expand Down Expand Up @@ -175,7 +183,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const ret = await flow.run({
input,
Expand Down Expand Up @@ -230,7 +240,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const transaction = await flow.getRunningTransaction(transactionId, context)

Expand Down Expand Up @@ -266,7 +278,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
Expand Down Expand Up @@ -326,7 +340,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)

const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ export class WorkflowsModuleService<
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
onApplicationStart: async () => {
await this.workflowOrchestratorService_.onApplicationStart()
},
}

@InjectSharedContext()
Expand Down
Loading
Loading