Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(orchestration,workflows): pipe oncomplete and workflow preparation #4697

Merged
merged 16 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
35adfa4
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 4, 2023
7948d29
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 12, 2023
547b97d
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 17, 2023
963f549
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 24, 2023
c9aeca0
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 25, 2023
926d87e
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Jul 27, 2023
00faeb2
Merge branch 'develop' of https://github.com/medusajs/medusa into dev…
carlos-r-l-rodrigues Aug 4, 2023
cfdb3bb
chore: pipe onComplete and workflow preparation step
carlos-r-l-rodrigues Aug 4, 2023
42464db
changeset
carlos-r-l-rodrigues Aug 4, 2023
d84a686
Merge branch 'develop' of https://github.com/medusajs/medusa into fea…
carlos-r-l-rodrigues Aug 7, 2023
f432619
Merge branch 'develop' into feat/pipi-oncomplete-wirkflow-preparation
carlos-r-l-rodrigues Aug 7, 2023
3a6e8bc
Merge branch 'develop' into feat/pipi-oncomplete-wirkflow-preparation
adrien2p Aug 8, 2023
dcb724f
Merge branch 'feat/pipi-oncomplete-wirkflow-preparation' of https://g…
carlos-r-l-rodrigues Aug 8, 2023
760e943
fix: tests
carlos-r-l-rodrigues Aug 8, 2023
e0cff41
Merge branch 'develop' into feat/pipi-oncomplete-wirkflow-preparation
carlos-r-l-rodrigues Aug 8, 2023
1651959
Merge branch 'develop' into feat/pipi-oncomplete-wirkflow-preparation
olivermrbl Aug 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/weak-berries-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": minor
"@medusajs/workflows": minor
---

Add pipe onComplete callback and preparation function to exportsWorkflow
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
DistributedTransaction,
TransactionHandlerType,
TransactionOrchestrator,
TransactionPayload,
Expand Down Expand Up @@ -858,4 +859,32 @@ describe("Transaction Orchestrator", () => {
expect(mocks.oneCompensate).toBeCalledTimes(1)
expect(mocks.twoCompensate).toBeCalledTimes(1)
})

it("Should receive the current transaction as reference in the handler", async () => {
let transactionInHandler

async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload,
transaction?: DistributedTransaction
) {
transactionInHandler = transaction
}

const strategy = new TransactionOrchestrator("transaction-name", {
next: {
action: "firstMethod",
},
})

const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)

await strategy.resume(transaction)

expect(transaction).toBe(transactionInHandler)
})
})
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { TransactionFlow } from "./transaction-orchestrator"
import { TransactionStepHandler } from "./transaction-step"
import { TransactionHandlerType, TransactionState } from "./types"

/**
Expand Down Expand Up @@ -78,11 +79,7 @@ export class DistributedTransaction {

constructor(
private flow: TransactionFlow,
public handler: (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
) => Promise<unknown>,
public handler: TransactionStepHandler,
public payload?: any,
errors?: TransactionStepError[],
context?: TransactionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ import {
TransactionCheckpoint,
TransactionPayload,
} from "./distributed-transaction"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import {
TransactionHandlerType,
TransactionModel,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"

import { EventEmitter } from "events"

Expand Down Expand Up @@ -366,7 +365,7 @@ export class TransactionOrchestrator extends EventEmitter {
if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload)
.handler(step.definition.action + "", type, payload, transaction)
.then(async (response) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
Expand All @@ -387,7 +386,7 @@ export class TransactionOrchestrator extends EventEmitter {
execution.push(
transaction.saveCheckpoint().then(async () =>
transaction
.handler(step.definition.action + "", type, payload)
.handler(step.definition.action + "", type, payload, transaction)
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
Expand Down
14 changes: 9 additions & 5 deletions packages/orchestration/src/transaction/transaction-step.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { TransactionPayload } from "./distributed-transaction"
import {
TransactionStepsDefinition,
TransactionStepStatus,
TransactionState,
DistributedTransaction,
TransactionPayload,
} from "./distributed-transaction"
import {
TransactionHandlerType,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
} from "./types"

export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload
payload: TransactionPayload,
transaction?: DistributedTransaction
) => Promise<unknown>

/**
Expand Down
6 changes: 5 additions & 1 deletion packages/orchestration/src/workflow/workflow-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Context, MedusaContainer } from "@medusajs/types"
import {
DistributedTransaction,
OrchestratorBuilder,
TransactionHandlerType,
TransactionMetadata,
Expand Down Expand Up @@ -35,6 +36,7 @@ export type WorkflowStepHandler = (args: {
invoke: { [actions: string]: unknown }
compensate: { [actions: string]: unknown }
metadata: TransactionMetadata
transaction: DistributedTransaction
context?: Context
}) => unknown

Expand Down Expand Up @@ -136,7 +138,8 @@ export class WorkflowManager {
return async (
actionId: string,
handlerType: TransactionHandlerType,
payload?: any
payload?: any,
transaction?: DistributedTransaction
) => {
const command = handlers.get(actionId)

Expand All @@ -157,6 +160,7 @@ export class WorkflowManager {
invoke,
compensate,
metadata,
transaction: transaction as DistributedTransaction,
context,
})
}
Expand Down
2 changes: 1 addition & 1 deletion packages/workflows/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
"prepare": "cross-env NODE_ENV=production yarn run build",
"build": "rimraf dist && tsc --build",
"watch": "tsc --build --watch",
"test": "jest --passWithNoTests"
"test": "jest"
}
}
40 changes: 40 additions & 0 deletions packages/workflows/src/helper/__tests__/pipe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,44 @@ describe("Pipe", function () {
expect(result).toBeDefined()
expect(result).toEqual(output)
})

it("should execute onComplete function if available but the output result shouldn't change", async function () {
const payload = { input: "input" }
const output = { test: "test" }
const invoke = {
input: payload,
}

const onComplete = jest.fn(async ({ data }) => {
data.__changed = true

return
})

const handler = jest.fn().mockImplementation(async () => output)
const input = {
inputAlias: "payload",
invoke: [
{
from: "payload",
alias: "input",
},
],
onComplete,
}

const result = await pipe(input, handler)({ invoke, payload } as any)

expect(handler).toHaveBeenCalled()
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
data: {
input: payload,
},
})
)

expect(onComplete).toHaveBeenCalled()
expect(result).toEqual(output)
})
})
64 changes: 64 additions & 0 deletions packages/workflows/src/helper/__tests__/workflow-export.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { LocalWorkflow } from "@medusajs/orchestration"
import { exportWorkflow } from "../workflow-export"

jest.mock("@medusajs/orchestration", () => {
return {
TransactionState: {
FAILED: "failed",
REVERTED: "reverted",
},
LocalWorkflow: jest.fn(() => {
return {
run: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}
})

describe("Export Workflow", function () {
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data

return data
})

let a: LocalWorkflow

const work = exportWorkflow("id" as any, "result", prepare)

const wfHandler = work()

const input = {
test: "payload",
}

const { result } = await wfHandler.run({
input,
resultFrom: "result_step",
})

expect(input).toEqual({
test: "payload",
})

expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})

expect(result).toEqual("invoke_test")
})
})
31 changes: 28 additions & 3 deletions packages/workflows/src/helper/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"
import {
TransactionMetadata,
WorkflowStepHandler,
} from "@medusajs/orchestration"
import { Context, MedusaContainer, SharedContext } from "@medusajs/types"

import { DistributedTransaction } from "@medusajs/orchestration"
import { InputAlias } from "../definitions"

export type WorkflowStepMiddlewareReturn = {
Expand All @@ -20,6 +21,7 @@ interface PipelineInput {
inputAlias?: InputAlias | string
invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[]
onComplete?: (args: WorkflowOnCompleteArguments) => {}
}

export type WorkflowArguments<T = any> = {
Expand All @@ -30,6 +32,15 @@ export type WorkflowArguments<T = any> = {
context: Context | SharedContext
}

export type WorkflowOnCompleteArguments<T = any> = {
container: MedusaContainer
payload: unknown
data: T
metadata: TransactionMetadata
transaction: DistributedTransaction
context: Context | SharedContext
}

export type PipelineHandler<T extends any = undefined> = (
args: WorkflowArguments
) => Promise<
Expand All @@ -48,6 +59,7 @@ export function pipe<T>(
invoke,
compensate,
metadata,
transaction,
context,
}) => {
let data = {}
Expand All @@ -61,8 +73,9 @@ export function pipe<T>(
Object.assign(original.invoke, { [input.inputAlias]: payload })
}

for (const key in input) {
if (!input[key] || key === "inputAlias") {
const dataKeys = ["invoke", "compensate"]
for (const key of dataKeys) {
if (!input[key]) {
continue
}

Expand Down Expand Up @@ -111,6 +124,18 @@ export function pipe<T>(
finalResult = result
}

if (typeof input.onComplete === "function") {
const dataCopy = JSON.parse(JSON.stringify(data))
await input.onComplete({
container,
payload,
data: dataCopy,
metadata,
transaction,
context: context as Context,
})
}

return finalResult
}
}
Loading