Skip to content

Commit

Permalink
Optionally trigger batched items sequentially to preserve order
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Dec 4, 2024
1 parent 91afa5e commit 2ecf510
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 76 deletions.
6 changes: 6 additions & 0 deletions .changeset/sweet-suits-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Add option to trigger batched items sequentially, and default to parallel triggering which is faster
14 changes: 11 additions & 3 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import { env } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { z } from "zod";

const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
headers: HeadersSchema.extend({
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
}),
body: BatchTriggerTaskV2RequestBody,
allowJWT: true,
maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE,
Expand Down Expand Up @@ -52,6 +58,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"batch-processing-strategy": batchProcessingStrategy,
traceparent,
tracestate,
} = headers;
Expand All @@ -67,6 +74,7 @@ const { action, loader } = createActionApiRoute(
triggerClient,
traceparent,
tracestate,
batchProcessingStrategy,
});

const traceContext =
Expand All @@ -79,7 +87,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const loader = createLoaderApiRoute(
findResource: (params, auth) => {
return ApiRetrieveRunPresenter.findRun(params.runId, auth.environment);
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => ({
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ApiKeyRouteBuilderOptions<
params: TParamsSchema extends z.AnyZodObject ? z.infer<TParamsSchema> : undefined,
authentication: ApiAuthenticationResultSuccess
) => Promise<TResource | undefined>;
shouldRetryNotFound?: boolean;
authorization?: {
action: AuthorizationAction;
resource: (
Expand Down Expand Up @@ -81,6 +82,7 @@ export function createLoaderApiRoute<
corsStrategy = "none",
authorization,
findResource,
shouldRetryNotFound,
} = options;

if (corsStrategy !== "none" && request.method.toUpperCase() === "OPTIONS") {
Expand Down Expand Up @@ -162,7 +164,10 @@ export function createLoaderApiRoute<
if (!resource) {
return await wrapResponse(
request,
json({ error: "Not found" }, { status: 404 }),
json(
{ error: "Not found" },
{ status: 404, headers: { "x-should-retry": shouldRetryNotFound ? "true" : "false" } }
),
corsStrategy !== "none"
);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ function getWorkerQueue() {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
26 changes: 17 additions & 9 deletions apps/webapp/app/v3/services/batchTriggerV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
parsePacket,
} from "@trigger.dev/core/v3";
import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand All @@ -26,11 +26,8 @@ import { z } from "zod";
const PROCESSING_BATCH_SIZE = 50;
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;

const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);

type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

const CURRENT_STRATEGY: BatchProcessingStrategy = "parallel";
export const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);
export type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

export const BatchProcessingOptions = z.object({
batchId: z.string(),
Expand All @@ -52,6 +49,17 @@ export type BatchTriggerTaskServiceOptions = {
};

export class BatchTriggerV2Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;

constructor(
batchProcessingStrategy?: BatchProcessingStrategy,
protected readonly _prisma: PrismaClientOrTransaction = prisma
) {
super(_prisma);

this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
}

public async call(
environment: AuthenticatedEnvironment,
body: BatchTriggerTaskV2RequestBody,
Expand Down Expand Up @@ -452,14 +460,14 @@ export class BatchTriggerV2Service extends BaseService {
},
});

switch (CURRENT_STRATEGY) {
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
});

break;
Expand All @@ -480,7 +488,7 @@ export class BatchTriggerV2Service extends BaseService {
processingId: `${index}`,
range,
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
},
tx
)
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type ClientTriggerOptions = {
export type ClientBatchTriggerOptions = ClientTriggerOptions & {
idempotencyKey?: string;
idempotencyKeyTTL?: string;
processingStrategy?: "parallel" | "sequential";
};

export type TriggerRequestOptions = ZodFetchOptions & {
Expand Down Expand Up @@ -239,6 +240,7 @@ export class ApiClient {
headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, {
"idempotency-key": clientOptions?.idempotencyKey,
"idempotency-key-ttl": clientOptions?.idempotencyKeyTTL,
"batch-processing-strategy": clientOptions?.processingStrategy,
}),
body: JSON.stringify(body),
},
Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
* ```
*/
batchTriggerAndWait: (
items: Array<BatchTriggerAndWaitItem<TInput>>
items: Array<BatchTriggerAndWaitItem<TInput>>,
options?: BatchTriggerAndWaitOptions
) => Promise<BatchResult<TIdentifier, TOutput>>;
}

Expand Down Expand Up @@ -781,6 +782,32 @@ export type TriggerAndWaitOptions = Omit<TriggerOptions, "idempotencyKey" | "ide
export type BatchTriggerOptions = {
idempotencyKey?: IdempotencyKey | string | string[];
idempotencyKeyTTL?: string;

/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type BatchTriggerAndWaitOptions = {
/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type TaskMetadataWithFunctions = TaskMetadata & {
Expand Down
28 changes: 22 additions & 6 deletions packages/trigger-sdk/src/v3/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import type {
TriggerApiRequestOptions,
TriggerOptions,
AnyTaskRunResult,
BatchTriggerAndWaitOptions,
} from "@trigger.dev/core/v3";

export type {
Expand Down Expand Up @@ -181,7 +182,7 @@ export function createTask<
});
}, params.id);
},
batchTriggerAndWait: async (items) => {
batchTriggerAndWait: async (items, options) => {
const taskMetadata = taskCatalog.getTaskManifest(params.id);

return await batchTriggerAndWait_internal<TIdentifier, TInput, TOutput>(
Expand All @@ -191,6 +192,7 @@ export function createTask<
params.id,
items,
undefined,
options,
undefined,
customQueue
);
Expand Down Expand Up @@ -326,7 +328,7 @@ export function createSchemaTask<
});
}, params.id);
},
batchTriggerAndWait: async (items) => {
batchTriggerAndWait: async (items, options) => {
const taskMetadata = taskCatalog.getTaskManifest(params.id);

return await batchTriggerAndWait_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
Expand All @@ -336,6 +338,7 @@ export function createSchemaTask<
params.id,
items,
parsePayload,
options,
undefined,
customQueue
);
Expand Down Expand Up @@ -469,13 +472,14 @@ export function triggerAndWait<TTask extends AnyTask>(
export async function batchTriggerAndWait<TTask extends AnyTask>(
id: TaskIdentifier<TTask>,
items: Array<BatchItem<TaskPayload<TTask>>>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: ApiRequestOptions
): Promise<BatchResult<TaskIdentifier<TTask>, TaskOutput<TTask>>> {
return await batchTriggerAndWait_internal<
TaskIdentifier<TTask>,
TaskPayload<TTask>,
TaskOutput<TTask>
>("tasks.batchTriggerAndWait()", id, items, undefined, requestOptions);
>("tasks.batchTriggerAndWait()", id, items, undefined, options, requestOptions);
}

/**
Expand Down Expand Up @@ -618,6 +622,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name: "batch.trigger()",
Expand Down Expand Up @@ -740,6 +745,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
*/
export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
items: Array<BatchByIdAndWaitItem<InferRunTypes<TTask>>>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: TriggerApiRequestOptions
): Promise<BatchByIdResult<TTask>> {
const ctx = taskContext.ctx;
Expand Down Expand Up @@ -786,7 +792,9 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down Expand Up @@ -948,6 +956,7 @@ export async function batchTriggerTasks<TTasks extends readonly AnyTask[]>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name: "batch.triggerByTask()",
Expand Down Expand Up @@ -1072,6 +1081,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
items: {
[K in keyof TTasks]: BatchByTaskAndWaitItem<TTasks[K]>;
},
options?: BatchTriggerAndWaitOptions,
requestOptions?: TriggerApiRequestOptions
): Promise<BatchByTaskResult<TTasks>> {
const ctx = taskContext.ctx;
Expand Down Expand Up @@ -1118,7 +1128,9 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down Expand Up @@ -1256,6 +1268,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name,
Expand Down Expand Up @@ -1377,6 +1390,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
id: TIdentifier,
items: Array<BatchTriggerAndWaitItem<TPayload>>,
parsePayload?: SchemaParseFn<TPayload>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: ApiRequestOptions,
queue?: QueueOptions
): Promise<BatchResult<TIdentifier, TOutput>> {
Expand Down Expand Up @@ -1420,7 +1434,9 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down
Loading

0 comments on commit 2ecf510

Please sign in to comment.