From 9f9db396987776039ad0c2b8d5792d0ebdbf8792 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Wed, 22 Nov 2023 17:23:39 +0000 Subject: [PATCH] feat(workflows): Workflow DX (#5607) --- .changeset/wet-crews-sneeze.md | 8 + .eslintignore | 2 + .eslintrc.js | 4 +- .../workflows/utils/composer/compose.ts | 1845 +++++++++++++++++ integration-tests/plugins/package.json | 2 +- packages/link-modules/src/index.ts | 1 + packages/medusa/src/loaders/plugins.ts | 10 + .../src/workflow/workflow-manager.ts | 17 +- .../workflows/src/helper/workflow-export.ts | 15 +- packages/workflows/src/index.ts | 2 + .../src/utils/composer/create-step.ts | 242 +++ .../src/utils/composer/create-workflow.ts | 182 ++ .../src/utils/composer/helpers/index.ts | 3 + .../src/utils/composer/helpers/proxy.ts | 28 + .../utils/composer/helpers/resolve-value.ts | 71 + .../utils/composer/helpers/step-response.ts | 32 + .../src/utils/composer/helpers/symbol.ts | 12 + packages/workflows/src/utils/composer/hook.ts | 43 + .../workflows/src/utils/composer/index.ts | 9 + .../src/utils/composer/parallelize.ts | 58 + .../workflows/src/utils/composer/transform.ts | 133 ++ packages/workflows/src/utils/composer/type.ts | 65 + 22 files changed, 2768 insertions(+), 16 deletions(-) create mode 100644 .changeset/wet-crews-sneeze.md create mode 100644 integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts create mode 100644 packages/workflows/src/utils/composer/create-step.ts create mode 100644 packages/workflows/src/utils/composer/create-workflow.ts create mode 100644 packages/workflows/src/utils/composer/helpers/index.ts create mode 100644 packages/workflows/src/utils/composer/helpers/proxy.ts create mode 100644 packages/workflows/src/utils/composer/helpers/resolve-value.ts create mode 100644 packages/workflows/src/utils/composer/helpers/step-response.ts create mode 100644 packages/workflows/src/utils/composer/helpers/symbol.ts create mode 100644 packages/workflows/src/utils/composer/hook.ts create mode 100644 packages/workflows/src/utils/composer/index.ts create mode 100644 packages/workflows/src/utils/composer/parallelize.ts create mode 100644 packages/workflows/src/utils/composer/transform.ts create mode 100644 packages/workflows/src/utils/composer/type.ts diff --git a/.changeset/wet-crews-sneeze.md b/.changeset/wet-crews-sneeze.md new file mode 100644 index 0000000000000..fb5151f9131f7 --- /dev/null +++ b/.changeset/wet-crews-sneeze.md @@ -0,0 +1,8 @@ +--- +"@medusajs/orchestration": minor +"@medusajs/workflows": minor +"@medusajs/link-modules": patch +"@medusajs/medusa": patch +--- + +Workflows composer api diff --git a/.eslintignore b/.eslintignore index 0469b3263f8bf..dbe62ec0c90a2 100644 --- a/.eslintignore +++ b/.eslintignore @@ -21,6 +21,8 @@ packages/* !packages/cache-inmemory !packages/create-medusa-app !packages/product +!packages/orchestration +!packages/workflows **/models/* diff --git a/.eslintrc.js b/.eslintrc.js index 1f757251da28a..83745e590b6bb 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -90,14 +90,14 @@ module.exports = { "./packages/event-bus-redis/tsconfig.spec.json", "./packages/medusa-plugin-meilisearch/tsconfig.spec.json", "./packages/medusa-plugin-algolia/tsconfig.spec.json", - "./packages/admin-ui/tsconfig.json", "./packages/inventory/tsconfig.spec.json", "./packages/stock-location/tsconfig.spec.json", "./packages/cache-redis/tsconfig.spec.json", "./packages/cache-inmemory/tsconfig.spec.json", - "./packages/admin-ui/tsconfig.json", "./packages/create-medusa-app/tsconfig.json", "./packages/product/tsconfig.json", + "./packages/orchestration/tsconfig.json", + "./packages/workflows/tsconfig.spec.json", ], }, rules: { diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts new file mode 100644 index 0000000000000..c1173e8e8687d --- /dev/null +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -0,0 +1,1845 @@ +import { promiseAll } from "@medusajs/utils" +import { + createStep, + createWorkflow, + hook, + parallelize, + StepResponse, + transform, +} from "@medusajs/workflows" + +jest.setTimeout(30000) + +describe("Workflow composer", function () { + describe("Using steps returning plain values", function () { + afterEach(async () => { + jest.clearAllMocks() + }) + + it("should compose a new workflow and execute it", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [workflowInput], + obj: "return from 1", + }, + two: { + inputs: [ + { + inputs: [workflowInput], + obj: "return from 1", + }, + ], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [workflowInput], + obj: "return from 1", + }, + two: { + inputs: [ + { + inputs: [workflowInput], + obj: "return from 1", + }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows sequentially and execute them sequentially", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflow2 = createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + const workflow2Input = { test: "payload2" } + const { result: workflow2Result } = await workflow2().run({ + input: workflow2Input, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [{ test: "payload2" }], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows concurrently and execute them sequentially", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const [workflow, workflow2] = await promiseAll([ + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + + createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + ]) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + const workflow2Input = { test: "payload2" } + const { result: workflow2Result } = await workflow2().run({ + input: workflow2Input, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [workflow2Input], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows concurrently and execute them concurrently", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const [workflow, workflow2] = await promiseAll([ + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + + createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + ]) + + const workflowInput = { test: "payload1" } + const workflow2Input = { test: "payload2" } + + const [{ result: workflowResult }, { result: workflow2Result }] = + await promiseAll([ + workflow().run({ + input: workflowInput, + }), + workflow2().run({ + input: workflow2Input, + }), + ]) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + expect(mockStep1Fn.mock.calls[1]).toHaveLength(2) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [workflow2Input], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose a new workflow and execute it multiple times concurrently", async () => { + const mockStep1Fn = jest + .fn() + .mockImplementation(function (input, context) { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation(function (...inputs) { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation(function (...inputs) { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const workflowInput2 = { test: "payload2" } + + const [{ result: workflowResult }, { result: workflowResult2 }] = + await promiseAll([ + workflow().run({ + input: workflowInput, + }), + workflow().run({ + input: workflowInput2, + }), + ]) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1]).toHaveLength(2) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflowResult2).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose a new workflow with parallelize steps", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + const mockStep4Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 4", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + const step4 = createStep("step4", mockStep4Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const [ret2, ret3] = parallelize(step2(returnStep1), step3(returnStep1)) + return step4({ one: ret2, two: ret3 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep4Fn).toHaveBeenCalledTimes(1) + expect(mockStep4Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep4Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 3", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 3", + }, + }, + ], + obj: "return from 4", + }) + }) + + it("should overwrite existing workflows if the same name is used", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return { inputs: [input], obj: "return from 1" } + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 2", + } + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 3", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const overriddenWorkflow = createWorkflow("workflow1", function (input) { + const ret2 = step2(input) + const returnStep1 = step1(ret2) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await overriddenWorkflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 2", + }) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 2" }], + obj: "return from 1", + }, + two: { inputs: [{ test: "payload1" }], obj: "return from 2" }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 2" }, + ], + obj: "return from 1", + }, + two: { inputs: [{ test: "payload1" }], obj: "return from 2" }, + }, + ], + obj: "return from 3", + }) + }) + + it("should transform the values before forward them to the next step", async () => { + const mockStep1Fn = jest.fn().mockImplementation((obj, context) => { + const ret = { + property: "property", + } + return ret + }) + + const mockStep2Fn = jest.fn().mockImplementation((obj, context) => { + const ret = { + ...obj, + sum: "sum = " + obj.sum, + } + + return ret + }) + + const mockStep3Fn = jest.fn().mockImplementation((param, context) => { + const ret = { + avg: "avg = " + param.avg, + ...param, + } + return ret + }) + + const transform1Fn = jest + .fn() + .mockImplementation(({ input, step1Result }) => { + const newObj = { + ...step1Result, + ...input, + sum: input.a + input.b, + } + return { + input: newObj, + } + }) + + const transform2Fn = jest + .fn() + .mockImplementation(async ({ input }, context) => { + input.another_prop = "another_prop" + return input + }) + + const transform3Fn = jest.fn().mockImplementation(({ obj }) => { + obj.avg = (obj.a + obj.b) / 2 + + return obj + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const mainFlow = createWorkflow("test_", function (input) { + const step1Result = step1(input) + + const sum = transform( + { input, step1Result }, + transform1Fn, + transform2Fn + ) + + const ret2 = step2(sum) + + const avg = transform({ obj: ret2 }, transform3Fn) + + return step3(avg) + }) + + const workflowInput = { a: 1, b: 2 } + await mainFlow().run({ input: workflowInput }) + + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + property: "property", + a: 1, + b: 2, + sum: 3, + another_prop: "another_prop", + }) + + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + sum: "sum = 3", + property: "property", + a: 1, + b: 2, + another_prop: "another_prop", + avg: 1.5, + }) + + expect(transform1Fn).toHaveBeenCalledTimes(1) + expect(transform2Fn).toHaveBeenCalledTimes(1) + expect(transform3Fn).toHaveBeenCalledTimes(1) + }) + + it("should compose a new workflow and access properties from steps", async () => { + const mockStep1Fn = jest.fn().mockImplementation(({ input }, context) => { + return { id: input, product: "product_1", variant: "variant_2" } + }) + const mockStep2Fn = jest.fn().mockImplementation(({ product }) => { + return { + product: "Saved product - " + product, + } + }) + const mockStep3Fn = jest.fn().mockImplementation(({ variant }) => { + return { + variant: "Saved variant - " + variant, + } + }) + + const getData = createStep("step1", mockStep1Fn) + const saveProduct = createStep("step2", mockStep2Fn) + const saveVariant = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const data: any = getData(input) + parallelize( + saveProduct({ product: data.product }), + saveVariant({ variant: data.variant }) + ) + }) + + const workflowInput = "id_123" + await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ product: "product_1" }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ variant: "variant_2" }) + }) + + it("should compose a new workflow exposing hooks and log warns if multiple handlers are registered for the same hook", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(() => {}) + + const mockStep1Fn = jest.fn().mockImplementation(({ input }) => { + return { id: input, product: "product_1", variant: "variant_2" } + }) + + const mockStep2Fn = jest.fn().mockImplementation(({ product }) => { + product.product = "Saved product - " + product.product + return product + }) + + const getData = createStep("step1", mockStep1Fn) + const saveProduct = createStep("step2", mockStep2Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const data = getData({ input }) + + const hookReturn = hook("changeProduct", { + opinionatedPropertyName: data, + }) + const transformedData = transform( + { data, hookReturn }, + ({ data, hookReturn }: { data: any; hookReturn: any }) => { + return { + ...data, + ...hookReturn, + } + } + ) + + return saveProduct({ product: transformedData }) + }) + + workflow.changeProduct(({ opinionatedPropertyName }) => { + return { + newProperties: "new properties", + prod: opinionatedPropertyName.product + "**", + var: opinionatedPropertyName.variant + "**", + other: [1, 2, 3], + nested: { + a: { + b: "c", + }, + }, + moreProperties: "more properties", + } + }) + + workflow.changeProduct((theReturnOfThePreviousHook) => { + return { + ...theReturnOfThePreviousHook, + moreProperties: "2nd hook update", + } + }) + + const workflowInput = "id_123" + const { result: final } = await workflow().run({ + input: workflowInput, + }) + + expect(warn).toHaveBeenCalledTimes(1) + expect(final).toEqual({ + id: "id_123", + prod: "product_1**", + var: "variant_2**", + variant: "variant_2", + product: "Saved product - product_1", + newProperties: "new properties", + other: [1, 2, 3], + nested: { + a: { + b: "c", + }, + }, + moreProperties: "more properties", + }) + }) + }) + + describe("Using steps returning StepResponse", function () { + afterEach(async () => { + jest.clearAllMocks() + }) + + it("should compose a new workflow and execute it", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [workflowInput], + obj: "return from 1", + }, + two: { + inputs: [ + { + inputs: [workflowInput], + obj: "return from 1", + }, + ], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [workflowInput], + obj: "return from 1", + }, + two: { + inputs: [ + { + inputs: [workflowInput], + obj: "return from 1", + }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows sequentially and execute them sequentially", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflow2 = createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + const workflow2Input = { test: "payload2" } + const { result: workflow2Result } = await workflow2().run({ + input: workflow2Input, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [{ test: "payload2" }], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows concurrently and execute them sequentially", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const [workflow, workflow2] = await promiseAll([ + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + + createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + ]) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + const workflow2Input = { test: "payload2" } + const { result: workflow2Result } = await workflow2().run({ + input: workflow2Input, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [workflow2Input], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose two new workflows concurrently and execute them concurrently", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const [workflow, workflow2] = await promiseAll([ + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + + createWorkflow("workflow2", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }), + ]) + + const workflowInput = { test: "payload1" } + const workflow2Input = { test: "payload2" } + + const [{ result: workflowResult }, { result: workflow2Result }] = + await promiseAll([ + workflow().run({ + input: workflowInput, + }), + workflow2().run({ + input: workflow2Input, + }), + ]) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflow2Input) + expect(mockStep1Fn.mock.calls[1]).toHaveLength(2) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + expect(mockStep2Fn.mock.calls[1][0]).toEqual({ + inputs: [workflow2Input], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + expect(mockStep3Fn.mock.calls[1][0]).toEqual({ + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload2" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflow2Result).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose a new workflow and execute it multiple times concurrently", async () => { + const mockStep1Fn = jest + .fn() + .mockImplementation(function (input, context) { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation(function (...inputs) { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation(function (...inputs) { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const workflowInput2 = { test: "payload2" } + + const [{ result: workflowResult }, { result: workflowResult2 }] = + await promiseAll([ + workflow().run({ + input: workflowInput, + }), + workflow().run({ + input: workflowInput2, + }), + ]) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1]).toHaveLength(2) + + expect(mockStep2Fn).toHaveBeenCalledTimes(2) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(2) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload1" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + expect(workflowResult2).toEqual({ + inputs: [ + { + one: { inputs: [{ test: "payload2" }], obj: "return from 1" }, + two: { + inputs: [ + { inputs: [{ test: "payload2" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + }, + ], + obj: "return from 3", + }) + }) + + it("should compose a new workflow with parallelize steps", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + const mockStep4Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return { + inputs, + obj: "return from 4", + } + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + const step4 = createStep("step4", mockStep4Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const [ret2, ret3] = parallelize(step2(returnStep1), step3(returnStep1)) + return step4({ one: ret2, two: ret3 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 1", + }) + + expect(mockStep4Fn).toHaveBeenCalledTimes(1) + expect(mockStep4Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep4Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 2", + }, + two: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 1" }], + obj: "return from 3", + }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 2", + }, + two: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 1" }, + ], + obj: "return from 3", + }, + }, + ], + obj: "return from 4", + }) + }) + + it("should overwrite existing workflows if the same name is used", async () => { + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + const mockStep2Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 2", + }) + }) + const mockStep3Fn = jest.fn().mockImplementation((...inputs) => { + const context = inputs.pop() + return new StepResponse({ + inputs, + obj: "return from 3", + }) + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + createWorkflow("workflow1", function (input) { + const returnStep1 = step1(input) + const ret2 = step2(returnStep1) + return step3({ one: returnStep1, two: ret2 }) + }) + + const overriddenWorkflow = createWorkflow("workflow1", function (input) { + const ret2 = step2(input) + const returnStep1 = step1(ret2) + return step3({ one: returnStep1, two: ret2 }) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await overriddenWorkflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual({ + inputs: [workflowInput], + obj: "return from 2", + }) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + one: { + inputs: [{ inputs: [{ test: "payload1" }], obj: "return from 2" }], + obj: "return from 1", + }, + two: { inputs: [{ test: "payload1" }], obj: "return from 2" }, + }) + + expect(workflowResult).toEqual({ + inputs: [ + { + one: { + inputs: [ + { inputs: [{ test: "payload1" }], obj: "return from 2" }, + ], + obj: "return from 1", + }, + two: { inputs: [{ test: "payload1" }], obj: "return from 2" }, + }, + ], + obj: "return from 3", + }) + }) + + it("should transform the values before forward them to the next step", async () => { + const mockStep1Fn = jest.fn().mockImplementation((obj, context) => { + const ret = new StepResponse({ + property: "property", + }) + return ret + }) + + const mockStep2Fn = jest.fn().mockImplementation((obj, context) => { + const ret = new StepResponse({ + ...obj, + sum: "sum = " + obj.sum, + }) + + return ret + }) + + const mockStep3Fn = jest.fn().mockImplementation((param, context) => { + const ret = new StepResponse({ + avg: "avg = " + param.avg, + ...param, + }) + return ret + }) + + const transform1Fn = jest + .fn() + .mockImplementation(({ input, step1Result }) => { + const newObj = { + ...step1Result, + ...input, + sum: input.a + input.b, + } + return { + input: newObj, + } + }) + + const transform2Fn = jest + .fn() + .mockImplementation(async ({ input }, context) => { + input.another_prop = "another_prop" + return input + }) + + const transform3Fn = jest.fn().mockImplementation(({ obj }) => { + obj.avg = (obj.a + obj.b) / 2 + + return obj + }) + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + + const mainFlow = createWorkflow("test_", function (input) { + const step1Result = step1(input) + + const sum = transform( + { input, step1Result }, + transform1Fn, + transform2Fn + ) + + const ret2 = step2(sum) + + const avg = transform({ obj: ret2 }, transform3Fn) + + return step3(avg) + }) + + const workflowInput = { a: 1, b: 2 } + await mainFlow().run({ input: workflowInput }) + + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ + property: "property", + a: 1, + b: 2, + sum: 3, + another_prop: "another_prop", + }) + + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ + sum: "sum = 3", + property: "property", + a: 1, + b: 2, + another_prop: "another_prop", + avg: 1.5, + }) + + expect(transform1Fn).toHaveBeenCalledTimes(1) + expect(transform2Fn).toHaveBeenCalledTimes(1) + expect(transform3Fn).toHaveBeenCalledTimes(1) + }) + + it("should compose a new workflow and access properties from steps", async () => { + const mockStep1Fn = jest.fn().mockImplementation(({ input }, context) => { + return new StepResponse({ + id: input, + product: "product_1", + variant: "variant_2", + }) + }) + const mockStep2Fn = jest.fn().mockImplementation(({ product }) => { + return new StepResponse({ + product: "Saved product - " + product, + }) + }) + const mockStep3Fn = jest.fn().mockImplementation(({ variant }) => { + return new StepResponse({ + variant: "Saved variant - " + variant, + }) + }) + + const getData = createStep("step1", mockStep1Fn) + const saveProduct = createStep("step2", mockStep2Fn) + const saveVariant = createStep("step3", mockStep3Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const data: any = getData(input) + parallelize( + saveProduct({ product: data.product }), + saveVariant({ variant: data.variant }) + ) + }) + + const workflowInput = "id_123" + await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(mockStep2Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep2Fn.mock.calls[0][0]).toEqual({ product: "product_1" }) + + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep3Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep3Fn.mock.calls[0][0]).toEqual({ variant: "variant_2" }) + }) + + it("should compose a new workflow exposing hooks and log warns if multiple handlers are registered for the same hook", async () => { + const warn = jest.spyOn(console, "warn").mockImplementation(() => {}) + + const mockStep1Fn = jest.fn().mockImplementation(({ input }) => { + return new StepResponse({ + id: input, + product: "product_1", + variant: "variant_2", + }) + }) + + const mockStep2Fn = jest.fn().mockImplementation(({ product }) => { + product.product = "Saved product - " + product.product + return new StepResponse(product) + }) + + const getData = createStep("step1", mockStep1Fn) + const saveProduct = createStep("step2", mockStep2Fn) + + const workflow = createWorkflow("workflow1", function (input) { + const data = getData({ input }) + + const hookReturn = hook("changeProduct", { + opinionatedPropertyName: data, + }) + const transformedData = transform( + { data, hookReturn }, + ({ data, hookReturn }: { data: any; hookReturn: any }) => { + return { + ...data, + ...hookReturn, + } + } + ) + + return saveProduct({ product: transformedData }) + }) + + workflow.changeProduct(({ opinionatedPropertyName }) => { + return { + newProperties: "new properties", + prod: opinionatedPropertyName.product + "**", + var: opinionatedPropertyName.variant + "**", + other: [1, 2, 3], + nested: { + a: { + b: "c", + }, + }, + moreProperties: "more properties", + } + }) + + workflow.changeProduct((theReturnOfThePreviousHook) => { + return { + ...theReturnOfThePreviousHook, + moreProperties: "2nd hook update", + } + }) + + const workflowInput = "id_123" + const { result: final } = await workflow().run({ + input: workflowInput, + }) + + expect(warn).toHaveBeenCalledTimes(1) + expect(final).toEqual({ + id: "id_123", + prod: "product_1**", + var: "variant_2**", + variant: "variant_2", + product: "Saved product - product_1", + newProperties: "new properties", + other: [1, 2, 3], + nested: { + a: { + b: "c", + }, + }, + moreProperties: "more properties", + }) + }) + }) +}) diff --git a/integration-tests/plugins/package.json b/integration-tests/plugins/package.json index e3c07aa393c05..35a2ab67ecce5 100644 --- a/integration-tests/plugins/package.json +++ b/integration-tests/plugins/package.json @@ -5,7 +5,7 @@ "license": "MIT", "private": true, "scripts": { - "test:integration": "node --expose-gc ./../../node_modules/.bin/jest --silent=false --runInBand --bail --detectOpenHandles --logHeapUsage --clearMocks --no-compilation-cache --forceExit", + "test:integration": "node --expose-gc ./../../node_modules/.bin/jest --silent=false --runInBand --bail --detectOpenHandles --logHeapUsage --clearMocks --forceExit", "build": "babel src -d dist --extensions \".ts,.js\"" }, "dependencies": { diff --git a/packages/link-modules/src/index.ts b/packages/link-modules/src/index.ts index 3acb2f2c61577..4bbfce7a57d4d 100644 --- a/packages/link-modules/src/index.ts +++ b/packages/link-modules/src/index.ts @@ -2,3 +2,4 @@ export * from "./initialize" export * from "./types" export * from "./loaders" export * from "./services" +export * from "./utils/compose-link-name" diff --git a/packages/medusa/src/loaders/plugins.ts b/packages/medusa/src/loaders/plugins.ts index 356c5d3f5ce3c..a5db6bc287931 100644 --- a/packages/medusa/src/loaders/plugins.ts +++ b/packages/medusa/src/loaders/plugins.ts @@ -96,6 +96,7 @@ export default async ({ ) registerCoreRouters(pluginDetails, container) await registerSubscribers(pluginDetails, container, activityId) + await registerWorkflows(pluginDetails) }) ) @@ -634,6 +635,15 @@ function registerRepositories( }) } +/** + * import files from the workflows directory to run the registration of the wofklows + * @param pluginDetails + */ +async function registerWorkflows(pluginDetails: PluginDetails): Promise { + const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {}) + await Promise.all(files.map(async (file) => import(file))) +} + /** * Registers a plugin's models at the right location in our container. Models * must inherit from BaseModel. Models are registered directly in the container. diff --git a/packages/orchestration/src/workflow/workflow-manager.ts b/packages/orchestration/src/workflow/workflow-manager.ts index e46f300f5bf5f..634c63b0fe138 100644 --- a/packages/orchestration/src/workflow/workflow-manager.ts +++ b/packages/orchestration/src/workflow/workflow-manager.ts @@ -70,7 +70,7 @@ export class WorkflowManager { static register( workflowId: string, - flow: TransactionStepsDefinition | OrchestratorBuilder, + flow: TransactionStepsDefinition | OrchestratorBuilder | undefined, handlers: WorkflowHandler, requiredModules?: Set, optionalModules?: Set @@ -78,19 +78,22 @@ export class WorkflowManager { const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow if (WorkflowManager.workflows.has(workflowId)) { - const areStepsEqual = - JSON.stringify(finalFlow) === - JSON.stringify(WorkflowManager.workflows.get(workflowId)!.flow_) + const areStepsEqual = finalFlow + ? JSON.stringify(finalFlow) === + JSON.stringify(WorkflowManager.workflows.get(workflowId)!.flow_) + : true if (!areStepsEqual) { - throw new Error(`Workflow with id "${workflowId}" and step definition already exists.`) + throw new Error( + `Workflow with id "${workflowId}" and step definition already exists.` + ) } } WorkflowManager.workflows.set(workflowId, { id: workflowId, - flow_: finalFlow, - orchestrator: new TransactionOrchestrator(workflowId, finalFlow), + flow_: finalFlow!, + orchestrator: new TransactionOrchestrator(workflowId, finalFlow ?? {}), handler: WorkflowManager.buildHandlers(handlers), handlers_: handlers, requiredModules, diff --git a/packages/workflows/src/helper/workflow-export.ts b/packages/workflows/src/helper/workflow-export.ts index 3b2fc8dcec70b..ed75b21d30eae 100644 --- a/packages/workflows/src/helper/workflow-export.ts +++ b/packages/workflows/src/helper/workflow-export.ts @@ -11,6 +11,7 @@ import { MedusaModule } from "@medusajs/modules-sdk" import { EOL } from "os" import { ulid } from "ulid" import { Workflows } from "../definitions" +import { SymbolWorkflowWorkflowData } from "../utils/composer" export type FlowRunOptions = { input?: TData @@ -26,7 +27,7 @@ export type WorkflowResult = { } export const exportWorkflow = ( - workflowId: Workflows, + workflowId: Workflows | string, defaultResult?: string, dataPreparation?: (data: TData) => Promise ) => { @@ -63,7 +64,7 @@ export const exportWorkflow = ( if (typeof dataPreparation === "function") { try { - const copyInput = JSON.parse(JSON.stringify(input)) + const copyInput = input ? JSON.parse(JSON.stringify(input)) : input input = await dataPreparation(copyInput as TData) } catch (err) { if (throwOnError) { @@ -97,11 +98,13 @@ export const exportWorkflow = ( if (resultFrom) { if (Array.isArray(resultFrom)) { - result = resultFrom.map( - (from) => transaction.getContext().invoke?.[from] - ) + result = resultFrom.map((from) => { + const res = transaction.getContext().invoke?.[from] + return res?.__type === SymbolWorkflowWorkflowData ? res.output : res + }) } else { - result = transaction.getContext().invoke?.[resultFrom] + const res = transaction.getContext().invoke?.[resultFrom] + result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res } } diff --git a/packages/workflows/src/index.ts b/packages/workflows/src/index.ts index 2816d42f49232..e60ea511b25cb 100644 --- a/packages/workflows/src/index.ts +++ b/packages/workflows/src/index.ts @@ -2,3 +2,5 @@ export * from "./definition" export * from "./definitions" export * as Handlers from "./handlers" export * from "./helper" +export * from "./utils/composer" +export * as Composer from "./utils/composer" diff --git a/packages/workflows/src/utils/composer/create-step.ts b/packages/workflows/src/utils/composer/create-step.ts new file mode 100644 index 0000000000000..5405559c6a8f8 --- /dev/null +++ b/packages/workflows/src/utils/composer/create-step.ts @@ -0,0 +1,242 @@ +import { + resolveValue, + StepResponse, + SymbolMedusaWorkflowComposerContext, + SymbolWorkflowStep, + SymbolWorkflowStepBind, + SymbolWorkflowStepResponse, + SymbolWorkflowWorkflowData, +} from "./helpers" +import { + CreateWorkflowComposerContext, + StepExecutionContext, + StepFunction, + StepFunctionResult, + WorkflowData, +} from "./type" +import { proxify } from "./helpers/proxy" + +type InvokeFn = ( + input: { + [Key in keyof TInput]: TInput[Key] + }, + context: StepExecutionContext +) => + | void + | StepResponse< + TOutput, + TCompensateInput extends undefined ? TOutput : TCompensateInput + > + | Promise> + +type CompensateFn = ( + input: T, + context: StepExecutionContext +) => unknown | Promise + +interface ApplyStepOptions< + TStepInputs extends { + [K in keyof TInvokeInput]: WorkflowData + }, + TInvokeInput extends object, + TInvokeResultOutput, + TInvokeResultCompensateInput +> { + stepName: string + input: TStepInputs + invokeFn: InvokeFn< + TInvokeInput, + TInvokeResultOutput, + TInvokeResultCompensateInput + > + compensateFn?: CompensateFn +} + +/** + * Internal function to create the invoke and compensate handler for a step. + * This is where the inputs and context are passed to the underlying invoke and compensate function. + * + * @param stepName + * @param input + * @param invokeFn + * @param compensateFn + */ +function applyStep< + TInvokeInput extends object, + TStepInput extends { + [K in keyof TInvokeInput]: WorkflowData + }, + TInvokeResultOutput, + TInvokeResultCompensateInput +>({ + stepName, + input, + invokeFn, + compensateFn, +}: ApplyStepOptions< + TStepInput, + TInvokeInput, + TInvokeResultOutput, + TInvokeResultCompensateInput +>): StepFunctionResult { + return function (this: CreateWorkflowComposerContext) { + if (!this.workflowId) { + throw new Error( + "createStep must be used inside a createWorkflow definition" + ) + } + + const handler = { + invoke: async (transactionContext) => { + const executionContext: StepExecutionContext = { + container: transactionContext.container, + metadata: transactionContext.metadata, + context: transactionContext.context, + } + + const argInput = await resolveValue(input, transactionContext) + const stepResponse: StepResponse = await invokeFn.apply( + this, + [argInput, executionContext] + ) + + const stepResponseJSON = + stepResponse?.__type === SymbolWorkflowStepResponse + ? stepResponse.toJSON() + : stepResponse + + return { + __type: SymbolWorkflowWorkflowData, + output: stepResponseJSON, + } + }, + compensate: compensateFn + ? async (transactionContext) => { + const executionContext: StepExecutionContext = { + container: transactionContext.container, + metadata: transactionContext.metadata, + context: transactionContext.context, + } + + const stepOutput = transactionContext.invoke[stepName].output + const invokeResult = + stepOutput?.__type === SymbolWorkflowStepResponse + ? stepOutput.compensateInput && + JSON.parse(JSON.stringify(stepOutput.compensateInput)) + : stepOutput && JSON.parse(JSON.stringify(stepOutput)) + + const args = [invokeResult, executionContext] + const output = await compensateFn.apply(this, args) + return { + output, + } + } + : undefined, + } + + this.flow.addAction(stepName, { + noCompensation: !compensateFn, + }) + this.handlers.set(stepName, handler) + + const ret = { + __type: SymbolWorkflowStep, + __step__: stepName, + } + + return proxify(ret) + } +} + +/** + * Function which will create a StepFunction to be used inside a createWorkflow composer function. + * This function will return a function which can be used to bind the step to a workflow. + * The types of the input to be passed to the step function is defined by the generic of the invoke function provided. + * + * @param name + * @param invokeFn + * @param compensateFn + * + * @example + * ```ts + * interface CreateProductInput { + * title: string + * } + * + * interface CreateProductOutput { + * product: { id: string; title: string } + * compensateInput: { + * product_id: string + * } + * } + * + * export const createProductStep = createStep( + * "createProductStep", + * async function (input: Step1Input, context: StepExecutionContext): Promise { + * const productService = context.container.resolve("productService") + * const product = await productService.create(input) + * return { + * product, + * compensateInput: { + * product_id: product.id + * } + * } + * }, + * async function (input: { product_id: string }, context: StepExecutionContext) { + * const productService = context.container.resolve("productService") + * await productService.delete(input.product_id) + * }) + */ +export function createStep< + TInvokeInput extends object, + TInvokeResultOutput, + TInvokeResultCompensateInput +>( + name: string, + invokeFn: InvokeFn< + TInvokeInput, + TInvokeResultOutput, + TInvokeResultCompensateInput + >, + compensateFn?: CompensateFn +): StepFunction { + const stepName = name ?? invokeFn.name + + const returnFn = function (input: { + [K in keyof TInvokeInput]: WorkflowData + }): WorkflowData { + if (!global[SymbolMedusaWorkflowComposerContext]) { + throw new Error( + "createStep must be used inside a createWorkflow definition" + ) + } + + const stepBinder = ( + global[ + SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext + ).stepBinder + + return stepBinder( + applyStep< + TInvokeInput, + { [K in keyof TInvokeInput]: WorkflowData }, + TInvokeResultOutput, + TInvokeResultCompensateInput + >({ + stepName, + input, + invokeFn, + compensateFn, + }) + ) + } + + returnFn.__type = SymbolWorkflowStepBind + returnFn.__step__ = stepName + + return returnFn as unknown as StepFunction +} diff --git a/packages/workflows/src/utils/composer/create-workflow.ts b/packages/workflows/src/utils/composer/create-workflow.ts new file mode 100644 index 0000000000000..c9524b5dbcba5 --- /dev/null +++ b/packages/workflows/src/utils/composer/create-workflow.ts @@ -0,0 +1,182 @@ +import { + LocalWorkflow, + WorkflowHandler, + WorkflowManager, +} from "@medusajs/orchestration" +import { LoadedModule, MedusaContainer } from "@medusajs/types" +import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper" +import { + CreateWorkflowComposerContext, + WorkflowData, + WorkflowDataProperties, +} from "./type" +import { + resolveValue, + SymbolInputReference, + SymbolMedusaWorkflowComposerContext, + SymbolWorkflowStep, +} from "./helpers" +import { proxify } from "./helpers/proxy" + +global[SymbolMedusaWorkflowComposerContext] = null + +type ReturnWorkflow> = { + ( + container?: LoadedModule[] | MedusaContainer + ): Omit & { + run: ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + } +} & THooks + +/** + * Creates a new workflow with the given name and composer function. + * The composer function will compose the workflow by using the step, parallelize and other util functions that + * will allow to define the flow of event of a workflow. + * + * @param name + * @param composer + * + * @example + * ```ts + * import { createWorkflow, WorkflowData } from "@medusajs/workflows" + * import { createProductStep, getProductStep, createPricesStep } from "./steps" + * + * interface MyWorkflowData { + * title: string + * } + * + * const myWorkflow = createWorkflow("my-workflow", (input: WorkflowData) => { + * // Everything here will be executed and resolved later during the execution. Including the data access. + * + * const product = createProductStep(input) + * const prices = createPricesStep(product) + * + * const id = product.id + * return getProductStep(product.id) + * }) + * ``` + */ + +export function createWorkflow< + TData, + TResult, + THooks extends Record = Record +>( + name: string, + composer: (input: WorkflowData) => + | void + | WorkflowData + | { + [K in keyof TResult]: + | WorkflowData + | WorkflowDataProperties + } +): ReturnWorkflow { + const handlers: WorkflowHandler = new Map() + + if (WorkflowManager.getWorkflow(name)) { + WorkflowManager.unregister(name) + } + + WorkflowManager.register(name, undefined, handlers) + + const context: CreateWorkflowComposerContext = { + workflowId: name, + flow: WorkflowManager.getTransactionDefinition(name), + handlers, + hooks_: [], + hooksCallback_: {}, + hookBinder: (name, fn) => { + context.hooks_.push(name) + return fn(context) + }, + stepBinder: (fn) => { + return fn.bind(context)() + }, + parallelizeBinder: (fn) => { + return fn.bind(context)() + }, + } + + global[SymbolMedusaWorkflowComposerContext] = context + + const inputPlaceHolder = proxify({ + __type: SymbolInputReference, + __step__: "", + }) + + const returnedStep = composer.apply(context, [inputPlaceHolder]) + + delete global[SymbolMedusaWorkflowComposerContext] + + WorkflowManager.update(name, context.flow, handlers) + + const workflow = exportWorkflow(name) + + const mainFlow = ( + container?: LoadedModule[] | MedusaContainer + ) => { + const workflow_ = workflow(container) + const originalRun = workflow_.run + + workflow_.run = (async ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + args ??= {} + args.resultFrom ??= + returnedStep?.__type === SymbolWorkflowStep + ? returnedStep.__step__ + : undefined + + // Forwards the input to the ref object on composer.apply + const workflowResult = (await originalRun( + args + )) as unknown as WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + + workflowResult.result = await resolveValue( + workflowResult.result || returnedStep, + workflowResult.transaction.getContext() + ) + + return workflowResult + }) as any + + return workflow_ + } + + let shouldRegisterHookHandler = true + + for (const hook of context.hooks_) { + mainFlow[hook] = (fn) => { + context.hooksCallback_[hook] ??= [] + + if (!shouldRegisterHookHandler) { + console.warn( + `A hook handler has already been registered for the ${hook} hook. The current handler registration will be skipped.` + ) + return + } + + context.hooksCallback_[hook].push(fn) + shouldRegisterHookHandler = false + } + } + + return mainFlow as ReturnWorkflow +} diff --git a/packages/workflows/src/utils/composer/helpers/index.ts b/packages/workflows/src/utils/composer/helpers/index.ts new file mode 100644 index 0000000000000..2636ee016ffcc --- /dev/null +++ b/packages/workflows/src/utils/composer/helpers/index.ts @@ -0,0 +1,3 @@ +export * from "./step-response" +export * from "./symbol" +export * from "./resolve-value" \ No newline at end of file diff --git a/packages/workflows/src/utils/composer/helpers/proxy.ts b/packages/workflows/src/utils/composer/helpers/proxy.ts new file mode 100644 index 0000000000000..68ca47b2f3359 --- /dev/null +++ b/packages/workflows/src/utils/composer/helpers/proxy.ts @@ -0,0 +1,28 @@ +import { transform } from "../transform" +import { WorkflowData, WorkflowTransactionContext } from "../type" +import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol" +import { resolveValue } from "./resolve-value" + +export function proxify(obj: WorkflowData): T { + return new Proxy(obj, { + get(target: any, prop: string | symbol): any { + if (prop in target) { + return target[prop] + } + + return transform(target[prop], async function (input, context) { + const { invoke } = context as WorkflowTransactionContext + let output = + target.__type === SymbolInputReference || + target.__type === SymbolWorkflowStepTransformer + ? target + : invoke?.[obj.__step__]?.output + + output = await resolveValue(output, context) + output = output?.[prop] + + return output && JSON.parse(JSON.stringify(output)) + }) + }, + }) as unknown as T +} diff --git a/packages/workflows/src/utils/composer/helpers/resolve-value.ts b/packages/workflows/src/utils/composer/helpers/resolve-value.ts new file mode 100644 index 0000000000000..5a74e19448c0f --- /dev/null +++ b/packages/workflows/src/utils/composer/helpers/resolve-value.ts @@ -0,0 +1,71 @@ +import { promiseAll } from "@medusajs/utils" +import { + SymbolInputReference, + SymbolWorkflowHook, + SymbolWorkflowStep, + SymbolWorkflowStepResponse, + SymbolWorkflowStepTransformer, +} from "./symbol" + +async function resolveProperty(property, transactionContext) { + const { invoke: invokeRes } = transactionContext + + if (property?.__type === SymbolInputReference) { + return transactionContext.payload + } else if (property?.__type === SymbolWorkflowStepTransformer) { + return await property.__resolver(transactionContext) + } else if (property?.__type === SymbolWorkflowHook) { + return await property.__value(transactionContext) + } else if (property?.__type === SymbolWorkflowStep) { + const output = invokeRes[property.__step__]?.output + if (output?.__type === SymbolWorkflowStepResponse) { + return output.output + } + + return output + } else if (property?.__type === SymbolWorkflowStepResponse) { + return property.output + } else { + return property + } +} + +export async function resolveValue(input, transactionContext) { + const unwrapInput = async ( + inputTOUnwrap: Record, + parentRef: any + ) => { + if (inputTOUnwrap == null) { + return inputTOUnwrap + } + + if (Array.isArray(inputTOUnwrap)) { + return await promiseAll( + inputTOUnwrap.map((i) => unwrapInput(i, transactionContext)) + ) + } + + if (typeof inputTOUnwrap !== "object") { + return inputTOUnwrap + } + + for (const key of Object.keys(inputTOUnwrap)) { + parentRef[key] = await resolveProperty( + inputTOUnwrap[key], + transactionContext + ) + + if (typeof parentRef[key] === "object") { + await unwrapInput(parentRef[key], parentRef[key]) + } + } + + return parentRef + } + + const result = input?.__type + ? await resolveProperty(input, transactionContext) + : await unwrapInput(input, {}) + + return result && JSON.parse(JSON.stringify(result)) +} diff --git a/packages/workflows/src/utils/composer/helpers/step-response.ts b/packages/workflows/src/utils/composer/helpers/step-response.ts new file mode 100644 index 0000000000000..c9d9bf55cbbd9 --- /dev/null +++ b/packages/workflows/src/utils/composer/helpers/step-response.ts @@ -0,0 +1,32 @@ +import { SymbolWorkflowStepResponse } from "./symbol" + +export class StepResponse { + readonly #__type = SymbolWorkflowStepResponse + readonly #output: TOutput + readonly #compensateInput?: TCompensateInput + + constructor(output: TOutput, compensateInput?: TCompensateInput) { + this.#output = output + this.#compensateInput = (compensateInput ?? output) as TCompensateInput + } + + get __type() { + return this.#__type + } + + get output(): TOutput { + return this.#output + } + + get compensateInput(): TCompensateInput { + return this.#compensateInput as TCompensateInput + } + + toJSON() { + return { + __type: this.#__type, + output: this.#output, + compensateInput: this.#compensateInput, + } + } +} diff --git a/packages/workflows/src/utils/composer/helpers/symbol.ts b/packages/workflows/src/utils/composer/helpers/symbol.ts new file mode 100644 index 0000000000000..8ec8177d7f28d --- /dev/null +++ b/packages/workflows/src/utils/composer/helpers/symbol.ts @@ -0,0 +1,12 @@ +export const SymbolMedusaWorkflowComposerContext = Symbol.for( + "MedusaWorkflowComposerContext" +) +export const SymbolInputReference = Symbol.for("WorkflowInputReference") +export const SymbolWorkflowStep = Symbol.for("WorkflowStep") +export const SymbolWorkflowHook = Symbol.for("WorkflowHook") +export const SymbolWorkflowWorkflowData = Symbol.for("WorkflowWorkflowData") +export const SymbolWorkflowStepResponse = Symbol.for("WorkflowStepResponse") +export const SymbolWorkflowStepBind = Symbol.for("WorkflowStepBind") +export const SymbolWorkflowStepTransformer = Symbol.for( + "WorkflowStepTransformer" +) diff --git a/packages/workflows/src/utils/composer/hook.ts b/packages/workflows/src/utils/composer/hook.ts new file mode 100644 index 0000000000000..e3aa12b093749 --- /dev/null +++ b/packages/workflows/src/utils/composer/hook.ts @@ -0,0 +1,43 @@ +import { + resolveValue, + SymbolMedusaWorkflowComposerContext, + SymbolWorkflowHook, +} from "./helpers" +import { + CreateWorkflowComposerContext, + StepExecutionContext, + WorkflowData, +} from "./type" + +export function hook(name: string, value: any): WorkflowData { + const hookBinder = ( + global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + ).hookBinder + + return hookBinder(name, function (context) { + return { + __value: async function (transactionContext) { + const executionContext: StepExecutionContext = { + container: transactionContext.container, + metadata: transactionContext.metadata, + context: transactionContext.context, + } + + const allValues = await resolveValue(value, transactionContext) + const stepValue = allValues + ? JSON.parse(JSON.stringify(allValues)) + : allValues + + let finalResult + const functions = context.hooksCallback_[name] + for (let i = 0; i < functions.length; i++) { + const fn = functions[i] + const arg = i === 0 ? stepValue : finalResult + finalResult = await fn.apply(fn, [arg, executionContext]) + } + return finalResult + }, + __type: SymbolWorkflowHook, + } + }) +} diff --git a/packages/workflows/src/utils/composer/index.ts b/packages/workflows/src/utils/composer/index.ts new file mode 100644 index 0000000000000..485d8451cc6f4 --- /dev/null +++ b/packages/workflows/src/utils/composer/index.ts @@ -0,0 +1,9 @@ +export * from "./create-step" +export * from "./create-workflow" +export * from "./hook" +export * from "./parallelize" +export * from "./helpers/resolve-value" +export * from "./helpers/symbol" +export * from "./helpers/step-response" +export * from "./transform" +export * from "./type" diff --git a/packages/workflows/src/utils/composer/parallelize.ts b/packages/workflows/src/utils/composer/parallelize.ts new file mode 100644 index 0000000000000..f1cf22a052f07 --- /dev/null +++ b/packages/workflows/src/utils/composer/parallelize.ts @@ -0,0 +1,58 @@ +import { CreateWorkflowComposerContext, WorkflowData } from "./type" +import { SymbolMedusaWorkflowComposerContext } from "./helpers" + +/** + * Parallelize multiple steps. + * The steps will be run in parallel. The result of each step will be returned as part of the result array. + * Each StepResult can be accessed from the resulted array in the order they were passed to the parallelize function. + * + * @param steps + * + * @example + * ```ts + * import { createWorkflow, WorkflowData, parallelize } from "@medusajs/workflows" + * import { createProductStep, getProductStep, createPricesStep, attachProductToSalesChannelStep } from "./steps" + * + * interface MyWorkflowData { + * title: string + * } + * + * const myWorkflow = createWorkflow("my-workflow", (input: WorkflowData) => { + * const product = createProductStep(input) + * + * const [prices, productSalesChannel] = parallelize( + * createPricesStep(product), + * attachProductToSalesChannelStep(product) + * ) + * + * const id = product.id + * return getProductStep(product.id) + * }) + */ +export function parallelize( + ...steps: TResult +): TResult { + if (!global[SymbolMedusaWorkflowComposerContext]) { + throw new Error( + "parallelize must be used inside a createWorkflow definition" + ) + } + + const parallelizeBinder = ( + global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + ).parallelizeBinder + + const resultSteps = steps.map((step) => step) + + return parallelizeBinder(function ( + this: CreateWorkflowComposerContext + ) { + const stepOntoMerge = steps.shift()! + this.flow.mergeActions( + stepOntoMerge.__step__, + ...steps.map((step) => step.__step__) + ) + + return resultSteps as unknown as TResult + }) +} diff --git a/packages/workflows/src/utils/composer/transform.ts b/packages/workflows/src/utils/composer/transform.ts new file mode 100644 index 0000000000000..42648019e6b2f --- /dev/null +++ b/packages/workflows/src/utils/composer/transform.ts @@ -0,0 +1,133 @@ +import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers" +import { StepExecutionContext, WorkflowData } from "./type" +import { proxify } from "./helpers/proxy" + +type Func1 = ( + input: T extends WorkflowData + ? U + : T extends object + ? { [K in keyof T]: T[K] extends WorkflowData ? U : T[K] } + : {}, + context: StepExecutionContext +) => U | Promise + +type Func = (input: T, context: StepExecutionContext) => U | Promise + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] + | [Func1, Func, Func] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] + | [Func1, Func, Func] + | [Func1, Func, Func, Func] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] + | [Func1, Func, Func] + | [Func1, Func, Func, Func] + | [Func1, Func, Func, Func, Func] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] + | [Func1, Func, Func] + | [Func1, Func, Func, Func] + | [Func1, Func, Func, Func, Func] + | [Func1, Func, Func, Func, Func, Func] +): WorkflowData + +// prettier-ignore +// eslint-disable-next-line max-len +export function transform( + values: T, + ...func: + | [Func1] + | [Func1, Func] + | [Func1, Func, Func] + | [Func1, Func, Func, Func] + | [Func1, Func, Func, Func, Func] + | [Func1, Func, Func, Func, Func, Func] + | [Func1, Func, Func, Func, Func, Func, Func] +): WorkflowData + +/** + * Transforms the input value(s) using the provided functions. + * Allow to perform transformation on the future result of the step(s) to be passed + * to other steps later on at run time. + * + * @param values + * @param functions + */ +export function transform( + values: any | any[], + ...functions: Function[] +): unknown { + const ret = { + __type: SymbolWorkflowStepTransformer, + __resolver: undefined, + } + + const returnFn = async function (transactionContext): Promise { + const allValues = await resolveValue(values, transactionContext) + const stepValue = allValues + ? JSON.parse(JSON.stringify(allValues)) + : allValues + + let finalResult + for (let i = 0; i < functions.length; i++) { + const fn = functions[i] + const arg = i === 0 ? stepValue : finalResult + + finalResult = await fn.apply(fn, [arg, transactionContext]) + } + + return finalResult + } + + const proxyfiedRet = proxify( + ret as unknown as WorkflowData + ) + proxyfiedRet.__resolver = returnFn as any + + return proxyfiedRet +} diff --git a/packages/workflows/src/utils/composer/type.ts b/packages/workflows/src/utils/composer/type.ts new file mode 100644 index 0000000000000..664f7f182e14a --- /dev/null +++ b/packages/workflows/src/utils/composer/type.ts @@ -0,0 +1,65 @@ +import { + OrchestratorBuilder, + TransactionContext as OriginalWorkflowTransactionContext, + TransactionPayload, + WorkflowHandler, +} from "@medusajs/orchestration" +import { Context, MedusaContainer } from "@medusajs/types" + +export type StepFunctionResult = + (this: CreateWorkflowComposerContext) => TOutput extends [] + ? [ + ...WorkflowData<{ + [K in keyof TOutput]: TOutput[number][K] + }>[] + ] + : WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> + +export type StepFunction = { + (input: { [K in keyof TInput]: WorkflowData }): WorkflowData<{ + [K in keyof TOutput]: TOutput[K] + }> +} & WorkflowDataProperties<{ + [K in keyof TOutput]: TOutput[K] +}> + +export type WorkflowDataProperties = { + __type: Symbol + __step__: string +} + +export type WorkflowData = (T extends object + ? { + [Key in keyof T]: WorkflowData + } + : WorkflowDataProperties) & + WorkflowDataProperties + +export type CreateWorkflowComposerContext = { + hooks_: string[] + hooksCallback_: Record + workflowId: string + flow: OrchestratorBuilder + handlers: WorkflowHandler + stepBinder: ( + fn: StepFunctionResult + ) => WorkflowData + hookBinder: ( + name: string, + fn: Function + ) => WorkflowData + parallelizeBinder: ( + fn: (this: CreateWorkflowComposerContext) => TOutput + ) => TOutput +} + +export interface StepExecutionContext { + container: MedusaContainer + metadata: TransactionPayload["metadata"] + context: Context +} + +export type WorkflowTransactionContext = StepExecutionContext & + OriginalWorkflowTransactionContext & { + invoke: { [key: string]: { output: any } } + }