Skip to content

Commit

Permalink
fix: add polling for pipeline status, error info ui msgs (#137)
Browse files Browse the repository at this point in the history
* [do-not-merge] added logs for validation

* add polling

* fix publishing on error

* add error info message
  • Loading branch information
vcashwin authored Jan 31, 2025
1 parent 081d504 commit 90c7b5c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 27 deletions.
20 changes: 18 additions & 2 deletions apps/app/app/api/pipelines/validation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"use server";
import { upsertStream } from "../streams/upsert";
import { createServerClient } from "@repo/supabase";
import { createServerClient } from "@repo/supabase/server";
import { serverConfig } from "@/lib/serverEnv";
import { app } from "@/lib/env";

Expand Down Expand Up @@ -72,6 +73,8 @@ export async function createSmokeTestStream(pipelineId: string) {
"did:privy:cm4x2cuiw007lh8fcj34919fu"
); // Using system user ID

console.log("Stream created successfully:", stream);

if (streamError) {
console.error("Error creating smoke test stream:", streamError);
throw new Error(streamError);
Expand Down Expand Up @@ -177,6 +180,10 @@ export async function pollStreamStatus(stream: any) {
continue;
}
console.error("Polling error:", error);
console.log(
"Polling error::Stream status map:",
global.streamStatusMap
);
global.streamStatusMap.delete(streamId);
break;
}
Expand Down Expand Up @@ -209,13 +216,22 @@ export async function getAndStoreStreamStatus(
}

const data: StreamStatus = await response.json();
console.log("getAndStoreStreamStatus::Stream status data:", data);
console.log(
"getAndStoreStreamStatus::Stream status map:",
global.streamStatusMap
);
global.streamStatusMap.set(streamId, data);

return data;
}

export async function getStoredStreamStatus(
streamId: string
): Promise<StreamStatus | undefined> {
): Promise<any | undefined> {
if (!global.streamStatusMap) {
console.log("getStoredStreamStatus::Stream status map is undefined");
return undefined;
}
return global.streamStatusMap.get(streamId);
}
52 changes: 27 additions & 25 deletions apps/app/components/pipeline/validate-pipeline.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"use client";
import { publishPipeline } from "@/app/api/pipelines/edit";
import { getStoredStreamStatus } from "@/app/api/pipelines/validation";
import { usePrivy } from "@privy-io/react-auth";
import { Button } from "@repo/design-system/components/ui/button";
import {
Expand All @@ -22,31 +23,20 @@ function usePipelineStatus(streamId: string) {
const [isLoading, setIsLoading] = useState(true);

useEffect(() => {
let source: EventSource;
const fetchSseMessages = async () => {
source = new EventSource(`/api/streams/${streamId}/sse`);
source.onmessage = (event) => {
const data = JSON.parse(event.data);
setData(data);
};
source.onerror = (error) => {
console.error("Error fetching pipeline status:", error);
source.close();
};
setTimeout(() => {
if (source) {
source.close();
}
const intervalId = setInterval(async () => {
const status = await getStoredStreamStatus(streamId);
const error = status?.inference_status?.last_error;
setData(status);
if (error) {
setIsLoading(false);
}, TIMEOUT_MS);
};
fetchSseMessages();

return () => {
if (source) {
source.close();
clearInterval(intervalId);
}
};
}, 5000);
setTimeout(() => {
clearInterval(intervalId);
setIsLoading(false);
}, TIMEOUT_MS);
return () => clearInterval(intervalId);
}, []);

return { data, isLoading };
Expand Down Expand Up @@ -231,11 +221,14 @@ export default function ValidatePipeline({
streamId: string;
}) {
const router = useRouter();
const { authenticated, user, ready: isAuthLoaded } = usePrivy();
const { user } = usePrivy();
const { data, isLoading } = usePipelineStatus(streamId);
const { status, degradation, outputFps, error } = getPipelineStatus(data);
const isPublishable =
degradation.variant !== "error" && outputFps.variant !== "error" && !error;
degradation.variant !== "error" &&
outputFps.variant !== "error" &&
status.variant !== "info" &&
!error;

return (
<main className="flex-1 p-4">
Expand Down Expand Up @@ -307,6 +300,15 @@ export default function ValidatePipeline({
/>
</div>
)}
{!isLoading && !isPublishable && (
<div className="flex gap-2 items-center">
<Info className="w-4 h-4 text-red-500 dark:text-red-400" />
<span className="text-red-700 dark:text-red-200 text-sm">
The pipeline is not publishable. Please fix any errors or try
again later.
</span>
</div>
)}
</CardContent>
<Separator className="bg-border" />
<CardFooter className="pt-6">
Expand Down

0 comments on commit 90c7b5c

Please sign in to comment.