Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally trigger batched items sequentially to preserve order #1536

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/sweet-suits-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Add option to trigger batched items sequentially, and default to parallel triggering which is faster
14 changes: 11 additions & 3 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import { env } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { z } from "zod";

const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
headers: HeadersSchema.extend({
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
}),
body: BatchTriggerTaskV2RequestBody,
allowJWT: true,
maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE,
Expand Down Expand Up @@ -52,6 +58,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"batch-processing-strategy": batchProcessingStrategy,
traceparent,
tracestate,
} = headers;
Expand All @@ -67,6 +74,7 @@ const { action, loader } = createActionApiRoute(
triggerClient,
traceparent,
tracestate,
batchProcessingStrategy,
});

const traceContext =
Expand All @@ -79,7 +87,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const loader = createLoaderApiRoute(
findResource: (params, auth) => {
return ApiRetrieveRunPresenter.findRun(params.runId, auth.environment);
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => ({
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ApiKeyRouteBuilderOptions<
params: TParamsSchema extends z.AnyZodObject ? z.infer<TParamsSchema> : undefined,
authentication: ApiAuthenticationResultSuccess
) => Promise<TResource | undefined>;
shouldRetryNotFound?: boolean;
authorization?: {
action: AuthorizationAction;
resource: (
Expand Down Expand Up @@ -81,6 +82,7 @@ export function createLoaderApiRoute<
corsStrategy = "none",
authorization,
findResource,
shouldRetryNotFound,
} = options;

if (corsStrategy !== "none" && request.method.toUpperCase() === "OPTIONS") {
Expand Down Expand Up @@ -162,7 +164,10 @@ export function createLoaderApiRoute<
if (!resource) {
return await wrapResponse(
request,
json({ error: "Not found" }, { status: 404 }),
json(
{ error: "Not found" },
{ status: 404, headers: { "x-should-retry": shouldRetryNotFound ? "true" : "false" } }
),
corsStrategy !== "none"
);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ function getWorkerQueue() {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
37 changes: 28 additions & 9 deletions apps/webapp/app/v3/services/batchTriggerV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
parsePacket,
} from "@trigger.dev/core/v3";
import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand All @@ -25,12 +25,10 @@ import { z } from "zod";

const PROCESSING_BATCH_SIZE = 50;
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
const MAX_ATTEMPTS = 10;

const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);

type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

const CURRENT_STRATEGY: BatchProcessingStrategy = "parallel";
export const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);
export type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

export const BatchProcessingOptions = z.object({
batchId: z.string(),
Expand All @@ -52,6 +50,17 @@ export type BatchTriggerTaskServiceOptions = {
};

export class BatchTriggerV2Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;

constructor(
batchProcessingStrategy?: BatchProcessingStrategy,
protected readonly _prisma: PrismaClientOrTransaction = prisma
) {
super(_prisma);

this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
}

public async call(
environment: AuthenticatedEnvironment,
body: BatchTriggerTaskV2RequestBody,
Expand Down Expand Up @@ -452,14 +461,14 @@ export class BatchTriggerV2Service extends BaseService {
},
});

switch (CURRENT_STRATEGY) {
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
});

break;
Expand All @@ -480,7 +489,7 @@ export class BatchTriggerV2Service extends BaseService {
processingId: `${index}`,
range,
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
},
tx
)
Expand Down Expand Up @@ -539,6 +548,16 @@ export class BatchTriggerV2Service extends BaseService {

const $attemptCount = options.attemptCount + 1;

// Add early return if max attempts reached
if ($attemptCount > MAX_ATTEMPTS) {
logger.error("[BatchTriggerV2][processBatchTaskRun] Max attempts reached", {
options,
attemptCount: $attemptCount,
});
// You might want to update the batch status to failed here
return;
}

const batch = await this._prisma.batchTaskRun.findFirst({
where: { id: options.batchId },
include: {
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ services:
- 6379:6379

electric:
image: electricsql/electric:0.8.1
image: electricsql/electric:0.9.4
restart: always
environment:
DATABASE_URL: postgresql://postgres:postgres@database:5432/postgres?sslmode=disable
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/testcontainers/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export async function createElectricContainer(
network.getName()
)}:5432/${postgresContainer.getDatabase()}?sslmode=disable`;

const container = await new GenericContainer("electricsql/electric:0.8.1")
const container = await new GenericContainer("electricsql/electric:0.9.4")
.withExposedPorts(3000)
.withNetwork(network)
.withEnvironment({
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type ClientTriggerOptions = {
export type ClientBatchTriggerOptions = ClientTriggerOptions & {
idempotencyKey?: string;
idempotencyKeyTTL?: string;
processingStrategy?: "parallel" | "sequential";
};

export type TriggerRequestOptions = ZodFetchOptions & {
Expand Down Expand Up @@ -239,6 +240,7 @@ export class ApiClient {
headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, {
"idempotency-key": clientOptions?.idempotencyKey,
"idempotency-key-ttl": clientOptions?.idempotencyKeyTTL,
"batch-processing-strategy": clientOptions?.processingStrategy,
}),
body: JSON.stringify(body),
},
Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
* ```
*/
batchTriggerAndWait: (
items: Array<BatchTriggerAndWaitItem<TInput>>
items: Array<BatchTriggerAndWaitItem<TInput>>,
options?: BatchTriggerAndWaitOptions
) => Promise<BatchResult<TIdentifier, TOutput>>;
}

Expand Down Expand Up @@ -781,6 +782,32 @@ export type TriggerAndWaitOptions = Omit<TriggerOptions, "idempotencyKey" | "ide
export type BatchTriggerOptions = {
idempotencyKey?: IdempotencyKey | string | string[];
idempotencyKeyTTL?: string;

/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type BatchTriggerAndWaitOptions = {
/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type TaskMetadataWithFunctions = TaskMetadata & {
Expand Down
Loading
Loading