Skip to content

Commit

Permalink
bug: handle message reconnect better
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Dec 13, 2024
1 parent b59082b commit a9d475a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 27 deletions.
12 changes: 10 additions & 2 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,10 @@ func (e *Emitter) streamEvents(ctx context.Context, run v1.Run, opts WatchOption
if err := e.printParent(ctx, opts.MaxRuns-1, state, run, result); !apierrors.IsNotFound(err) && err != nil {
return err
}
result <- types.Progress{
ReplayComplete: true,
if run.Status.EndTime.IsZero() {
result <- types.Progress{
ReplayComplete: true,
}
}
}

Expand All @@ -451,6 +453,12 @@ func (e *Emitter) streamEvents(ctx context.Context, run v1.Run, opts WatchOption
}
}

if opts.History && !run.Status.EndTime.IsZero() {
result <- types.Progress{
ReplayComplete: true,
}
}

nextRun, err := e.findNextRun(ctx, run, opts)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions ui/user/src/lib/services/chat/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ export function newMessageEventSource(
opts?: {
task?: {
id: string;
follow?: boolean;
};
runID?: string;
}
Expand All @@ -182,7 +181,7 @@ export function newMessageEventSource(
if (opts.runID) {
url = `/assistants/${assistant}/tasks/${opts.task.id}/runs/${opts.runID}/events`;
}
return new EventSource(baseURL + `${url}${opts.task.follow ? '?follow=true' : ''}`);
return new EventSource(baseURL + `${url}`);
}
return new EventSource(baseURL + `/assistants/${assistant}/events`);
}
Expand Down
52 changes: 29 additions & 23 deletions ui/user/src/lib/services/chat/thread.svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,50 @@ export class Thread {
pending: boolean = $state(false);
readonly #assistant: string;
readonly #onError: ((error: Error) => void) | undefined;
readonly #es: EventSource;
#es: EventSource;
readonly #progresses: Progress[] = [];

constructor(
assistant: string,
opts?: {
task?: {
id: string;
follow?: boolean;
};
runID?: string;
onError?: (error: Error) => void;
onClose?: () => void;
}
) {
const es = newMessageEventSource(assistant, {
task: opts?.task,
runID: opts?.runID
});
es.onmessage = (e) => {
this.handleMessage(e);
};
es.onopen = () => {
console.log('Message EventSource opened');
};
es.addEventListener('close', () => {
console.log('Message EventSource closed');
opts?.onClose?.();
es.close();
});
es.onerror = (e: Event) => {
if (e.eventPhase === EventSource.CLOSED) {
console.log('Message EventSource closed');
}
};

const reconnect = (): EventSource => {
console.log('Message EventSource initializing');
this.replayComplete = false;
const es = newMessageEventSource(assistant, {
task: opts?.task,
runID: opts?.runID
});
es.onmessage = (e) => {
this.handleMessage(e);
};
es.onopen = () => {
console.log('Message EventSource opened');
};
es.addEventListener('close', () => {
console.log('Message EventSource closed by server');
opts?.onClose?.();
es.close();
this.#es = reconnect()
});
es.onerror = (e: Event) => {
if (e.eventPhase === EventSource.CLOSED) {
console.log('Message EventSource closed');
}
};
return es
}

this.#assistant = assistant;
this.#es = es;
this.#es = reconnect();
this.#onError = opts?.onError;
}

Expand Down

0 comments on commit a9d475a

Please sign in to comment.