diff --git a/.github/workflows/workbench-service.yml b/.github/workflows/workbench-service.yml index b124c64d..a29a94c5 100644 --- a/.github/workflows/workbench-service.yml +++ b/.github/workflows/workbench-service.yml @@ -5,6 +5,7 @@ on: branches: ["main"] paths: - "workbench-service/**" + - "libraries/python/semantic-workbench-assistant/**" - "libraries/python/semantic-workbench-api-model/**" - "tools/docker/**" - ".github/workflows/workbench-service.yml" @@ -13,6 +14,7 @@ on: branches: ["main"] paths: - "workbench-service/**" + - "libraries/python/semantic-workbench-assistant/**" - "libraries/python/semantic-workbench-api-model/**" - "tools/docker/**" - ".github/workflows/workbench-service.yml" diff --git a/README.md b/README.md index 54d1c967..b20fc404 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Workspace files allow us to manage multiple projects within a monorepo more effe - Bring your own llm api keys - Use VS Code > `Run and Debug` (Ctrl/Cmd+Shift+D) > `examples: python-02-simple-chatbot` to start the example chatbot assistant. Either set your keys in your .env file or after creating the assistant as described below, select it and provide the keys in the configuration page. -## Open the Workbench and create an Assistant instance +## Open the Workbench and create an Assistant Open the app in your browser at [`https://localhost:4000`](https://localhost:4000). When you first log into the Semantic Workbench, follow these steps to get started: diff --git a/assistants/skill-assistant/README.md b/assistants/skill-assistant/README.md index 214eb44a..8a50597c 100644 --- a/assistants/skill-assistant/README.md +++ b/assistants/skill-assistant/README.md @@ -4,9 +4,9 @@ The Skill Assistant serves as a demonstration of integrating the Skill Library w ## Overview -[skill_controller.py](assistant/skill_controller.py) file is responsible for managing the assistant instances. It includes functionality to create and retrieve assistants, configure chat drivers, and map skill events to the Semantic Workbench. +[skill_controller.py](assistant/skill_controller.py) file is responsible for managing the assistants. It includes functionality to create and retrieve assistants, configure chat drivers, and map skill events to the Semantic Workbench. -- AssistantRegistry: Manages multiple assistant instances, each associated with a unique conversation. +- AssistantRegistry: Manages multiple assistants, each associated with a unique conversation. - \_event_mapper: Maps skill events to message types understood by the Semantic Workbench. - create_assistant: Defines how to create and configure a new assistant. diff --git a/assistants/skill-assistant/assistant/assistant_registry.py b/assistants/skill-assistant/assistant/assistant_registry.py index 10618f0a..9163f48e 100644 --- a/assistants/skill-assistant/assistant/assistant_registry.py +++ b/assistants/skill-assistant/assistant/assistant_registry.py @@ -14,10 +14,10 @@ # TODO: Put this registry in the skill library. class AssistantRegistry: """ - This class handles the creation and management of skill assistant instances - for this service. Each conversation has its own assistant and we start each - assistant in it's own thread so that all events are able to be - asynchronously passed on to the Semantic Workbench. + This class handles the creation and management of skill assistants for this service. + Each conversation has its own assistant and we start each assistant in it's own + thread so that all events are able to be asynchronously passed on to the Semantic + Workbench. """ def __init__(self) -> None: diff --git a/docs/WORKBENCH_APP.md b/docs/WORKBENCH_APP.md index f484ae13..0ea13f6b 100644 --- a/docs/WORKBENCH_APP.md +++ b/docs/WORKBENCH_APP.md @@ -52,13 +52,13 @@ The Semantic Workbench interface dashboard includes sections for your assistants Select any of your assistants to access and update the assistant's configuration. Select an existing conversation or create a new one to start interacting with your assistants. -# Assistant Instances +# Assistants -Creating and configuring assistants in the Semantic Workbench allows you to utilize different AI functionalities tailored to specific needs. Note that you can create multiple instances of a single assistant service, each with its own configuration. +Creating and configuring assistants in the Semantic Workbench allows you to utilize different AI functionalities tailored to specific needs. Note that you can create multiple assistants, all back by a single assistant service, each with its own configuration. ## Creating a New Assistant -Creating a new assistant in the Semantic Workbench is straightforward. Begin by clicking on the `New Assistant` button on the dashboard. You will be presented with the available assistant services to choose from. Select the one that best suits your needs. You can accept the default name or choose your own then click `Save` to create the instance. +Creating a new assistant in the Semantic Workbench is straightforward. Begin by clicking on the `New Assistant` button on the dashboard. You will be presented with the available assistant services to choose from. Select the one that best suits your needs. You can accept the default name or choose your own then click `Save` to create the assistant. ## Configuring Assistants @@ -91,7 +91,7 @@ You have the ability to invite additional people to either observe or participat ## Creating a New Conversation -To start a new conversation with your assistant, click on its instance and then click `New Conversation`. Provide a title for the conversation and click `Save`. +To start a new conversation with your assistant, click on it and then click `New Conversation`. Provide a title for the conversation and click `Save`. ## Basics of Conversations @@ -152,11 +152,11 @@ When you provide someone with a link to copy a conversation it will copy the con * Second person follows the link later, they get the additional messages and data you added before they followed the link. ### Duplicating conversation -From the conversation list you can also duplicate them. This is useful for experimenting with taking conversations in different directions, or using one as a common base for further explorations. +From the conversation list you can also duplicate them. This is useful for experimenting with taking conversations in different directions, or using one as a common base for further explorations. ![Duplicate Conversation](images/conversation_duplicate.png) -Note that this will also copy the assistant instance the conversation is a part of as there is some state associated between the assistant and the conversation. +Note that this will also copy the assistant that the conversation is a part of as there is some state associated between the assistant and the conversation. ### Exporting and importing conversations diff --git a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/assistant_service_client.py b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/assistant_service_client.py index 06659d0f..409ab7c4 100644 --- a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/assistant_service_client.py +++ b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/assistant_service_client.py @@ -78,7 +78,7 @@ def __init__( ) -class AssistantInstanceClient: +class AssistantClient: def __init__(self, httpx_client_factory: Callable[[], httpx.AsyncClient]) -> None: self._client = httpx_client_factory() @@ -157,7 +157,7 @@ async def put_config(self, updated_config: ConfigPutRequestModel) -> ConfigRespo return ConfigResponseModel.model_validate(http_response.json()) @asynccontextmanager - async def get_exported_instance_data(self) -> AsyncGenerator[AsyncIterator[bytes], Any]: + async def get_exported_data(self) -> AsyncGenerator[AsyncIterator[bytes], Any]: try: http_response = await self._client.send(self._client.build_request("GET", "/export-data"), stream=True) except httpx.RequestError as e: @@ -266,7 +266,7 @@ async def __aexit__( async def aclose(self) -> None: await self._client.aclose() - async def put_assistant_instance( + async def put_assistant( self, assistant_id: uuid.UUID, request: AssistantPutRequestModel, @@ -284,7 +284,7 @@ async def put_assistant_instance( if not response.is_success: raise AssistantResponseError(response) - async def delete_assistant_instance(self, assistant_id: uuid.UUID) -> None: + async def delete_assistant(self, assistant_id: uuid.UUID) -> None: try: response = await self._client.delete(f"/{assistant_id}") if response.status_code == httpx.codes.NOT_FOUND: @@ -331,7 +331,7 @@ def _client(self, *additional_paths: str) -> httpx.AsyncClient: def for_service(self) -> AssistantServiceClient: return AssistantServiceClient(httpx_client_factory=self._client) - def for_assistant_instance(self, assistant_id: uuid.UUID) -> AssistantInstanceClient: - return AssistantInstanceClient( + def for_assistant(self, assistant_id: uuid.UUID) -> AssistantClient: + return AssistantClient( httpx_client_factory=lambda: self._client(str(assistant_id)), ) diff --git a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_model.py b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_model.py index 3643d440..36757bea 100644 --- a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_model.py +++ b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_model.py @@ -63,11 +63,6 @@ class ParticipantRole(StrEnum): service = "service" -class ParticipantStatus(BaseModel): - timestamp: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC)) - message: str | None = None - - class ConversationPermission(StrEnum): read_write = "read_write" read = "read" @@ -84,6 +79,7 @@ class ConversationParticipant(BaseModel): active_participant: bool online: bool | None = None conversation_permission: ConversationPermission + metadata: dict[str, Any] class ConversationParticipantList(BaseModel): @@ -459,6 +455,7 @@ class UpdateParticipant(BaseModel): status: str | None = None active_participant: bool | None = None + metadata: dict[str, Any] | None = None class ConversationEventType(StrEnum): diff --git a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_service_client.py b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_service_client.py index ca7491f1..3a8ac757 100644 --- a/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_service_client.py +++ b/libraries/python/semantic-workbench-api-model/semantic_workbench_api_model/workbench_service_client.py @@ -40,18 +40,18 @@ def from_headers(headers: Mapping[str, str]) -> AssistantServiceRequestHeaders: @dataclass -class AssistantInstanceRequestHeaders: +class AssistantRequestHeaders: assistant_id: uuid.UUID | None def to_headers(self) -> Mapping[str, str]: return {HEADER_ASSISTANT_ID: str(self.assistant_id)} @staticmethod - def from_headers(headers: Mapping[str, str]) -> AssistantInstanceRequestHeaders: + def from_headers(headers: Mapping[str, str]) -> AssistantRequestHeaders: assistant_id: uuid.UUID | None = None with suppress(ValueError): assistant_id = uuid.UUID(headers.get(HEADER_ASSISTANT_ID) or "") - return AssistantInstanceRequestHeaders( + return AssistantRequestHeaders( assistant_id=assistant_id, ) @@ -480,7 +480,7 @@ def __init__( self._assistant_service_id = assistant_service_id self._api_key = api_key - def _client(self, *headers: AssistantServiceRequestHeaders | AssistantInstanceRequestHeaders) -> httpx.AsyncClient: + def _client(self, *headers: AssistantServiceRequestHeaders | AssistantRequestHeaders) -> httpx.AsyncClient: client = httpx.AsyncClient( transport=httpx_transport_factory(), base_url=self._base_url, @@ -505,7 +505,7 @@ def for_conversation(self, assistant_id: str, conversation_id: str) -> Conversat conversation_id=conversation_id, httpx_client_factory=lambda: self._client( AssistantServiceRequestHeaders(assistant_service_id=self._assistant_service_id, api_key=self._api_key), - AssistantInstanceRequestHeaders(assistant_id=uuid.UUID(assistant_id)), + AssistantRequestHeaders(assistant_id=uuid.UUID(assistant_id)), ), ) diff --git a/libraries/python/semantic-workbench-assistant/semantic_workbench_assistant/assistant_service.py b/libraries/python/semantic-workbench-assistant/semantic_workbench_assistant/assistant_service.py index 7e6b63f0..646fedc2 100644 --- a/libraries/python/semantic-workbench-assistant/semantic_workbench_assistant/assistant_service.py +++ b/libraries/python/semantic-workbench-assistant/semantic_workbench_assistant/assistant_service.py @@ -307,8 +307,7 @@ async def get_service_description() -> assistant_model.ServiceInfoModel: @app.put( "/{assistant_id}", description=( - "Connect an assistant instance to the workbench, optionally" - " providing exported-data to restore the assistant" + "Connect an assistant to the workbench, optionally providing exported-data to restore the assistant" ), ) async def put_assistant( @@ -328,21 +327,21 @@ async def put_assistant( @app.get( "/{assistant_id}", - description="Get an assistant instance", + description="Get an assistant", ) async def get_assistant(assistant_id: str) -> assistant_model.AssistantResponseModel: return await service.get_assistant(assistant_id) @app.delete( "/{assistant_id}", - description="Delete an assistant instance", + description="Delete an assistant", ) async def delete_assistant(assistant_id: str) -> None: return await service.delete_assistant(assistant_id) @app.get( "/{assistant_id}/export-data", - description="Export all data for this assistant instance", + description="Export all data for this assistant", ) async def export_assistant_data(assistant_id: str) -> Response: response = await service.export_assistant_data(assistant_id) @@ -356,14 +355,14 @@ async def export_assistant_data(assistant_id: str) -> Response: @app.get( "/{assistant_id}/config", - description="Get config for this assistant instance", + description="Get config for this assistant", ) async def get_config(assistant_id: str) -> assistant_model.ConfigResponseModel: return await service.get_config(assistant_id) @app.put( "/{assistant_id}/config", - description="Set config for this assistant instance", + description="Set config for this assistant", ) async def put_config( assistant_id: str, updated_config: assistant_model.ConfigPutRequestModel @@ -373,7 +372,7 @@ async def put_config( @app.put( "/{assistant_id}/conversations/{conversation_id}", description=( - "Join an assistant instance to a workbench conversation, optionally" + "Join an assistant to a workbench conversation, optionally" " providing exported-data to restore the conversation" ), ) diff --git a/libraries/python/semantic-workbench-assistant/tests/test_assistant_app.py b/libraries/python/semantic-workbench-assistant/tests/test_assistant_app.py index cc1faca4..93479370 100644 --- a/libraries/python/semantic-workbench-assistant/tests/test_assistant_app.py +++ b/libraries/python/semantic-workbench-assistant/tests/test_assistant_app.py @@ -118,11 +118,9 @@ async def on_chat_message( client_builder = assistant_service_client.AssistantServiceClientBuilder("https://fake", "") service_client = client_builder.for_service() - instance_client = client_builder.for_assistant_instance(assistant_id) + instance_client = client_builder.for_assistant(assistant_id) - await service_client.put_assistant_instance( - assistant_id=assistant_id, request=assistant_request, from_export=None - ) + await service_client.put_assistant(assistant_id=assistant_id, request=assistant_request, from_export=None) assert assistant_created_calls == 1 @@ -263,11 +261,9 @@ async def get(self, context: ConversationContext) -> AssistantConversationInspec client_builder = assistant_service_client.AssistantServiceClientBuilder("https://fake", "") service_client = client_builder.for_service() - instance_client = client_builder.for_assistant_instance(assistant_id) + instance_client = client_builder.for_assistant(assistant_id) - await service_client.put_assistant_instance( - assistant_id=assistant_id, request=assistant_request, from_export=None - ) + await service_client.put_assistant(assistant_id=assistant_id, request=assistant_request, from_export=None) await instance_client.put_conversation( request=assistant_model.ConversationPutRequestModel( id=str(conversation_id), @@ -334,11 +330,9 @@ async def import_(self, conversation_context: ConversationContext, stream: IO[by client_builder = assistant_service_client.AssistantServiceClientBuilder("https://fake", "") service_client = client_builder.for_service() - instance_client = client_builder.for_assistant_instance(assistant_id) + instance_client = client_builder.for_assistant(assistant_id) - await service_client.put_assistant_instance( - assistant_id=assistant_id, request=assistant_request, from_export=None - ) + await service_client.put_assistant(assistant_id=assistant_id, request=assistant_request, from_export=None) conversation_id = uuid.uuid4() @@ -415,11 +409,9 @@ class TestConfigModel(BaseModel): client_builder = assistant_service_client.AssistantServiceClientBuilder("https://fake", "") service_client = client_builder.for_service() - instance_client = client_builder.for_assistant_instance(assistant_id) + instance_client = client_builder.for_assistant(assistant_id) - await service_client.put_assistant_instance( - assistant_id=assistant_id, request=assistant_request, from_export=None - ) + await service_client.put_assistant(assistant_id=assistant_id, request=assistant_request, from_export=None) response = await instance_client.get_config() assert response == assistant_model.ConfigResponseModel( @@ -463,7 +455,7 @@ class TestConfigModel(BaseModel): temp_dir_path = pathlib.Path(temp_dir) export_file_path = temp_dir_path / "export.zip" with export_file_path.open("wb") as f: - async with instance_client.get_exported_instance_data() as response: + async with instance_client.get_exported_data() as response: async for chunk in response: f.write(chunk) diff --git a/workbench-app/src/components/Assistants/AssistantCreate.tsx b/workbench-app/src/components/Assistants/AssistantCreate.tsx index 18ba6011..2d8006a2 100644 --- a/workbench-app/src/components/Assistants/AssistantCreate.tsx +++ b/workbench-app/src/components/Assistants/AssistantCreate.tsx @@ -226,7 +226,7 @@ export const AssistantCreate: React.FC = (props) => { }} > - New Instance of Assistant + New Assistant {!manualEntry && ( diff --git a/workbench-app/src/models/ConversationParticipant.ts b/workbench-app/src/models/ConversationParticipant.ts index b078c0a1..b85705b4 100644 --- a/workbench-app/src/models/ConversationParticipant.ts +++ b/workbench-app/src/models/ConversationParticipant.ts @@ -3,6 +3,7 @@ export interface ConversationParticipant { role: 'user' | 'assistant' | 'service'; id: string; + conversationId: string; name: string; image?: string; online?: boolean; @@ -10,4 +11,7 @@ export interface ConversationParticipant { statusTimestamp: string | null; conversationPermission: 'read' | 'read_write'; active: boolean; + metadata: { + [key: string]: any; + }; } diff --git a/workbench-app/src/services/workbench/participant.ts b/workbench-app/src/services/workbench/participant.ts index 721f26c8..9c5ee67b 100644 --- a/workbench-app/src/services/workbench/participant.ts +++ b/workbench-app/src/services/workbench/participant.ts @@ -18,12 +18,12 @@ const participantApi = workbenchApi.injectEndpoints({ }), updateConversationParticipant: builder.mutation< void, - { conversationId: string; participantId: string; status: string } + { conversationId: string; participantId: string; status?: string; metadata?: Record } >({ - query: ({ conversationId, participantId, status }) => ({ + query: ({ conversationId, participantId, status, metadata }) => ({ url: `/conversations/${conversationId}/participants/${participantId}`, method: 'PUT', - body: { status, active_participant: true }, + body: { status, active_participant: true, metadata }, }), invalidatesTags: ['Conversation'], }), @@ -65,6 +65,7 @@ export const transformResponseToConversationParticipant = (response: any): Conve try { return { id: response.id, + conversationId: response.conversation_id, role: response.role, name: response.name, image: response.image ?? undefined, @@ -73,6 +74,7 @@ export const transformResponseToConversationParticipant = (response: any): Conve statusTimestamp: response.status_updated_timestamp, conversationPermission: response.conversation_permission, active: response.active_participant, + metadata: response.metadata, }; } catch (error) { throw new Error(`Failed to transform participant response: ${error}`); diff --git a/workbench-service/migrations/versions/2024_11_25_191056_a106de176394_drop_workflow.py b/workbench-service/migrations/versions/2024_11_25_191056_a106de176394_drop_workflow.py new file mode 100644 index 00000000..69424b4a --- /dev/null +++ b/workbench-service/migrations/versions/2024_11_25_191056_a106de176394_drop_workflow.py @@ -0,0 +1,72 @@ +"""drop workflow + +Revision ID: a106de176394 +Revises: 245baf258e11 +Create Date: 2024-11-25 19:10:56.835186 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "a106de176394" +down_revision: Union[str, None] = "245baf258e11" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("workflowuserparticipant") + op.drop_table("workflowrun") + op.drop_table("workflowdefinition") + + with op.batch_alter_table("assistantparticipant") as batch_op: + batch_op.add_column(sa.Column("metadata", sa.JSON(), server_default="{}", nullable=False)) + + with op.batch_alter_table("userparticipant") as batch_op: + batch_op.add_column(sa.Column("metadata", sa.JSON(), server_default="{}", nullable=False)) + + +def downgrade() -> None: + op.drop_column("userparticipant", "metadata") + op.drop_column("assistantparticipant", "metadata") + op.create_table( + "workflowdefinition", + sa.Column("workflow_definition_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("data", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True), + sa.PrimaryKeyConstraint("workflow_definition_id", name="workflowdefinition_pkey"), + postgresql_ignore_search_path=False, + ) + op.create_table( + "workflowuserparticipant", + sa.Column("workflow_definition_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("user_id", sa.VARCHAR(), autoincrement=False, nullable=False), + sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False), + sa.Column("image", sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column("service_user", sa.BOOLEAN(), autoincrement=False, nullable=False), + sa.Column("active_participant", sa.BOOLEAN(), autoincrement=False, nullable=False), + sa.ForeignKeyConstraint( + ["workflow_definition_id"], + ["workflowdefinition.workflow_definition_id"], + name="fk_workflowuserparticipant_workflowdefinition", + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("workflow_definition_id", "user_id", name="workflowuserparticipant_pkey"), + ) + op.create_table( + "workflowrun", + sa.Column("workflow_run_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("workflow_definition_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("data", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint( + ["workflow_definition_id"], + ["workflowdefinition.workflow_definition_id"], + name="fk_workflowrun_workflowdefinition", + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("workflow_run_id", name="workflowrun_pkey"), + ) diff --git a/workbench-service/semantic_workbench_service/controller/__init__.py b/workbench-service/semantic_workbench_service/controller/__init__.py index 5ebd50db..552e7ad4 100644 --- a/workbench-service/semantic_workbench_service/controller/__init__.py +++ b/workbench-service/semantic_workbench_service/controller/__init__.py @@ -13,7 +13,6 @@ ) from .file import FileController from .user import UserController -from .workflow import WorkflowController __all__ = [ "AssistantController", @@ -29,6 +28,5 @@ "NotFoundError", "user", "participant", - "WorkflowController", "UserController", ] diff --git a/workbench-service/semantic_workbench_service/controller/assistant.py b/workbench-service/semantic_workbench_service/controller/assistant.py index 8b1dacc9..3014ab46 100644 --- a/workbench-service/semantic_workbench_service/controller/assistant.py +++ b/workbench-service/semantic_workbench_service/controller/assistant.py @@ -121,7 +121,7 @@ async def _put_assistant(self, assistant: db.Assistant, from_export: IO[bytes] | await self._client_pool.service_client( registration=assistant.related_assistant_service_registration, ) - ).put_assistant_instance( + ).put_assistant( assistant_id=assistant.assistant_id, request=AssistantPutRequestModel(assistant_name=assistant.name), from_export=from_export, @@ -138,7 +138,7 @@ async def forward_event_to_assistant(self, assistant_id: uuid.UUID, event: Conve ).one() try: - await (await self._client_pool.assistant_instance_client(assistant)).post_conversation_event(event=event) + await (await self._client_pool.assistant_client(assistant)).post_conversation_event(event=event) except AssistantError as e: if e.status_code != httpx.codes.NOT_FOUND: logger.exception( @@ -190,14 +190,12 @@ async def _remove_assistant_from_conversation( await session.flush() async def disconnect_assistant_from_conversation(self, conversation_id: uuid.UUID, assistant: db.Assistant) -> None: - await (await self._client_pool.assistant_instance_client(assistant)).delete_conversation( - conversation_id=conversation_id - ) + await (await self._client_pool.assistant_client(assistant)).delete_conversation(conversation_id=conversation_id) async def connect_assistant_to_conversation( self, conversation: db.Conversation, assistant: db.Assistant, from_export: IO[bytes] | None ) -> None: - await (await self._client_pool.assistant_instance_client(assistant)).put_conversation( + await (await self._client_pool.assistant_client(assistant)).put_conversation( ConversationPutRequestModel(id=str(conversation.conversation_id), title=conversation.title), from_export=from_export, ) @@ -343,7 +341,7 @@ async def delete_assistant( try: await ( await self._client_pool.service_client(assistant.related_assistant_service_registration) - ).delete_assistant_instance(assistant_id=assistant.assistant_id) + ).delete_assistant(assistant_id=assistant.assistant_id) except AssistantError: logger.exception("error disconnecting assistant") @@ -412,7 +410,7 @@ async def get_assistant_config( principal=user_principal, assistant_id=assistant_id, session=session ) - return await (await self._client_pool.assistant_instance_client(assistant)).get_config() + return await (await self._client_pool.assistant_client(assistant)).get_config() async def update_assistant_config( self, @@ -425,7 +423,7 @@ async def update_assistant_config( principal=user_principal, assistant_id=assistant_id, session=session ) - return await (await self._client_pool.assistant_instance_client(assistant)).put_config(updated_config) + return await (await self._client_pool.assistant_client(assistant)).put_config(updated_config) async def get_assistant_conversation_state_descriptions( self, @@ -444,7 +442,7 @@ async def get_assistant_conversation_state_descriptions( assistant=assistant, conversation_id=conversation_id, session=session ) - return await (await self._client_pool.assistant_instance_client(assistant)).get_state_descriptions( + return await (await self._client_pool.assistant_client(assistant)).get_state_descriptions( conversation_id=conversation_id ) @@ -466,7 +464,7 @@ async def get_assistant_conversation_state( assistant=assistant, conversation_id=conversation_id, session=session ) - return await (await self._client_pool.assistant_instance_client(assistant)).get_state( + return await (await self._client_pool.assistant_client(assistant)).get_state( conversation_id=conversation_id, state_id=state_id ) @@ -489,7 +487,7 @@ async def update_assistant_conversation_state( assistant=assistant, conversation_id=conversation_id, session=session ) - return await (await self._client_pool.assistant_instance_client(assistant)).put_state( + return await (await self._client_pool.assistant_client(assistant)).put_state( conversation_id=conversation_id, state_id=state_id, updated_state=updated_state ) @@ -611,14 +609,14 @@ async def _export( assistants = await session.exec(select(db.Assistant).where(col(db.Assistant.assistant_id).in_(assistant_ids))) for assistant in assistants: - assistant_client = await self._client_pool.assistant_instance_client(assistant) + assistant_client = await self._client_pool.assistant_client(assistant) # export assistant data assistant_dir = export_dir_path / "assistants" / str(assistant.assistant_id) assistant_dir.mkdir(parents=True) with (assistant_dir / AssistantController.EXPORT_ASSISTANT_DATA_FILENAME).open("wb") as assistant_file: - async with assistant_client.get_exported_instance_data() as response: + async with assistant_client.get_exported_data() as response: async for chunk in response: assistant_file.write(chunk) @@ -964,7 +962,7 @@ async def duplicate_conversation( try: # **Export the assistant's conversation data from the original conversation** - assistant_client = await self._client_pool.assistant_instance_client(assistant) + assistant_client = await self._client_pool.assistant_client(assistant) async with assistant_client.get_exported_conversation_data( conversation_id=conversation_id ) as export_response: diff --git a/workbench-service/semantic_workbench_service/controller/assistant_service_client_pool.py b/workbench-service/semantic_workbench_service/controller/assistant_service_client_pool.py index 52cc6707..6051d52a 100644 --- a/workbench-service/semantic_workbench_service/controller/assistant_service_client_pool.py +++ b/workbench-service/semantic_workbench_service/controller/assistant_service_client_pool.py @@ -2,7 +2,7 @@ from typing import Self from semantic_workbench_api_model.assistant_service_client import ( - AssistantInstanceClient, + AssistantClient, AssistantServiceClient, AssistantServiceClientBuilder, ) @@ -14,7 +14,7 @@ class AssistantServiceClientPool: def __init__(self, api_key_store: assistant_api_key.ApiKeyStore) -> None: self._api_key_store = api_key_store self._service_clients: dict[str, AssistantServiceClient] = {} - self._assistant_clients: dict[str, AssistantInstanceClient] = {} + self._assistant_clients: dict[str, AssistantClient] = {} self._client_lock = asyncio.Lock() def __aenter__(self) -> Self: @@ -38,7 +38,7 @@ async def service_client(self, registration: db.AssistantServiceRegistration) -> return self._service_clients[key] - async def assistant_instance_client(self, assistant: db.Assistant) -> AssistantInstanceClient: + async def assistant_client(self, assistant: db.Assistant) -> AssistantClient: assistant_id = assistant.assistant_id url = assistant.related_assistant_service_registration.assistant_service_url key = f"{assistant_id}-{url}" @@ -48,7 +48,7 @@ async def assistant_instance_client(self, assistant: db.Assistant) -> AssistantI if key not in self._assistant_clients: self._assistant_clients[key] = ( await self._client_builder(assistant.related_assistant_service_registration) - ).for_assistant_instance(assistant_id) + ).for_assistant(assistant_id) return self._assistant_clients[key] diff --git a/workbench-service/semantic_workbench_service/controller/conversation.py b/workbench-service/semantic_workbench_service/controller/conversation.py index a366e35b..25ee81ea 100644 --- a/workbench-service/semantic_workbench_service/controller/conversation.py +++ b/workbench-service/semantic_workbench_service/controller/conversation.py @@ -1,7 +1,6 @@ import datetime import logging import uuid -from dataclasses import dataclass from typing import AsyncContextManager, Awaitable, Callable, Iterable, Literal, Sequence import deepmerge @@ -35,12 +34,6 @@ logger = logging.getLogger(__name__) -@dataclass -class MessagePreview: - conversation_id: uuid.UUID - message: ConversationMessage - - class ConversationController: def __init__( self, @@ -52,11 +45,6 @@ def __init__( self._notify_event = notify_event self._assistant_controller = assistant_controller - self._message_previewers: list[Callable[[MessagePreview], Awaitable[None]]] = [] - - def register_message_previewer(self, previewer: Callable[[MessagePreview], Awaitable[None]]) -> None: - self._message_previewers.append(previewer) - async def create_conversation( self, new_conversation: NewConversation, @@ -438,6 +426,12 @@ async def update_user_participant( participant.status = update_participant.status participant.status_updated_datetime = datetime.datetime.now(datetime.UTC) + if update_participant.metadata is not None: + event_type = event_type or ConversationEventType.participant_updated + participant.meta_data = deepmerge.always_merger.merge( + participant.meta_data or {}, update_participant.metadata + ) + if event_type is not None: session.add(participant) await session.commit() @@ -491,6 +485,12 @@ async def update_assistant_participant( participant.status = update_participant.status participant.status_updated_datetime = datetime.datetime.now(datetime.UTC) + if update_participant.metadata is not None: + event_type = event_type or ConversationEventType.participant_updated + participant.meta_data = deepmerge.always_merger.merge( + participant.meta_data or {}, update_participant.metadata + ) + if event_type is not None: session.add(participant) await session.commit() @@ -709,11 +709,6 @@ async def create_conversation_message( message_response = convert.conversation_message_from_db(message, has_debug=bool(message_debug)) - # share message with previewers - message_preview = MessagePreview(conversation_id=conversation_id, message=message_response) - for previewer in self._message_previewers: - await previewer(message_preview) - await self._notify_event( ConversationEventQueueItem( event=ConversationEvent( diff --git a/workbench-service/semantic_workbench_service/controller/convert.py b/workbench-service/semantic_workbench_service/controller/convert.py index f73093b5..baf77fb1 100644 --- a/workbench-service/semantic_workbench_service/controller/convert.py +++ b/workbench-service/semantic_workbench_service/controller/convert.py @@ -27,11 +27,6 @@ ParticipantRole, User, UserList, - WorkflowDefinition, - WorkflowDefinitionList, - WorkflowParticipant, - WorkflowRun, - WorkflowRunList, ) from .. import db @@ -111,6 +106,7 @@ def conversation_participant_from_db_user(model: db.UserParticipant) -> Conversa status=model.status, status_updated_timestamp=model.status_updated_datetime, active_participant=model.active_participant, + metadata=model.meta_data, conversation_permission=ConversationPermission(model.conversation_permission), ) @@ -127,6 +123,7 @@ def conversation_participant_from_db_assistant( status=model.status, status_updated_timestamp=model.status_updated_datetime, active_participant=model.active_participant, + metadata=model.meta_data, online=assistant.related_assistant_service_registration.assistant_service_online if assistant else False, conversation_permission=ConversationPermission.read_write, ) @@ -309,33 +306,3 @@ def file_versions_from_db(file: db.File, versions: Iterable[db.FileVersion]) -> current_version=file.current_version, versions=[file_version_from_db(v) for v in versions], ) - - -def workflow_definition_from_db(model: db.WorkflowDefinition) -> WorkflowDefinition: - return WorkflowDefinition.model_validate({ - "id": model.workflow_definition_id, - **model.data, - }) - - -def workflow_definition_list_from_db(models: Iterable[db.WorkflowDefinition]) -> WorkflowDefinitionList: - return WorkflowDefinitionList(workflow_definitions=[workflow_definition_from_db(model) for model in models]) - - -def workflow_participant_from_db(model: db.WorkflowUserParticipant) -> WorkflowParticipant: - return WorkflowParticipant( - id=model.user_id, - active_participant=model.active_participant, - ) - - -def workflow_run_from_db(model: db.WorkflowRun) -> WorkflowRun: - return WorkflowRun( - id=model.workflow_run_id, - workflow_definition_id=model.workflow_definition_id, - **model.data, - ) - - -def workflow_run_list_from_db(models: Iterable[db.WorkflowRun]) -> WorkflowRunList: - return WorkflowRunList(workflow_runs=[workflow_run_from_db(model) for model in models]) diff --git a/workbench-service/semantic_workbench_service/controller/workflow.py b/workbench-service/semantic_workbench_service/controller/workflow.py deleted file mode 100644 index ef29c208..00000000 --- a/workbench-service/semantic_workbench_service/controller/workflow.py +++ /dev/null @@ -1,1780 +0,0 @@ -import uuid -from typing import Any, AsyncContextManager, Callable, Tuple - -import openai.types.chat -from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider -from openai import AsyncAzureOpenAI -from semantic_workbench_api_model.assistant_model import ConfigPutRequestModel -from semantic_workbench_api_model.workbench_model import ( - AssistantList, - Conversation, - ConversationMessage, - MessageType, - NewAssistant, - NewConversation, - NewConversationMessage, - NewWorkflowDefinition, - NewWorkflowRun, - ParticipantRole, - UpdateAssistant, - UpdateConversation, - UpdateParticipant, - UpdateWorkflowDefinition, - UpdateWorkflowParticipant, - UpdateWorkflowRun, - UpdateWorkflowRunMappings, - WorkflowAssistantMapping, - WorkflowConversationMapping, - WorkflowDefinition, - WorkflowDefinitionList, - WorkflowParticipant, - WorkflowRun, - WorkflowRunList, - WorkflowTransition, -) -from sqlmodel import select -from sqlmodel.ext.asyncio.session import AsyncSession - -from .. import auth, db, query, service_user_principals, settings -from . import AssistantController, ConversationController, convert, exceptions -from . import user as user_ -from .conversation import MessagePreview - -default_context_transfer_instruction = ( - "Generate content that can be a note pasted in before the first message in a new conversation" - " that is made up of different participants with no context of the previous conversation. This" - " note should not originate from or target any specific participant(s) and should be able to" - " transfer just the context asked for in the ." -) - - -class WorkflowController: - def __init__( - self, - get_session: Callable[[], AsyncContextManager[AsyncSession]], - assistant_controller: AssistantController, - conversation_controller: ConversationController, - ) -> None: - self._get_session = get_session - self._assistant_controller = assistant_controller - self._conversation_controller = conversation_controller - - # - # Public methods - # - - async def preview_message(self, message_preview: MessagePreview) -> None: - if ( - # ignore non-chat messages - message_preview.message.message_type != MessageType.chat - # ignore messages from the workflow itself - or message_preview.message.sender.participant_id == service_user_principals.workflow.user_id - ): - return - - # get workflow run id for conversation - workflow_run_id = await self.get_workflow_run_id_for_conversation( - conversation_id=message_preview.conversation_id, - ) - - if workflow_run_id is None: - # no workflow run found for conversation - return - - # indicate that the message is being evaluated - try: - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=message_preview.conversation_id, - participant_id=service_user_principals.workflow.user_id, - update_participant=UpdateParticipant( - status="evaluation in progress...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update participant status text: {e}", - ) from e # indicate that the message is being evaluated - - # evaluate transitions - try: - await self.perform_transition_if_applicable( - workflow_run_id=uuid.UUID(workflow_run_id), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to evaluate transitions for conversation: {e}", - ) from e - finally: - # clear participant status text - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=message_preview.conversation_id, - participant_id=service_user_principals.workflow.user_id, - update_participant=UpdateParticipant(status=None), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to clear participant status text: {e}", - ) from e - - # Workflow Definition - - async def get_workflow_definition(self, workflow_definition_id: uuid.UUID) -> WorkflowDefinition: - async with self._get_session() as session: - workflow_definition = ( - await session.exec( - query.select(db.WorkflowDefinition).where( - db.WorkflowDefinition.workflow_definition_id == workflow_definition_id - ) - ) - ).one_or_none() - if workflow_definition is None: - raise exceptions.NotFoundError() - - return convert.workflow_definition_from_db(model=workflow_definition) - - async def get_workflow_definitions( - self, user_principal: auth.UserPrincipal, include_inactive: bool = False - ) -> WorkflowDefinitionList: - async with self._get_session() as session: - workflow_definitions = await session.exec( - query.select_workflow_definitions_for(user_principal=user_principal, include_inactive=include_inactive) - ) - - return convert.workflow_definition_list_from_db(models=workflow_definitions) - - async def create_workflow_definition( - self, user_principal: auth.UserPrincipal, new_workflow_definition: NewWorkflowDefinition - ) -> WorkflowDefinition: - async with self._get_session() as session: - await user_.add_or_update_user_from(session=session, user_principal=user_principal) - workflow_definition = db.WorkflowDefinition( - data=new_workflow_definition.model_dump(), - ) - session.add(workflow_definition) - session.add( - db.WorkflowUserParticipant( - workflow_definition_id=workflow_definition.workflow_definition_id, - user_id=user_principal.user_id, - ) - ) - await session.commit() - await session.refresh(workflow_definition) - - return convert.workflow_definition_from_db(model=workflow_definition) - - async def update_workflow_definition( - self, - user_principal: auth.UserPrincipal, - workflow_definition_id: uuid.UUID, - update_workflow_definition: UpdateWorkflowDefinition, - ) -> WorkflowDefinition: - async with self._get_session() as session: - workflow_definition = ( - await session.exec( - query.select_workflow_definitions_for(user_principal=user_principal) - .where(db.WorkflowDefinition.workflow_definition_id == workflow_definition_id) - .with_for_update() - ) - ).one_or_none() - if workflow_definition is None: - raise exceptions.NotFoundError() - - data = workflow_definition.data.copy() - updates = update_workflow_definition.model_dump(exclude_unset=True, mode="json") - data.update(updates) - workflow_definition.data = data - - session.add(workflow_definition) - await session.commit() - await session.refresh(workflow_definition) - - return convert.workflow_definition_from_db(model=workflow_definition) - - async def add_or_update_workflow_participant( - self, - workflow_definition_id: uuid.UUID, - participant_id: str, - update_participant: UpdateWorkflowParticipant, - ) -> WorkflowParticipant: - async with self._get_session() as session: - await db.insert_if_not_exists( - session, - db.WorkflowUserParticipant( - workflow_definition_id=workflow_definition_id, - user_id=participant_id, - active_participant=update_participant.active_participant, - ), - ) - participant = ( - await session.exec( - select(db.WorkflowUserParticipant) - .where(db.WorkflowUserParticipant.workflow_definition_id == workflow_definition_id) - .where(db.WorkflowUserParticipant.user_id == participant_id) - .with_for_update() - ) - ).one() - - for key, value in update_participant.model_dump(exclude_unset=True).items(): - setattr(participant, key, value) - - session.add(participant) - - await session.commit() - await session.refresh(participant) - - return convert.workflow_participant_from_db(model=participant) - - async def get_workflow_definition_defaults(self) -> NewWorkflowDefinition: - return NewWorkflowDefinition( - label="New Workflow", - start_state_id="start", - states=[], - transitions=[], - conversation_definitions=[], - assistant_definitions=[], - context_transfer_instruction=default_context_transfer_instruction, - ) - - # Workflow Run - - async def create_workflow_run(self, new_workflow_run: NewWorkflowRun) -> WorkflowRun: - async with self._get_session() as session: - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=new_workflow_run.workflow_definition_id - ) - - workflow_run = db.WorkflowRun( - workflow_definition_id=new_workflow_run.workflow_definition_id, - data={ - "title": new_workflow_run.title, - "current_state_id": workflow_definition.start_state_id, - "conversation_mappings": [], - "assistant_mappings": [], - "metadata": new_workflow_run.metadata, - }, - ) - session.add(workflow_run) - await session.commit() - await session.refresh(workflow_run) - - # initialize workflow - await self.ensure_configuration_for_workflow_state( - workflow_run_id=workflow_run.workflow_run_id, - ) - - return convert.workflow_run_from_db(model=workflow_run) - - async def get_workflow_run(self, workflow_run_id: uuid.UUID) -> WorkflowRun: - async with self._get_session() as session: - workflow_run = ( - await session.exec( - query.select(db.WorkflowRun).where(db.WorkflowRun.workflow_run_id == workflow_run_id) - ) - ).one_or_none() - if workflow_run is None: - raise exceptions.NotFoundError() - - return convert.workflow_run_from_db(model=workflow_run) - - async def get_workflow_runs( - self, user_principal: auth.UserPrincipal, workflow_definition_id: uuid.UUID | None = None - ) -> WorkflowRunList: - async with self._get_session() as session: - workflow_query = query.select_workflow_runs_for(user_principal=user_principal) - if workflow_definition_id is not None: - workflow_query = ( - select(db.WorkflowRun) - .join(db.WorkflowDefinition) - .where(db.WorkflowDefinition.workflow_definition_id == workflow_definition_id) - ) - workflow_runs = await session.exec(workflow_query) - return convert.workflow_run_list_from_db(models=workflow_runs) - - async def update_workflow_run( - self, workflow_run_id: uuid.UUID, update_workflow_run: UpdateWorkflowRun - ) -> WorkflowRun: - async with self._get_session() as session: - workflow_run = ( - await session.exec( - query.select(db.WorkflowRun) - .where(db.WorkflowRun.workflow_run_id == workflow_run_id) - .with_for_update() - ) - ).one_or_none() - if workflow_run is None: - raise exceptions.NotFoundError() - - data = workflow_run.data.copy() - updates = update_workflow_run.model_dump(exclude_unset=True) - data.update(updates) - workflow_run.data = data - - session.add(workflow_run) - await session.commit() - await session.refresh(workflow_run) - - return convert.workflow_run_from_db(model=workflow_run) - - async def get_workflow_run_assistants( - self, user_principal: auth.UserPrincipal, workflow_run_id: uuid.UUID - ) -> AssistantList: - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - assistants = [] - for assistant_mapping in workflow_run.assistant_mappings: - assistant = await self._assistant_controller.get_assistant( - user_principal=user_principal, assistant_id=uuid.UUID(assistant_mapping.assistant_id) - ) - if assistant is not None: - assistants.append(assistant) - return AssistantList(assistants=assistants) - - async def switch_workflow_run_state( - self, workflow_run_id: uuid.UUID, target_state_id: str, metadata: dict[str, Any] | None = None - ) -> WorkflowRun: - # save the data - async with self._get_session() as session: - workflow_run = ( - await session.exec( - query.select(db.WorkflowRun) - .where(db.WorkflowRun.workflow_run_id == workflow_run_id) - .with_for_update() - ) - ).one_or_none() - if workflow_run is None: - raise exceptions.NotFoundError() - - # update workflow run state - data = workflow_run.data.copy() - - # get prior state id for handling state changes - prior_state_id = data["current_state_id"] - # ensure target state is different from current state - if prior_state_id == target_state_id: - raise exceptions.RuntimeError(detail="target state is the same as the current state") - - updates = {"current_state_id": str(target_state_id)} - data.update(updates) - workflow_run.data = data - - session.add(workflow_run) - await session.commit() - await session.refresh(workflow_run) - - # apply state change - try: - await self.handle_workflow_state_changed( - workflow_run_id=workflow_run_id, prior_state_id=prior_state_id, metadata=metadata - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to notify workflow state change: {e}", - ) from e - - # reload workflow run to get updated state - return await self.get_workflow_run(workflow_run_id=workflow_run_id) - - async def delete_workflow_run(self, user_principal: auth.UserPrincipal, workflow_run_id: uuid.UUID) -> None: - # TODO: implement delete logic, including cleaning up all associated conversations - # and assistants hard/soft-delete, etc. - async with self._get_session() as session: - workflow_run = ( - await session.exec( - query.select_workflow_runs_for(user_principal=user_principal).where( - db.WorkflowRun.workflow_run_id == workflow_run_id - ) - ) - ).one_or_none() - if workflow_run is None: - raise exceptions.NotFoundError() - - await session.delete(workflow_run) - await session.commit() - - # - # Private methods - # - - async def handle_workflow_state_changed( - self, - workflow_run_id: uuid.UUID, - prior_state_id: str, - metadata: dict[str, Any] | None = None, - ) -> None: - # ensure conversation and assistant configuration for new state - try: - await self.ensure_configuration_for_workflow_state( - workflow_run_id=workflow_run_id, - prior_state_id=prior_state_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to ensure configuration for new state while switching state: {e}", - ) from e - - # get updated workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get prior state info - prior_state = next((state for state in workflow_definition.states if state.id == prior_state_id), None) - if prior_state is None: - raise exceptions.RuntimeError( - detail="prior state not found while notifying state change", - ) - - # get prior conversation id - prior_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=prior_state_id, - ) - - # get current state info - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while notifying state change", - ) - - # send state change message to prior conversation - try: - await self._conversation_controller.create_conversation_message( - conversation_id=uuid.UUID(prior_conversation_id), - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"workflow state changed to {workflow_state.label}", - metadata={ - **(metadata or {}), - "workflow_run_updated": str(workflow_run_id), - }, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to send state change message: {e}", - ) from e - - # get current conversation id - current_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=workflow_run.current_state_id, - ) - - if current_conversation_id == prior_conversation_id: - # no conversation change, exit early - return - - # conversation changed, handle conversation change - try: - await self.handle_workflow_conversation_changed( - workflow_run_id=workflow_run_id, - prior_state_id=prior_state_id, - prior_conversation_id=prior_conversation_id, - current_conversation_id=current_conversation_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to handle conversation change: {e}", - ) from e - - async def handle_workflow_conversation_changed( - self, - workflow_run_id: uuid.UUID, - prior_state_id: str, - prior_conversation_id: str, - current_conversation_id: str, - ) -> None: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # set all assistants in prior conversation to inactive - try: - await self.deactivate_all_assistants_in_conversation( - workflow_run=workflow_run, - conversation_id=uuid.UUID(prior_conversation_id), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to set all assistants in current conversation to inactive: {e}", - ) from e - - # get current state info - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while notifying state change", - ) - - # get conversation definition for workflow state - conversation_definition = next( - ( - possible_conversation_definition - for possible_conversation_definition in workflow_definition.conversation_definitions - if possible_conversation_definition.id == workflow_state.conversation_definition_id - ), - None, - ) - if conversation_definition is None: - raise exceptions.RuntimeError( - detail="conversation definition not found while notifying state change", - ) - - # inform the previous conversation of the conversation change - try: - await self._conversation_controller.create_conversation_message( - conversation_id=uuid.UUID(prior_conversation_id), - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"workflow conversation changed to {conversation_definition.title}", - metadata={ - "workflow_run_updated": str(workflow_run.id), - }, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform previous conversation of conversation change: {e}", - ) from e - - # transfer context from previous conversation to new conversation - try: - await self.transfer_context_if_applicable( - workflow_run_id=workflow_run_id, - source_state_id=prior_state_id, - target_state_id=workflow_run.current_state_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to transfer context from previous conversation to new conversation: {e}", - ) from e - - # inform the new conversation of the conversation change - try: - await self._conversation_controller.create_conversation_message( - conversation_id=uuid.UUID(current_conversation_id), - new_message=NewConversationMessage( - message_type=MessageType.notice, - content="workflow conversation changed to here", - metadata={ - "workflow_run_updated": str(workflow_run.id), - }, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform new conversation of conversation change: {e}", - ) from e - - async def ensure_configuration_for_workflow_state( - self, workflow_run_id: uuid.UUID, prior_state_id: str | None = None - ) -> None: - # ensure conversation is configured - conversation = await self.ensure_configuration_of_conversation_for_workflow_state( - workflow_run_id=workflow_run_id, - prior_state_id=prior_state_id, - ) - - # ensure assistants are configured - await self.ensure_configuration_of_conversation_assistants_for_workflow_state( - workflow_run_id=workflow_run_id, - conversation=conversation, - ) - - async def ensure_configuration_of_conversation_for_workflow_state( - self, - workflow_run_id: uuid.UUID, - prior_state_id: str | None = None, - ) -> Conversation: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get current state - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while ensuring conversation configuration", - ) - - # determine if we should force a new conversation instance - if prior_state_id is not None and workflow_state.force_new_conversation_instance: - # we are both coming from a prior state and the current state requires a new conversation instance - conversation = await self.create_conversation_for_workflow_state( - workflow_run_id=workflow_run_id, - prior_state_id=prior_state_id, - ) - # notify the new conversation that it is a new instance - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content="new conversation instance created...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail="failed to inform new conversation of new instance creation", - ) from e - return conversation - - # check if conversation exists - conversation_id = next( - ( - possible_conversation.conversation_id - for possible_conversation in workflow_run.conversation_mappings - if possible_conversation.conversation_definition_id == workflow_state.conversation_definition_id - ), - None, - ) - if conversation_id is None: - # no conversation found, create new conversation - return await self.create_conversation_for_workflow_state( - workflow_run_id=workflow_run_id, - prior_state_id=prior_state_id, - ) - - # get existing conversation - try: - conversation = await self._conversation_controller.get_conversation( - conversation_id=uuid.UUID(conversation_id), - principal=service_user_principals.workflow, - latest_message_types=set(), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load conversation while ensuring conversation configuration: {e}", - ) from e - if conversation is None: - raise exceptions.RuntimeError( - detail="conversation not loaded while ensuring conversation configuration", - ) - - # get conversation definition - conversation_definition = next( - ( - possible_conversation_definition - for possible_conversation_definition in workflow_definition.conversation_definitions - if possible_conversation_definition.id == workflow_state.conversation_definition_id - ), - None, - ) - if conversation_definition is None: - raise exceptions.RuntimeError( - detail="conversation definition not found while ensuring conversation configuration", - ) - - # update conversation title if it differs from the state - if conversation.title != conversation_definition.title: - conversation = await self.update_conversation_title( - conversation_id=conversation.id, - new_title=conversation_definition.title, - ) - - return conversation - - async def ensure_configuration_of_conversation_assistants_for_workflow_state( - self, - workflow_run_id: uuid.UUID, - conversation: Conversation, - ) -> None: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get current state info - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while ensuring assistant configuration", - ) - - # get all current conversation participants - try: - conversation_participants = ( - await self._conversation_controller.get_conversation_participants( - conversation_id=conversation.id, - principal=service_user_principals.workflow, - ) - ).participants - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load conversation participants while ensuring assistant configuration: {e}", - ) from e - - # get the assistants from conversation participants - assistant_participants = [ - possible_assistant - for possible_assistant in conversation_participants - if possible_assistant.role == ParticipantRole.assistant - ] - - # track changes to assistant mappings - updated_assistant_mappings = workflow_run.assistant_mappings.copy() - - # ensure all assistants are configured - for assistant_data in workflow_state.assistant_data_list: - # get assistant definition - assistant_definition = next( - ( - possible_assistant_definition - for possible_assistant_definition in workflow_definition.assistant_definitions - if possible_assistant_definition.id == assistant_data.assistant_definition_id - ), - None, - ) - if assistant_definition is None: - raise exceptions.RuntimeError( - detail="assistant definition not found while ensuring assistant configuration", - ) - - # check if assistant exists - assistant_id = next( - ( - possible_assistant_mapping.assistant_id - for possible_assistant_mapping in workflow_run.assistant_mappings - if possible_assistant_mapping.assistant_definition_id == assistant_data.assistant_definition_id - ), - None, - ) - if assistant_id is None: - # no assistant found, create new assistant - # create new assistant instance - try: - assistant = await self._assistant_controller.create_assistant( - new_assistant=NewAssistant( - name=assistant_definition.name, - metadata={ - "workflow_run_id": str(workflow_run.id), - }, - assistant_service_id=assistant_definition.assistant_service_id, - ), - user_principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to create assistant while ensuring assistant configuration: {e}", - ) from e - # add assistant to workflow run assistant mappings - updated_assistant_mappings.append( - WorkflowAssistantMapping( - assistant_id=str(assistant.id), - assistant_definition_id=assistant_definition.id, - ) - ) - # add assistant to conversation - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=conversation.id, - participant_id=str(assistant.id), - update_participant=UpdateParticipant( - active_participant=True, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to add assistant to conversation while ensuring assistant configuration: {e}", - ) from e - # inform conversation of assistant joining - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{assistant.name} joined conversation...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform conversation of assistant joining: {e}", - ) from e - else: - # load assistant instance - try: - assistant = await self._assistant_controller.get_assistant( - user_principal=service_user_principals.workflow, - assistant_id=uuid.UUID(assistant_id), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load assistant while ensuring assistant configuration: {e}", - ) from e - - # check if assistant exist in conversation already - assistant_participant = next( - ( - possible_assistant - for possible_assistant in assistant_participants - if possible_assistant.id == str(assistant_id) - ), - None, - ) - if assistant_participant is None: - # assistant is not in the conversation - # add assistant to conversation - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=conversation.id, - participant_id=str(assistant.id), - update_participant=UpdateParticipant( - active_participant=True, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=( - f"failed to add assistant to conversation while ensuring assistant configuration: {e}" - ), - ) from e - # inform conversation of assistant joining - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{assistant.name} joined conversation...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform conversation of assistant joining: {e}", - ) from e - if assistant is None: - raise exceptions.RuntimeError( - detail="assistant not loaded while ensuring assistant configuration", - ) - - # update assistant name if it differs from the state - if assistant.name != assistant_definition.name: - current_name = assistant.name - # update assistant name - try: - assistant = await self._assistant_controller.update_assistant( - user_principal=service_user_principals.workflow, - assistant_id=assistant.id, - update_assistant=UpdateAssistant( - name=assistant_definition.name, - ), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update assistant name while ensuring assistant configuration: {e}", - ) from e - # add assistant name change to conversation - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{current_name} changed name to {assistant.name}", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to add assistant name change to conversation: {e}", - ) from e - - # check to see if assistant config differs from the state config - try: - assistant_config = await self._assistant_controller.get_assistant_config( - user_principal=service_user_principals.workflow, - assistant_id=assistant.id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load assistant config while ensuring assistant configuration: {e}", - ) from e - if assistant_config.config != assistant_data.config_data: - # update assistant config to match state config - try: - await self._assistant_controller.update_assistant_config( - user_principal=service_user_principals.workflow, - assistant_id=assistant.id, - updated_config=ConfigPutRequestModel( - config=assistant_data.config_data, - ), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update assistant config while ensuring assistant configuration: {e}", - ) from e - - # inform conversation of config change - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{assistant.name} config updated...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform conversation of assistant config change: {e}", - ) from e - - # update workflow run assistant mappings - try: - workflow_run = await self.update_workflow_run_mappings( - workflow_run_id=workflow_run.id, - update_workflow_run_mappings=UpdateWorkflowRunMappings( - assistant_mappings=updated_assistant_mappings, - ), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update workflow run assistant mappings: {e}", - ) from e - - # remove assistants not in the state - for assistant_participant in assistant_participants: - # check if assistant is in the state - assistant_mapping = next( - ( - possible_assistant_mapping - for possible_assistant_mapping in workflow_run.assistant_mappings - if possible_assistant_mapping.assistant_id == assistant_participant.id - ), - None, - ) - if assistant_mapping is None: - raise exceptions.RuntimeError( - detail=f"assistant mapping not found while removing {assistant_participant.name} from conversation", - ) - assistant_data = next( - ( - possible_assistant_data - for possible_assistant_data in workflow_state.assistant_data_list - if possible_assistant_data.assistant_definition_id == assistant_mapping.assistant_definition_id - ), - None, - ) - if assistant_data is None: - # assistant is not in the state - # remove assistant from conversation - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=conversation.id, - participant_id=assistant_participant.id, - update_participant=UpdateParticipant( - active_participant=False, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=( - ( - f"failed to remove {assistant_participant.name} from conversation" - " while ensuring assistant configuration" - ), - ), - ) from e - # inform conversation of assistant leaving - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{assistant_participant.name} left conversation...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform conversation of {assistant_participant.name} leaving", - ) from e - - async def perform_transition_if_applicable( - self, - workflow_run_id: uuid.UUID, - ) -> None: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get current state info - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while evaluating transitions", - ) - - try: - current_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=workflow_run.current_state_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to get conversation id for current state while evaluating transitions: {e}", - ) from e - - # get chat history - try: - chat_history = await self.get_formatted_chat_history( - principal=service_user_principals.workflow, - conversation_id=current_conversation_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to get chat history while evaluating transitionsJ: {e}", - ) from e - - # evaluate outlets - for outlet in workflow_state.outlets: - if outlet.prompts.evaluate_transition is None or outlet.prompts.evaluate_transition.strip() == "": - continue - - # evaluate outlet condition - should_transition, metadata = await self.execute_transition_evaluation_query( - transition_evaluation_prompt=outlet.prompts.evaluate_transition, - chat_history=chat_history, - ) - - # create a log message with the evaluation result - try: - await self._conversation_controller.create_conversation_message( - conversation_id=uuid.UUID(current_conversation_id), - new_message=NewConversationMessage( - message_type=MessageType.log, - content=f"Transition evaluation result [outlet: {outlet.label}]: {should_transition}", - metadata=metadata, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to create log message with evaluation result: {e}", - ) from e - - if not should_transition: - continue - - # get transition info - transition = next( - ( - possible_transition - for possible_transition in workflow_definition.transitions - if possible_transition.source_outlet_id == outlet.id - ), - None, - ) - - # determine target state id - if transition is not None: - target_state_id = transition.target_state_id - else: - # no transition found, default to start state - target_state_id = workflow_definition.start_state_id - - # transition to target state - try: - workflow_run = await self.switch_workflow_run_state( - workflow_run_id=workflow_run.id, - target_state_id=target_state_id, - metadata=metadata, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to transition to target state: {e}", - ) from e - - async def transfer_context_if_applicable( - self, - workflow_run_id: uuid.UUID, - source_state_id: str, - target_state_id: str, - ) -> None: - # check if there is a transition to target state - transition = await self.find_transition_by_states( - workflow_run_id=workflow_run_id, - source_state_id=source_state_id, - target_state_id=target_state_id, - ) - - if transition is None: - # no transition found, exit early - return - - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get source state info - source_state = next((state for state in workflow_definition.states if state.id == source_state_id), None) - if source_state is None: - raise exceptions.RuntimeError( - detail="source state not found while transferring context", - ) - - # get outlet for transition - outlet = next( - ( - possible_outlet - for possible_outlet in source_state.outlets - if possible_outlet.id == transition.source_outlet_id - ), - None, - ) - if outlet is None: - raise exceptions.RuntimeError( - detail="outlet not found while transferring context", - ) - - # get context transfer prompt - context_transfer_request = outlet.prompts.context_transfer - if context_transfer_request is None or context_transfer_request.strip() == "": - # no context transfer prompt, exit early - return - - # get conversation id for source state - source_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=source_state_id, - ) - - # get conversation id for target state - target_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=target_state_id, - ) - - if source_conversation_id == target_conversation_id: - # no need to transfer context to the same conversation - return - - # proceed to execute context transfer - - # get chat history - try: - chat_history = await self.get_formatted_chat_history( - principal=service_user_principals.workflow, - conversation_id=source_conversation_id, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to get chat history while transferring context: {e}", - ) from e - - # generate context transfer response - try: - context_transfer_response, metadata = await self.execute_context_transfer_generation( - context_transfer_instruction=workflow_definition.context_transfer_instruction, - context_transfer_request=context_transfer_request, - chat_history=chat_history, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to execute context transfer generation: {e}", - ) from e - - # send context transfer response as message to conversation - try: - await self._conversation_controller.create_conversation_message( - conversation_id=uuid.UUID(target_conversation_id), - new_message=NewConversationMessage( - message_type=MessageType.chat, - content=context_transfer_response, - metadata=metadata, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to send context transfer response as message to conversation: {e}", - ) from e - - async def execute_transition_evaluation_query( - self, - transition_evaluation_prompt: str, - chat_history: list[str], - ) -> Tuple[bool, dict[str, Any]]: - # history message content - history_message_content = "" + "\n".join(chat_history) + "" - - # instruction evaluation prompt - instruction_message_content = ( - "Evaluate the and determine if the following " - f" has occurred: {transition_evaluation_prompt}" - ) - - # format message content - format_message_content = "Answer only with 'true' or 'false'." - - model = settings.workflow.azure_openai_deployment - messages: list[openai.types.chat.ChatCompletionMessageParam] = [ - { - "role": "system", - "name": "system", - "content": history_message_content, - }, - { - "role": "user", - "name": "user", - "content": instruction_message_content, - }, - { - "role": "system", - "name": "system", - "content": format_message_content, - }, - ] - - try: - async with azure_openai_client() as client: - completion = await client.chat.completions.create( - model=model, - temperature=0.0, - messages=messages, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed calling Azure OpenAI service while evaluating transitions: {e}", - ) from e - - completion_content = completion.choices[0].message.content - - value = False if completion_content is None else completion_content.lower().strip() == "true" - - return ( - value, - { - "debug": { - "model": model, - "completion_request": messages, - "completion_response": completion.model_dump(), - } - }, - ) - - async def execute_context_transfer_generation( - self, - context_transfer_instruction: str, - context_transfer_request: str, - chat_history: list[str], - ) -> Tuple[str, dict[str, Any]]: - # history message content - history_message_content = "" + "\n".join(chat_history) + "" - - # instruction evaluation prompt - instruction_message_content = ( - "Generate a response based on the and the following ." - f" {context_transfer_instruction}:" - f" {context_transfer_request}" - ) - - model = settings.workflow.azure_openai_deployment - messages: list[openai.types.chat.ChatCompletionMessageParam] = [ - { - "role": "system", - "name": "system", - "content": history_message_content, - }, - { - "role": "user", - "name": "user", - "content": instruction_message_content, - }, - ] - - try: - async with azure_openai_client() as client: - completion = await client.chat.completions.create( - model=model, - temperature=0.0, - messages=messages, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed calling Azure OpenAI service while generating context transfer: {e}", - ) from e - - completion_content = completion.choices[0].message.content - - return ( - completion_content or "", - { - "debug": { - "model": model, - "completion_request": messages, - "completion_response": completion.model_dump(), - } - }, - ) - - async def get_formatted_chat_history( - self, - principal: auth.ActorPrincipal, - conversation_id: str, - ) -> list[str]: - # build context for evaluating transitions - try: - messages = await self._conversation_controller.get_messages( - conversation_id=uuid.UUID(conversation_id), - principal=principal, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load messages while evaluating transitions: {e}", - ) from e - - try: - participants = await self._conversation_controller.get_conversation_participants( - conversation_id=uuid.UUID(conversation_id), - principal=principal, - include_inactive=True, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load participants while evaluating transitions: {e}", - ) from e - - def format_message_for_history(message: ConversationMessage) -> str: - participant = next( - ( - possible_participant - for possible_participant in participants.participants - if possible_participant.id == message.sender.participant_id - ), - None, - ) - if participant is None: - raise exceptions.RuntimeError( - detail="participant not found while formatting message for history", - ) - return f"{participant.name}: {message.content}" - - chat_history = [ - format_message_for_history(message) - for message in messages.messages - if message.message_type == MessageType.chat - ] - - max_tokens = 4000 * 4 # roughly 4 characters per token - # remove from the beginning of the history until the length is less than max_tokens - while len("".join(chat_history)) > max_tokens: - chat_history.pop(0) - - return chat_history - - async def deactivate_all_assistants_in_conversation( - self, workflow_run: WorkflowRun, conversation_id: uuid.UUID - ) -> None: - # get conversation participants - try: - participants = await self._conversation_controller.get_conversation_participants( - conversation_id=conversation_id, - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to get conversation participants while setting all assistants inactive: {e}", - ) from e - - # set all assistants to inactive - for participant in participants.participants: - if not participant.active_participant or participant.id not in [ - mapping.assistant_id for mapping in workflow_run.assistant_mappings - ]: - continue - - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=conversation_id, - participant_id=participant.id, - update_participant=UpdateParticipant( - active_participant=False, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to set assistant {participant.id} to inactive: {e}", - ) from e - - # inform conversation of assistant leaving - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation_id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"{participant.name} left conversation...", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to inform conversation of assistant leaving: {e}", - ) from e - - async def find_transition_by_states( - self, - workflow_run_id: uuid.UUID, - source_state_id: str, - target_state_id: str, - ) -> WorkflowTransition | None: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # check outlets for transitions to target state - for source_state in workflow_definition.states: - if source_state.id != source_state_id: - continue - - for outlet in source_state.outlets: - # check if outlet has a transition to target state - transition = next( - ( - possible_transition - for possible_transition in workflow_definition.transitions - if possible_transition.source_outlet_id == outlet.id - and possible_transition.target_state_id == target_state_id - ), - None, - ) - if transition is not None: - return transition - - # if no transition found, return a default transition to the start state - # use the start_state_id as id in case it is needed again (should not be) - return WorkflowTransition( - id=workflow_definition.start_state_id, - source_outlet_id=outlet.id, - target_state_id=workflow_definition.start_state_id, - ) - - async def create_conversation_for_workflow_state( - self, - workflow_run_id: uuid.UUID, - prior_state_id: str | None = None, - ) -> Conversation: - # get workflow run and definition - workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) - workflow_definition = await self.get_workflow_definition( - workflow_definition_id=workflow_run.workflow_definition_id - ) - - # get current state info - workflow_state = next( - (state for state in workflow_definition.states if state.id == workflow_run.current_state_id), None - ) - if workflow_state is None: - raise exceptions.RuntimeError( - detail="current state not found while ensuring conversation configuration", - ) - - # get conversation definition for workflow state - conversation_definition = next( - ( - possible_conversation_definition - for possible_conversation_definition in workflow_definition.conversation_definitions - if possible_conversation_definition.id == workflow_state.conversation_definition_id - ), - None, - ) - if conversation_definition is None: - raise exceptions.RuntimeError( - detail="conversation definition not found while ensuring conversation configuration", - ) - - # create the conversation - try: - conversation = await self._conversation_controller.create_conversation( - new_conversation=NewConversation( - title=conversation_definition.title, - metadata={ - "workflow_run_id": str(workflow_run.id), - }, - ), - user_principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to create conversation while ensuring conversation configuration: {e}", - ) from e - - # add workflow participant so that it can send messages - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=conversation.id, - participant_id=service_user_principals.workflow.user_id, - update_participant=UpdateParticipant( - active_participant=True, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to add workflow participant to conversation: {e}", - ) from e - - if prior_state_id is not None: - # get conversation id for prior state - prior_conversation_id = self.get_conversation_id_for_workflow_state( - workflow_run=workflow_run, - workflow_definition=workflow_definition, - target_state_id=prior_state_id, - ) - - # if conversation ids are different, copy user participants from prior conversation - if prior_conversation_id != str(conversation.id): - await self.copy_conversation_user_participants( - source_conversation_id=uuid.UUID(prior_conversation_id), - target_conversation_id=conversation.id, - deactivate_participants=True, - ) - - # remove any existing mappings for the conversation definition - workflow_run.conversation_mappings = [ - mapping - for mapping in workflow_run.conversation_mappings - if mapping.conversation_definition_id != conversation_definition.id - ] - # add new mapping - workflow_run.conversation_mappings.append( - WorkflowConversationMapping( - conversation_id=str(conversation.id), - conversation_definition_id=conversation_definition.id, - ) - ) - - # update workflow run conversation mappings - try: - await self.update_workflow_run_mappings( - workflow_run_id=workflow_run.id, - update_workflow_run_mappings=UpdateWorkflowRunMappings( - conversation_mappings=workflow_run.conversation_mappings, - ), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update workflow run conversation mappings: {e}", - ) from e - - return conversation - - async def update_conversation_title( - self, - conversation_id: uuid.UUID, - new_title: str, - ) -> Conversation: - # get conversation - try: - conversation = await self._conversation_controller.get_conversation( - conversation_id=conversation_id, - principal=service_user_principals.workflow, - latest_message_types=set(), - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to load conversation while ensuring conversation configuration: {e}", - ) from e - if conversation is None: - raise exceptions.RuntimeError( - detail="conversation not loaded while ensuring conversation configuration", - ) - - # track current conversation title - current_conversation_title = conversation.title - - # update conversation title - try: - conversation = await self._conversation_controller.update_conversation( - conversation_id=conversation.id, - update_conversation=UpdateConversation( - title=new_title, - ), - user_principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to update conversation title while ensuring conversation configuration: {e}", - ) from e - - # add conversation title change to conversation - try: - await self._conversation_controller.create_conversation_message( - conversation_id=conversation.id, - new_message=NewConversationMessage( - message_type=MessageType.notice, - content=f"conversation title changed from {current_conversation_title} to {new_title}", - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to add conversation title change to conversation: {e}", - ) from e - - return conversation - - async def copy_conversation_user_participants( - self, - source_conversation_id: uuid.UUID, - target_conversation_id: uuid.UUID, - deactivate_participants: bool = False, - ) -> None: - # get conversation participants - try: - participants = await self._conversation_controller.get_conversation_participants( - conversation_id=source_conversation_id, - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to get conversation participants while copying: {e}", - ) from e - - # copy participants to target conversation - for participant in participants.participants: - if participant.role != ParticipantRole.user: - continue - - try: - await self._conversation_controller.add_or_update_conversation_participant( - conversation_id=target_conversation_id, - participant_id=participant.id, - update_participant=UpdateParticipant( - active_participant=False if deactivate_participants else participant.active_participant, - ), - principal=service_user_principals.workflow, - ) - except Exception as e: - raise exceptions.RuntimeError( - detail=f"failed to copy participant {participant.id} to target conversation: {e}", - ) from e - - async def get_workflow_run_id_for_conversation(self, conversation_id: uuid.UUID) -> str | None: - # get from db direct since we don't need to check permissions and don't have the user principal - async with self._get_session() as session: - possible_conversation = ( - await session.exec(select(db.Conversation).where(db.Conversation.conversation_id == conversation_id)) - ).one_or_none() - if possible_conversation is None: - return None - - if "workflow_run_id" in possible_conversation.meta_data: - return possible_conversation.meta_data["workflow_run_id"] - - async def update_workflow_run_mappings( - self, - workflow_run_id: uuid.UUID, - update_workflow_run_mappings: UpdateWorkflowRunMappings, - ) -> WorkflowRun: - async with self._get_session() as session: - workflow_run = ( - await session.exec( - query.select(db.WorkflowRun) - .where(db.WorkflowRun.workflow_run_id == workflow_run_id) - .with_for_update() - ) - ).one_or_none() - if workflow_run is None: - raise exceptions.NotFoundError() - - data = workflow_run.data.copy() - - if update_workflow_run_mappings.conversation_mappings is not None: - data["conversation_mappings"] = [ - mapping.model_dump() for mapping in update_workflow_run_mappings.conversation_mappings - ] - - if update_workflow_run_mappings.assistant_mappings is not None: - data["assistant_mappings"] = [ - mapping.model_dump() for mapping in update_workflow_run_mappings.assistant_mappings - ] - - workflow_run.data = data - - session.add(workflow_run) - await session.commit() - await session.refresh(workflow_run) - - return convert.workflow_run_from_db(model=workflow_run) - - def get_conversation_id_for_workflow_state( - self, - workflow_run: WorkflowRun, - workflow_definition: WorkflowDefinition, - target_state_id: str, - ) -> str: - target_state = next((state for state in workflow_definition.states if state.id == target_state_id), None) - if target_state is None: - raise exceptions.RuntimeError( - detail="target state not found while getting conversation id", - ) - - conversation_id = next( - ( - possible_conversation.conversation_id - for possible_conversation in workflow_run.conversation_mappings - if possible_conversation.conversation_definition_id == target_state.conversation_definition_id - ), - None, - ) - if conversation_id is None: - raise exceptions.RuntimeError( - detail="conversation mapping not found while getting conversation id", - ) - - return conversation_id - - -_azure_bearer_token_provider = get_bearer_token_provider( - DefaultAzureCredential(), - "https://cognitiveservices.azure.com/.default", -) - - -def azure_openai_client() -> AsyncAzureOpenAI: - return AsyncAzureOpenAI( - azure_endpoint=settings.workflow.azure_openai_endpoint, - azure_deployment=settings.workflow.azure_openai_deployment, - api_version=settings.workflow.azure_openai_api_version, - azure_ad_token_provider=_azure_bearer_token_provider, - ) diff --git a/workbench-service/semantic_workbench_service/db.py b/workbench-service/semantic_workbench_service/db.py index e13b2ddc..288134c4 100644 --- a/workbench-service/semantic_workbench_service/db.py +++ b/workbench-service/semantic_workbench_service/db.py @@ -56,16 +56,6 @@ def on_update(self, session: Session) -> None: participant.service_user = self.service_user session.add(participant) - # update WorkflowUserParticipants for this user - participants = session.exec( - select(WorkflowUserParticipant).where(WorkflowUserParticipant.user_id == self.user_id), - ) - for participant in participants: - participant.name = self.name - participant.image = self.image - participant.service_user = self.service_user - session.add(participant) - class AssistantServiceRegistration(SQLModel, table=True): assistant_service_id: str = Field(primary_key=True) @@ -214,6 +204,9 @@ class AssistantParticipant(SQLModel, table=True): status: str | None = None status_updated_datetime: datetime.datetime = date_time_default_to_now() active_participant: bool = True + meta_data: dict[str, Any] = Field( + sa_column=sqlalchemy.Column("metadata", sqlalchemy.JSON, server_default="{}", nullable=False), default={} + ) # this relationship is needed to enforce correct INSERT order by SQLModel related_conversation: Conversation = Relationship() @@ -254,6 +247,9 @@ class UserParticipant(SQLModel, table=True): status: str | None = None status_updated_datetime: datetime.datetime = date_time_default_to_now() active_participant: bool = True + meta_data: dict[str, Any] = Field( + sa_column=sqlalchemy.Column("metadata", sqlalchemy.JSON, server_default="{}", nullable=False), default={} + ) conversation_permission: str # this relationship is needed to enforce correct INSERT order by SQLModel @@ -372,68 +368,6 @@ class FileVersion(SQLModel, table=True): related_file: File = Relationship() -class WorkflowDefinition(SQLModel, table=True): - workflow_definition_id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) - data: dict[str, Any] = Field(sa_column=sqlalchemy.Column("data", sqlalchemy.JSON)) - - -class WorkflowUserParticipant(SQLModel, table=True): - workflow_definition_id: uuid.UUID = Field( - sa_column=sqlalchemy.Column( - sqlalchemy.ForeignKey( - "workflowdefinition.workflow_definition_id", - name="fk_workflowuserparticipant_workflowdefinition", - ondelete="CASCADE", - ), - primary_key=True, - ), - ) - user_id: str = Field(primary_key=True) - name: str = "" - image: str | None = None - service_user: bool = False - active_participant: bool = True - - # this relationship is needed to enforce correct INSERT order by SQLModel - related_workflow_definition: WorkflowDefinition = Relationship() - - def on_update(self, session: Session) -> None: - """Update this participant to match the related user, if one exists.""" - user = session.exec(select(User).where(User.user_id == self.user_id)).one_or_none() - if user is None: - return - - sqlalchemy.orm.attributes.set_attribute(self, "name", user.name) - sqlalchemy.orm.attributes.set_attribute(self, "image", user.image) - sqlalchemy.orm.attributes.set_attribute(self, "service_user", user.service_user) - - def on_insert(self, session: Session) -> None: - """Update this participant to match the related user, requiring one to exist.""" - user = session.exec(select(User).where(User.user_id == self.user_id)).one() - - sqlalchemy.orm.attributes.set_attribute(self, "name", user.name) - sqlalchemy.orm.attributes.set_attribute(self, "image", user.image) - sqlalchemy.orm.attributes.set_attribute(self, "service_user", user.service_user) - - -class WorkflowRun(SQLModel, table=True): - workflow_run_id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) - workflow_definition_id: uuid.UUID = Field( - sa_column=sqlalchemy.Column( - sqlalchemy.ForeignKey( - "workflowdefinition.workflow_definition_id", - name="fk_workflowrun_workflowdefinition", - ondelete="CASCADE", - ), - nullable=False, - ), - ) - data: dict[str, Any] = Field(sa_column=sqlalchemy.Column("data", sqlalchemy.JSON)) - - # this relationship is needed to enforce correct INSERT order by SQLModel - related_workflow_definition: WorkflowDefinition = Relationship() - - NAMING_CONVENTION = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_N_name)s", diff --git a/workbench-service/semantic_workbench_service/middleware.py b/workbench-service/semantic_workbench_service/middleware.py index 5f1ed990..0bae4950 100644 --- a/workbench-service/semantic_workbench_service/middleware.py +++ b/workbench-service/semantic_workbench_service/middleware.py @@ -35,8 +35,8 @@ async def _assistant_service_principal_from_request( assistant_service_id = assistant_service_params.assistant_service_id api_key = assistant_service_params.api_key - assistant_instance_params = workbench_service_client.AssistantInstanceRequestHeaders.from_headers(request.headers) - assistant_id = assistant_instance_params.assistant_id + assistant_params = workbench_service_client.AssistantRequestHeaders.from_headers(request.headers) + assistant_id = assistant_params.assistant_id expected_api_key = await api_key_source(assistant_service_id) if expected_api_key is None: diff --git a/workbench-service/semantic_workbench_service/query.py b/workbench-service/semantic_workbench_service/query.py index 7c8e01a7..83bbf9a0 100644 --- a/workbench-service/semantic_workbench_service/query.py +++ b/workbench-service/semantic_workbench_service/query.py @@ -2,7 +2,7 @@ from semantic_workbench_api_model.workbench_model import MessageType from sqlalchemy import Function -from sqlmodel import String, and_, cast, col, func, literal, or_, select +from sqlmodel import and_, col, func, literal, or_, select from sqlmodel.sql.expression import Select, SelectOfScalar from . import auth, db, settings @@ -20,22 +20,6 @@ def select_assistants_for( return select(db.Assistant).where( or_( db.Assistant.owner_id == user_principal.user_id, - col(db.Assistant.assistant_id).in_( - select(db.Assistant.assistant_id) - .join( - db.WorkflowRun, - cast(json_extract_path(db.Assistant.meta_data, "workflow_run_id"), String) - == cast(db.WorkflowRun.workflow_run_id, String), - ) - .join( - db.WorkflowUserParticipant, - and_( - db.WorkflowUserParticipant.workflow_definition_id == db.WorkflowRun.workflow_definition_id, - db.WorkflowUserParticipant.user_id == user_principal.user_id, - ), - ) - .distinct() - ), and_( include_assistants_from_conversations is True, col(db.Assistant.assistant_id).in_( @@ -237,34 +221,3 @@ def select_conversation_message_debugs_for( .join(db.AssistantParticipant) .where(db.AssistantParticipant.assistant_id == principal.assistant_id) ) - - -def select_workflow_definitions_for( - user_principal: auth.UserPrincipal, - include_inactive: bool = False, -) -> SelectOfScalar[db.WorkflowDefinition]: - query = ( - select(db.WorkflowDefinition) - .join(db.WorkflowUserParticipant) - .where(db.WorkflowUserParticipant.user_id == user_principal.user_id) - ) - if not include_inactive: - query = query.where(col(db.WorkflowUserParticipant.active_participant).is_(True)) - - return query - - -def select_workflow_runs_for( - user_principal: auth.UserPrincipal, - include_inactive: bool = False, -) -> SelectOfScalar[db.WorkflowRun]: - query = ( - select(db.WorkflowRun) - .join(db.WorkflowDefinition) - .join(db.WorkflowUserParticipant) - .where(db.WorkflowUserParticipant.user_id == user_principal.user_id) - ) - if not include_inactive: - query = query.where(col(db.WorkflowUserParticipant.active_participant).is_(True)) - - return query diff --git a/workbench-service/semantic_workbench_service/service.py b/workbench-service/semantic_workbench_service/service.py index c449b3e0..3b6f40ce 100644 --- a/workbench-service/semantic_workbench_service/service.py +++ b/workbench-service/semantic_workbench_service/service.py @@ -68,8 +68,6 @@ NewConversation, NewConversationMessage, NewConversationShare, - NewWorkflowDefinition, - NewWorkflowRun, ParticipantRole, UpdateAssistant, UpdateAssistantServiceRegistration, @@ -77,15 +75,8 @@ UpdateConversation, UpdateParticipant, UpdateUser, - UpdateWorkflowDefinition, - UpdateWorkflowParticipant, - UpdateWorkflowRun, User, UserList, - WorkflowDefinition, - WorkflowDefinitionList, - WorkflowRun, - WorkflowRunList, ) from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession @@ -295,12 +286,6 @@ async def _notify_user_event(conversation_id: uuid.UUID) -> None: notify_event=_notify_event, file_storage=files.Storage(settings.storage), ) - workflow_controller = controller.WorkflowController( - get_session=_controller_get_session, - assistant_controller=assistant_controller, - conversation_controller=conversation_controller, - ) - conversation_controller.register_message_previewer(workflow_controller.preview_message) @asynccontextmanager async def _lifespan() -> AsyncIterator[None]: @@ -1087,119 +1072,6 @@ async def list_conversation_share_redemptions_for_user( user_principal=user_principal, ) - @app.post("/workflow-definitions") - async def create_workflow( - user_principal: auth.DependsUserPrincipal, - new_workflow_definition: NewWorkflowDefinition, - ) -> WorkflowDefinition: - return await workflow_controller.create_workflow_definition( - user_principal=user_principal, new_workflow_definition=new_workflow_definition - ) - - @app.patch("/workflow-definitions/{workflow_definition_id}") - async def update_workflow( - user_principal: auth.DependsUserPrincipal, - workflow_definition_id: uuid.UUID, - update_workflow_definition: UpdateWorkflowDefinition, - ) -> WorkflowDefinition: - return await workflow_controller.update_workflow_definition( - user_principal=user_principal, - workflow_definition_id=workflow_definition_id, - update_workflow_definition=update_workflow_definition, - ) - - @app.get("/workflow-definitions/defaults") - async def get_workflow_definition_defaults() -> NewWorkflowDefinition: - return await workflow_controller.get_workflow_definition_defaults() - - @app.get("/workflow-definitions") - async def list_workflows( - user_principal: auth.DependsUserPrincipal, - ) -> WorkflowDefinitionList: - return await workflow_controller.get_workflow_definitions(user_principal=user_principal) - - @app.get("/workflow-definitions/{workflow_definition_id}") - async def get_workflow(workflow_definition_id: uuid.UUID) -> WorkflowDefinition: - return await workflow_controller.get_workflow_definition(workflow_definition_id=workflow_definition_id) - - @app.patch("/workflow-definitions/{workflow_definition_id}/participants/{participant_id}") - @app.put("/workflow-definitions/{workflow_definition_id}/participants/{participant_id}") - async def add_or_update_workflow_participant( - workflow_definition_id: uuid.UUID, - participant_id: str, - update_participant: UpdateWorkflowParticipant, - user_principal: auth.DependsUserPrincipal, - ) -> None: - if participant_id == "me": - participant_id = user_principal.user_id - - await workflow_controller.add_or_update_workflow_participant( - workflow_definition_id=workflow_definition_id, - participant_id=participant_id, - update_participant=update_participant, - ) - - @app.get("/workflow-runs") - async def list_workflow_runs( - user_principal: auth.DependsUserPrincipal, - workflow_definition_id: uuid.UUID | None = None, - ) -> WorkflowRunList: - return await workflow_controller.get_workflow_runs( - user_principal=user_principal, workflow_definition_id=workflow_definition_id - ) - - @app.get("/workflow-runs/{workflow_run_id}") - async def get_workflow_run(workflow_run_id: uuid.UUID) -> WorkflowRun: - return await workflow_controller.get_workflow_run(workflow_run_id=workflow_run_id) - - @app.post("/workflow-runs") - async def create_workflow_run( - new_workflow_run: NewWorkflowRun, - ) -> WorkflowRun: - return await workflow_controller.create_workflow_run( - new_workflow_run=new_workflow_run, - ) - - @app.patch("/workflow-runs/{workflow_run_id}") - async def update_workflow_run( - workflow_run_id: uuid.UUID, - update_workflow_run: UpdateWorkflowRun, - ) -> WorkflowRun: - return await workflow_controller.update_workflow_run( - workflow_run_id=workflow_run_id, - update_workflow_run=update_workflow_run, - ) - - @app.get("/workflow-runs/{workflow_run_id}/assistants") - async def get_workflow_run_assistants( - workflow_run_id: uuid.UUID, - user_principal: auth.DependsUserPrincipal, - ) -> AssistantList: - return await workflow_controller.get_workflow_run_assistants( - user_principal=user_principal, - workflow_run_id=workflow_run_id, - ) - - @app.post("/workflow-runs/{workflow_run_id}/switch-state") - async def switch_workflow_run_state( - workflow_run_id: uuid.UUID, - state_id: str, - ) -> WorkflowRun: - return await workflow_controller.switch_workflow_run_state( - workflow_run_id=workflow_run_id, - target_state_id=state_id, - ) - - @app.delete("/workflow-runs/{workflow_run_id}", status_code=status.HTTP_204_NO_CONTENT) - async def delete_workflow_run( - user_principal: auth.DependsUserPrincipal, - workflow_run_id: uuid.UUID, - ) -> None: - await workflow_controller.delete_workflow_run( - user_principal=user_principal, - workflow_run_id=workflow_run_id, - ) - @app.get("/azure-speech/token") async def get_azure_speech_token() -> dict[str, str]: return azure_speech.get_token() diff --git a/workbench-service/tests/test_files.py b/workbench-service/tests/test_files.py index 504909ba..11208270 100644 --- a/workbench-service/tests/test_files.py +++ b/workbench-service/tests/test_files.py @@ -9,10 +9,13 @@ def test_read_file_not_found(storage_settings: files.StorageSettings) -> None: file_storage = files.Storage(settings=storage_settings) - with pytest.raises(FileNotFoundError), file_storage.read_file( - namespace="conversation_id", - filename="filename", - ) as f: + with ( + pytest.raises(FileNotFoundError), + file_storage.read_file( + namespace="conversation_id", + filename="filename", + ) as f, + ): f.read() diff --git a/workbench-service/tests/test_workbench_service.py b/workbench-service/tests/test_workbench_service.py index b520d8ac..9d636980 100644 --- a/workbench-service/tests/test_workbench_service.py +++ b/workbench-service/tests/test_workbench_service.py @@ -820,7 +820,7 @@ def test_create_assistant_send_assistant_message( assistant_service_id=registration.assistant_service_id, api_key=registration.api_key or "", ).to_headers(), - **workbench_service_client.AssistantInstanceRequestHeaders( + **workbench_service_client.AssistantRequestHeaders( assistant_id=assistant_id, ).to_headers(), } @@ -1417,7 +1417,7 @@ def test_conversation_not_visible_to_non_participants( assistant_service_id=registration.assistant_service_id, api_key=registration.api_key or "", ).to_headers(), - **workbench_service_client.AssistantInstanceRequestHeaders( + **workbench_service_client.AssistantRequestHeaders( assistant_id=assistant_response.id, ).to_headers(), } @@ -1425,59 +1425,6 @@ def test_conversation_not_visible_to_non_participants( assert http_response.status_code == httpx.codes.NOT_FOUND -def test_create_update_workflow(workbench_service: FastAPI, test_user: MockUser) -> None: - with TestClient(app=workbench_service, headers=test_user.authorization_headers) as client: - new_workflow = workbench_model.NewWorkflowDefinition( - label="", - start_state_id="", - states=[], - conversation_definitions=[], - assistant_definitions=[], - transitions=[], - context_transfer_instruction="", - ) - http_response = client.post("/workflow-definitions", json=new_workflow.model_dump(mode="json")) - assert httpx.codes.is_success(http_response.status_code) - - created_workflow = workbench_model.WorkflowDefinition.model_validate(http_response.json()) - assert created_workflow.label == "" - - http_response = client.get(f"/workflow-definitions/{created_workflow.id}") - assert httpx.codes.is_success(http_response.status_code) - - retrieved_workflow = workbench_model.WorkflowDefinition.model_validate(http_response.json()) - - assert retrieved_workflow == created_workflow - - http_response = client.get("/workflow-definitions") - assert httpx.codes.is_success(http_response.status_code) - - retrieved_workflows = workbench_model.WorkflowDefinitionList.model_validate(http_response.json()) - - assert retrieved_workflows.workflow_definitions == [created_workflow] - - updated_workflow = workbench_model.UpdateWorkflowDefinition( - label="updated", - start_state_id="", - states=[], - conversation_definitions=[], - assistant_definitions=[], - transitions=[], - context_transfer_instruction="", - ) - http_response = client.patch( - f"/workflow-definitions/{created_workflow.id}", - json=updated_workflow.model_dump(mode="json"), - ) - - http_response = client.get(f"/workflow-definitions/{created_workflow.id}") - assert httpx.codes.is_success(http_response.status_code) - - retrieved_workflow = workbench_model.WorkflowDefinition.model_validate(http_response.json()) - - assert retrieved_workflow.label == "updated" - - def test_create_assistant_service_registration(workbench_service: FastAPI, test_user: MockUser) -> None: with TestClient(app=workbench_service, headers=test_user.authorization_headers) as client: new_assistant_service = workbench_model.NewAssistantServiceRegistration(