Skip to content

Commit

Permalink
feat(orchestration,workflows): pipe oncomplete and workflow preparati…
Browse files Browse the repository at this point in the history
…on (#4697)

* chore: pipe onComplete and workflow preparation step

* changeset

* fix: tests

---------

Co-authored-by: Adrien de Peretti <[email protected]>
Co-authored-by: Oli Juhl <[email protected]>
  • Loading branch information
3 people authored Aug 8, 2023
1 parent d1e298f commit c0ca002
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 20 deletions.
6 changes: 6 additions & 0 deletions .changeset/weak-berries-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": minor
"@medusajs/workflows": minor
---

Add pipe onComplete callback and preparation function to exportsWorkflow
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
DistributedTransaction,
TransactionHandlerType,
TransactionOrchestrator,
TransactionPayload,
Expand Down Expand Up @@ -858,4 +859,32 @@ describe("Transaction Orchestrator", () => {
expect(mocks.oneCompensate).toBeCalledTimes(1)
expect(mocks.twoCompensate).toBeCalledTimes(1)
})

it("Should receive the current transaction as reference in the handler", async () => {
let transactionInHandler

async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload,
transaction?: DistributedTransaction
) {
transactionInHandler = transaction
}

const strategy = new TransactionOrchestrator("transaction-name", {
next: {
action: "firstMethod",
},
})

const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)

await strategy.resume(transaction)

expect(transaction).toBe(transactionInHandler)
})
})
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isDefined } from "@medusajs/utils"
import { TransactionFlow } from "./transaction-orchestrator"
import { TransactionStepHandler } from "./transaction-step"
import { TransactionHandlerType, TransactionState } from "./types"

/**
Expand Down Expand Up @@ -79,11 +80,7 @@ export class DistributedTransaction {

constructor(
private flow: TransactionFlow,
public handler: (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>,
public handler: TransactionStepHandler,
public payload?: any,
errors?: TransactionStepError[],
context?: TransactionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ import {
TransactionCheckpoint,
TransactionPayload,
} from "./distributed-transaction"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import {
TransactionHandlerType,
TransactionModel,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"

import { EventEmitter } from "events"

Expand Down Expand Up @@ -366,7 +365,7 @@ export class TransactionOrchestrator extends EventEmitter {
if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload)
.handler(step.definition.action + "", type, payload, transaction)
.then(async (response) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
Expand All @@ -387,7 +386,7 @@ export class TransactionOrchestrator extends EventEmitter {
execution.push(
transaction.saveCheckpoint().then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.handler(step.definition.action + "", type, payload, transaction)
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
Expand Down
14 changes: 9 additions & 5 deletions packages/orchestration/src/transaction/transaction-step.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { TransactionPayload } from "./distributed-transaction"
import {
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
DistributedTransaction,
TransactionPayload,
} from "./distributed-transaction"
import {
TransactionHandlerType,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"

export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
payload: TransactionPayload,
transaction?: DistributedTransaction
) => Promise<unknown>

/**
Expand Down
6 changes: 5 additions & 1 deletion packages/orchestration/src/workflow/workflow-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Context, MedusaContainer } from "@medusajs/types"
import {
DistributedTransaction,
OrchestratorBuilder,
TransactionHandlerType,
TransactionMetadata,
Expand Down Expand Up @@ -35,6 +36,7 @@ export type WorkflowStepHandler = (args: {
invoke: { [actions: string]: unknown }
compensate: { [actions: string]: unknown }
metadata: TransactionMetadata
transaction: DistributedTransaction
context?: Context
}) => unknown

Expand Down Expand Up @@ -136,7 +138,8 @@ export class WorkflowManager {
return async (
actionId: string,
handlerType: TransactionHandlerType,
payload?: any
payload?: any,
transaction?: DistributedTransaction
) => {
const command = handlers.get(actionId)

Expand All @@ -157,6 +160,7 @@ export class WorkflowManager {
invoke,
compensate,
metadata,
transaction: transaction as DistributedTransaction,
context,
})
}
Expand Down
2 changes: 1 addition & 1 deletion packages/workflows/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
"prepare": "cross-env NODE_ENV=production yarn run build",
"build": "rimraf dist && tsc --build",
"watch": "tsc --build --watch",
"test": "jest --passWithNoTests"
"test": "jest"
}
}
40 changes: 40 additions & 0 deletions packages/workflows/src/helper/__tests__/pipe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,44 @@ describe("Pipe", function () {
expect(result).toBeDefined()
expect(result).toEqual(output)
})

it("should execute onComplete function if available but the output result shouldn't change", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
}

const onComplete = jest.fn(async ({ data }) => {
data.__changed = true

return
})

const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
],
onComplete,
}

const result = await pipe(input, handler)({ invoke, payload } as any)

expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
},
})
)

expect(onComplete).toHaveBeenCalled()
expect(result).toEqual(output)
})
})
64 changes: 64 additions & 0 deletions packages/workflows/src/helper/__tests__/workflow-export.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { exportWorkflow } from "../workflow-export"

jest.mock("@medusajs/orchestration", () => {
return {
TransactionHandlerType: {
INVOKE: "invoke",
COMPENSATE: "compensate",
},
TransactionState: {
FAILED: "failed",
REVERTED: "reverted",
},
LocalWorkflow: jest.fn(() => {
return {
run: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}
})

describe("Export Workflow", function () {
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data

return data
})

const work = exportWorkflow("id" as any, "result_step", prepare)

const wfHandler = work()

const input = {
test: "payload",
}

const { result } = await wfHandler.run({
input,
})

expect(input).toEqual({
test: "payload",
})

expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})

expect(result).toEqual("invoke_test")
})
})
31 changes: 28 additions & 3 deletions packages/workflows/src/helper/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"
import {
TransactionMetadata,
WorkflowStepHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"

import { DistributedTransaction } from "@medusajs/orchestration"
import { InputAlias } from "../definitions"

export type WorkflowStepMiddlewareReturn = {
Expand All @@ -20,6 +21,7 @@ interface PipelineInput {
inputAlias?: InputAlias | string
invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
onComplete?: (args: WorkflowOnCompleteArguments) => {}
}

export type WorkflowArguments<T = any> = {
Expand All @@ -30,6 +32,15 @@ export type WorkflowArguments<T = any> = {
context: Context | SharedContext
}

export type WorkflowOnCompleteArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
transaction: DistributedTransaction
context: Context | SharedContext
}

export type PipelineHandler<T extends any = undefined> = (
args: WorkflowArguments
) => Promise<
Expand All @@ -48,6 +59,7 @@ export function pipe<T>(
invoke,
compensate,
metadata,
transaction,
context,
}) => {
let data = {}
Expand All @@ -61,8 +73,9 @@ export function pipe<T>(
Object.assign(original.invoke, { [input.inputAlias]: payload })
}

for (const key in input) {
if (!input[key] || key === "inputAlias") {
const dataKeys = ["invoke", "compensate"]
for (const key of dataKeys) {
if (!input[key]) {
continue
}

Expand Down Expand Up @@ -111,6 +124,18 @@ export function pipe<T>(
finalResult = result
}

if (typeof input.onComplete === "function") {
const dataCopy = JSON.parse(JSON.stringify(data))
await input.onComplete({
container,
payload,
data: dataCopy,
metadata,
transaction,
context: context as Context,
})
}

return finalResult
}
}
19 changes: 18 additions & 1 deletion packages/workflows/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export type WorkflowResult<TResult = unknown> = {

export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: Workflows,
defaultResult?: string
defaultResult?: string,
dataPreparation?: (data: TData) => Promise<unknown>
) => {
return function <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
Expand Down Expand Up @@ -60,6 +61,22 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
resultFrom ??= defaultResult
throwOnError ??= true

if (typeof dataPreparation === "function") {
try {
const copyInput = JSON.parse(JSON.stringify(input))
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}

const transaction = await originalRun(
context?.transactionId ?? ulid(),
input,
Expand Down

0 comments on commit c0ca002

Please sign in to comment.