From e8e80d1863d7ab899ae2bba1b2279007fd266c7e Mon Sep 17 00:00:00 2001 From: Jerron Lim Date: Sun, 2 Jun 2024 11:10:59 +0800 Subject: [PATCH] Show errors that occur during streaming (#43) --- backend/app/core/graph/build.py | 71 +++++++++++++--------- frontend/src/components/Teams/ChatTeam.tsx | 18 +++++- 2 files changed, 58 insertions(+), 31 deletions(-) diff --git a/backend/app/core/graph/build.py b/backend/app/core/graph/build.py index b8a7ba92..f4e1740a 100644 --- a/backend/app/core/graph/build.py +++ b/backend/app/core/graph/build.py @@ -1,3 +1,4 @@ +import asyncio import json from collections import defaultdict, deque from collections.abc import AsyncGenerator @@ -428,34 +429,46 @@ async def generator( for message in messages ] - memory = await AsyncPostgresSaver.from_conn_string(settings.PG_DATABASE_URI) + try: + memory = await AsyncPostgresSaver.from_conn_string(settings.PG_DATABASE_URI) - if team.workflow == "hierarchical": - teams = convert_hierarchical_team_to_dict(team, members) - team_leader = list(teams.keys())[0] - root = create_hierarchical_graph(teams, leader_name=team_leader, memory=memory) - state = { - "messages": formatted_messages, - "team_name": teams[team_leader].name, - "team_members": teams[team_leader].members, - } - else: - member_dict = convert_sequential_team_to_dict(team) - root = create_sequential_graph(member_dict, memory) - state = { - "messages": formatted_messages, - "team_name": team.name, - "team_members": member_dict, - "next": list(member_dict.values())[0].name, + if team.workflow == "hierarchical": + teams = convert_hierarchical_team_to_dict(team, members) + team_leader = list(teams.keys())[0] + root = create_hierarchical_graph( + teams, leader_name=team_leader, memory=memory + ) + state = { + "messages": formatted_messages, + "team_name": teams[team_leader].name, + "team_members": teams[team_leader].members, + } + else: + member_dict = convert_sequential_team_to_dict(team) + root = create_sequential_graph(member_dict, memory) + state = { + "messages": formatted_messages, + "team_name": team.name, + "team_members": member_dict, + "next": list(member_dict.values())[0].name, + } + async for output in root.astream_events( + state, + version="v1", + include_names=["work", "delegate", "summarise"], + config={"configurable": {"thread_id": thread_id}}, + ): + if output["event"] == "on_chain_end": + output_data = output["data"]["output"] + transformed_output_data = convert_messages_and_tasks_to_dict( + output_data + ) + formatted_output = f"data: {json.dumps(transformed_output_data)}\n\n" + yield formatted_output + except Exception as e: + error_message = { + "error": str(e), + "details": "An error occurred during streaming.", } - async for output in root.astream_events( - state, - version="v1", - include_names=["work", "delegate", "summarise"], - config={"configurable": {"thread_id": thread_id}}, - ): - if output["event"] == "on_chain_end": - output_data = output["data"]["output"] - transformed_output_data = convert_messages_and_tasks_to_dict(output_data) - formatted_output = f"data: {json.dumps(transformed_output_data)}\n\n" - yield formatted_output + yield f"data: {json.dumps(error_message)}\n\n" + await asyncio.sleep(0.1) # Add a small delay to ensure the message is sent diff --git a/frontend/src/components/Teams/ChatTeam.tsx b/frontend/src/components/Teams/ChatTeam.tsx index e5e26f58..689716f4 100644 --- a/frontend/src/components/Teams/ChatTeam.tsx +++ b/frontend/src/components/Teams/ChatTeam.tsx @@ -107,6 +107,10 @@ const MessageBox = ({ message }: { message: Message }) => { You + ) : member === "Error." ? ( + + Error + ) : ( member ) @@ -265,6 +269,16 @@ const ChatTeam = () => { const parsed = JSON.parse(jsonStr) const newMessages: Message[] = [] + if (!parsed) continue + + if ("error" in parsed) { + newMessages.push({ + type: "ai", + content: parsed.error, + member: "Error.", + }) + } + if ("messages" in parsed) { for (const message of parsed.messages) { newMessages.push({ @@ -291,7 +305,7 @@ const ChatTeam = () => { console.error("Failed to parse messages:", error) return showToast( "Something went wrong.", - "Unable to parse messages", + "Error parsing messages.", "error", ) } @@ -312,7 +326,7 @@ const ChatTeam = () => { showToast("Something went wrong.", `${errDetail}`, "error") }, onSuccess: () => { - showToast("Success!", "Streaming completed.", "success") + showToast("Streaming completed", "", "success") }, onSettled: () => { setIsStreaming(false)