Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workflow): process all activation jobs as a single batch #1488

Merged
merged 19 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions docs/activation-in-debug-mode.mermaid
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,29 @@ sequenceDiagram
end
Core->>-MT: Respond with Activation
MT->>MT: Decode Payloads
loop patches, signals, updates, completions, queries as jobs
MT->>VM: Activate(jobs)
MT->>+WT: Run Workflow Activation

WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)

alt "Single Batch mode"
WT->>VM: Activate(queries)
VM->>VM: Run Microtasks
MT->>VM: Try Unblock Conditions
WT->>VM: Try Unblock Conditions
else Legacy "Multi Batches mode"
loop [signals, updates+completions] as jobs
WT->>VM: Activate(jobs)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
end
end

MT->>VM: Collect Commands
MT->>MT: Encode Payloads
MT->>+VM: Collect Sink Calls
VM-->>-MT: Respond with Sink Calls
MT->>MT: Run Sink Functions
MT->>Core: Complete Activation
opt Completed Workflow Task
opt Completed Workflow Task
Core->>Server: Complete Workflow Task
end

16 changes: 13 additions & 3 deletions docs/activation.mermaid
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ sequenceDiagram
Core->>-MT: Respond with Activation
MT->>MT: Decode Payloads
MT->>+WT: Run Workflow Activation
loop patches, signals, updates, completions, queries as jobs
WT->>VM: Activate(jobs)

WT->>VM: Update Activator (now, WorkflowInfo, SDK flags, patches)

alt "Single Batch mode"
WT->>VM: Activate(queries)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
else Legacy "Multi Batches mode"
loop [signals, updates+completions] as jobs
WT->>VM: Activate(jobs)
VM->>VM: Run Microtasks
WT->>VM: Try Unblock Conditions
end
end

WT->>VM: Collect Commands
WT-->>-MT: Respond to Activation
MT->>MT: Encode Payloads
Expand All @@ -28,6 +38,6 @@ sequenceDiagram
WT-->>-MT: Respond with Sink Calls
MT->>MT: Run Sink Functions
MT->>Core: Complete Activation
opt Completed Workflow Task
opt Completed Workflow Task
Core->>Server: Complete Workflow Task
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-08-14T03:50:59.998228Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048642",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "canCompleteUpdateAfterWorkflowReturns"
},
"taskQueue": {
"name": "test",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
"identity": "temporal-cli:jwatkins@JamesMBTemporal",
"firstExecutionRunId": "b9d2c3ad-e03e-49e4-857e-1939d9d32f5e",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "eb5f6727-7fb3-4f48-aba2-1bd7d46823a1"
}
},
{
"eventId": "2",
"eventTime": "2024-08-14T03:50:59.998393Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048643",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "test",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-08-14T03:51:24.737259Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048648",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "13971@JamesMBTemporal",
"requestId": "f8a583b6-d423-45b7-a34d-b3c8e822d10f",
"historySizeBytes": "293",
"workerVersion": {
"buildId": "@temporalio/[email protected]+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
}
}
},
{
"eventId": "4",
"eventTime": "2024-08-14T03:51:24.779886Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048652",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "13971@JamesMBTemporal",
"workerVersion": {
"buildId": "@temporalio/[email protected]+8983e4c58e21c0f316606d45c034d286695e7f31b7693b88a8ca3c102fce506c"
},
"sdkMetadata": {
"coreUsedFlags": [2, 1]
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-08-14T03:51:24.779952Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
"taskId": "1048653",
"workflowExecutionUpdateAcceptedEventAttributes": {
"protocolInstanceId": "fb28b772-4538-45a4-99f0-550fae0b7668",
"acceptedRequestMessageId": "fb28b772-4538-45a4-99f0-550fae0b7668/request",
"acceptedRequestSequencingEventId": "2",
"acceptedRequest": {
"meta": {
"updateId": "fb28b772-4538-45a4-99f0-550fae0b7668",
"identity": "temporal-cli:jwatkins@JamesMBTemporal"
},
"input": {
"header": {},
"name": "doneUpdate"
}
}
}
},
{
"eventId": "6",
"eventTime": "2024-08-14T03:51:24.779982Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1048654",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "YmluYXJ5L251bGw="
}
}
]
},
"workflowTaskCompletedEventId": "4"
}
}
]
}
15 changes: 15 additions & 0 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import {
DefaultLogger,
LogEntry,
LogLevel,
ReplayWorkerOptions,
Runtime,
WorkerOptions,
WorkflowBundle,
bundleWorkflowCode,
makeTelemetryFilterString,
} from '@temporalio/worker';
import * as workflow from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import {
Worker,
Expand Down Expand Up @@ -105,6 +107,7 @@ export function makeTestFunction(opts: {
export interface Helpers {
taskQueue: string;
createWorker(opts?: Partial<WorkerOptions>): Promise<Worker>;
runReplayHistory(opts: Partial<ReplayWorkerOptions>, history: temporal.api.history.v1.IHistory): Promise<void>;
executeWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<workflow.WorkflowResultType<T>>;
executeWorkflow<T extends workflow.Workflow>(
fn: T,
Expand Down Expand Up @@ -137,6 +140,18 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
...opts,
});
},
async runReplayHistory(
opts: Partial<ReplayWorkerOptions>,
history: temporal.api.history.v1.IHistory
): Promise<void> {
await Worker.runReplayHistory(
{
workflowBundle: t.context.workflowBundle,
...opts,
},
history
);
},
async executeWorkflow(
fn: workflow.Workflow,
opts?: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'>
Expand Down
14 changes: 14 additions & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as fs from 'fs/promises';
import * as net from 'net';
import path from 'path';
import StackUtils from 'stack-utils';
Expand Down Expand Up @@ -128,6 +129,7 @@ export const bundlerOptions = {
'async-retry',
'uuid',
'net',
'fs/promises',
],
};

Expand Down Expand Up @@ -293,3 +295,15 @@ export function asSdkLoggerSink(
},
};
}

export async function getHistories(fname: string): Promise<iface.temporal.api.history.v1.History> {
const isJson = fname.endsWith('json');
const fpath = path.resolve(__dirname, `../history_files/${fname}`);
if (isJson) {
const hist = await fs.readFile(fpath, 'utf8');
return JSON.parse(hist);
} else {
const hist = await fs.readFile(fpath);
return iface.temporal.api.history.v1.History.decode(hist);
}
}
Loading
Loading