Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/batch-completion…
Browse files Browse the repository at this point in the history
…-checker
  • Loading branch information
nicktrn committed Dec 2, 2024
2 parents bf39997 + 67592ec commit d16f736
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/modern-nails-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Multiple streams can now be consumed simultaneously
80 changes: 52 additions & 28 deletions packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
IOPacket,
parsePacket,
} from "../utils/ioSerialization.js";
import { ApiError } from "./errors.js";
import { ApiClient } from "./index.js";
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";
import { EventSourceParserStream } from "eventsource-parser/stream";
Expand Down Expand Up @@ -97,7 +98,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(

// First, define interfaces for the stream handling
export interface StreamSubscription {
subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void>;
subscribe(): Promise<ReadableStream<unknown>>;
}

export interface StreamSubscriptionFactory {
Expand All @@ -111,33 +112,38 @@ export class SSEStreamSubscription implements StreamSubscription {
private options: { headers?: Record<string, string>; signal?: AbortSignal }
) {}

async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
const response = await fetch(this.url, {
async subscribe(): Promise<ReadableStream<unknown>> {
return fetch(this.url, {
headers: {
Accept: "text/event-stream",
...this.options.headers,
},
signal: this.options.signal,
});

if (!response.body) {
throw new Error("No response body");
}

const reader = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();

while (true) {
const { done, value } = await reader.read();

if (done) break;
}).then((response) => {
if (!response.ok) {
throw ApiError.generate(
response.status,
{},
"Could not subscribe to stream",
Object.fromEntries(response.headers)
);
}

await onChunk(safeParseJSON(value.data));
}
if (!response.body) {
throw new Error("No response body");
}

return () => reader.cancel();
return response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(safeParseJSON(chunk.data));
},
})
);
});
}
}

Expand Down Expand Up @@ -254,13 +260,31 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
this.options.client?.baseUrl
);

await subscription.subscribe(async (chunk) => {
controller.enqueue({
type: streamKey,
chunk: chunk as TStreams[typeof streamKey],
run,
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
});
const stream = await subscription.subscribe();

// Create the pipeline and start it
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk as TStreams[typeof streamKey],
run,
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
)
.catch((error) => {
console.error(`Error in stream ${streamKey}:`, error);
});
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions packages/core/test/runStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ import {
import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js";

// Test implementations
// Update TestStreamSubscription to return a ReadableStream
class TestStreamSubscription implements StreamSubscription {
constructor(private chunks: unknown[]) {}

async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
for (const chunk of this.chunks) {
await onChunk(chunk);
}
return () => {};
async subscribe(): Promise<ReadableStream<unknown>> {
return new ReadableStream({
start: async (controller) => {
for (const chunk of this.chunks) {
controller.enqueue(chunk);
}
controller.close();
},
});
}
}

// TestStreamSubscriptionFactory can remain the same
class TestStreamSubscriptionFactory implements StreamSubscriptionFactory {
private streams = new Map<string, unknown[]>();

Expand Down
12 changes: 10 additions & 2 deletions scripts/publish-prerelease.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ else
fi

# Run your commands

# Run changeset version command and capture its output
echo "Running: pnpm exec changeset version --snapshot $version"
pnpm exec changeset version --snapshot $version
if output=$(pnpm exec changeset version --snapshot $version 2>&1); then
if echo "$output" | grep -q "No unreleased changesets found"; then
echo "No unreleased changesets found. Exiting."
exit 0
fi
else
echo "Error running changeset version command"
exit 1
fi

echo "Running: pnpm run build --filter \"@trigger.dev/*\" --filter \"trigger.dev\""
pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"
Expand Down

0 comments on commit d16f736

Please sign in to comment.