diff --git a/assistants/prospector-assistant/assistant/agents/document_agent.py b/assistants/prospector-assistant/assistant/agents/document_agent.py index c8e46093..2dce9f90 100644 --- a/assistants/prospector-assistant/assistant/agents/document_agent.py +++ b/assistants/prospector-assistant/assistant/agents/document_agent.py @@ -1,4 +1,5 @@ import logging +from os import path from typing import Any, Callable import deepmerge @@ -15,9 +16,7 @@ MessageType, NewConversationMessage, ) -from semantic_workbench_assistant.assistant_app import ( - ConversationContext, -) +from semantic_workbench_assistant.assistant_app import ConversationContext, storage_directory_for_context from ..config import AssistantConfigModel from .document.config import GuidedConversationAgentConfigModel @@ -81,23 +80,6 @@ async def receive_command( if not command_found: logger.warning(f"Could not find command {message.command_name}") - @classmethod - def set_mode_draft_outline( - cls, - config: AssistantConfigModel, - context: ConversationContext, - message: ConversationMessage, - metadata: dict[str, Any] = {}, - ) -> None: - # Pre-requisites - if cls.state.mode.function: - logger.error(f"Document Agent in mode: {cls.state.mode.function.__name__}. Cannot change modes.") - return - - # Run - cls.state.mode.function = cls._mode_draft_outline - cls.state.mode.is_completed = False - async def respond_to_conversation( self, config: AssistantConfigModel, @@ -119,12 +101,36 @@ async def respond_to_conversation( match self.state.mode.function: case self._mode_draft_outline: logger.info(f"Document Agent in mode: {mode.function.__name__}") - await mode.function() + await mode.function(self, config, context, message, metadata) case _: logger.error("Document Agent failed to find a corresponding mode.") @classmethod - async def _mode_draft_outline(cls) -> None: + def set_mode_draft_outline( + cls, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> None: + # Pre-requisites + if cls.state.mode.function: + logger.error(f"Document Agent in mode: {cls.state.mode.function.__name__}. Cannot change modes.") + return + + # Run + cls.state.mode.function = cls._mode_draft_outline + cls.state.mode.is_completed = False + + @classmethod + async def _mode_draft_outline( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> None: # Pre-requisites if cls.state.mode.function != cls._mode_draft_outline or cls.state.mode.is_completed: logger.error( @@ -162,168 +168,198 @@ async def _mode_draft_outline(cls) -> None: # Call step logger.info(f"Document Agent running mode.step: {step.function.__name__}") - step.is_completed = await step.function() + step.is_completed = await step.function(instance, config, context, message, metadata) logger.info("Document Agent mode.step status: %s", "completed" if step.is_completed else "not completed") ### - # step functions for e2e_draft_outline routine. + # step functions for _mode_draft_outline. ### @classmethod - async def _gc_attachment_check(cls) -> bool: - # pretend completed - return True + async def _gc_attachment_check( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> bool: + is_completed = await cls._gc_respond_to_conversation(config, gc_config, message, context, metadata) + return is_completed @classmethod - async def _draft_outline(cls) -> bool: - # pretend completed - return True + async def _draft_outline( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> bool: + is_completed = await cls.draft_outline(instance, config, context, message, metadata) + return is_completed @classmethod - async def _gc_get_outline_feedback(cls) -> bool: + async def _gc_get_outline_feedback( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> bool: # pretend completed return True @classmethod - async def _final_outline(cls) -> bool: + async def _final_outline( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> bool: # pretend completed return True + #### + + @classmethod async def _gc_respond_to_conversation( - self, + cls, config: AssistantConfigModel, gc_config: GuidedConversationAgentConfigModel, + message: ConversationMessage, context: ConversationContext, metadata: dict[str, Any] = {}, - ) -> None: + ) -> bool: method_metadata_key = "document_agent_gc_response" is_conversation_over = False - last_user_message = None - while not is_conversation_over: - try: - response_message, is_conversation_over = await GuidedConversationAgent.step_conversation( - config=config, - openai_client=openai_client.create_client(config.service_config), - agent_config=gc_config, - conversation_context=context, - last_user_message=last_user_message, - ) - if response_message is None: - # need to double check this^^ None logic, when it would occur in GC. Make "" for now. - agent_message = "" - else: - agent_message = response_message - - if not is_conversation_over: - # add the completion to the metadata for debugging - deepmerge.always_merger.merge( - metadata, - { - "debug": { - f"{method_metadata_key}": {"response": agent_message}, - } - }, - ) - else: - break - - except Exception as e: - logger.exception(f"exception occurred processing guided conversation: {e}") - agent_message = "An error occurred while processing the guided conversation." + try: + response_message, is_conversation_over = await GuidedConversationAgent.step_conversation( + config=config, + openai_client=openai_client.create_client(config.service_config), + agent_config=gc_config, + conversation_context=context, + last_user_message=message.content, + ) + if response_message is None: + # need to double check this^^ None logic, when it would occur in GC. Make "" for now. + agent_message = "" + else: + agent_message = response_message + + if not is_conversation_over: + # add the completion to the metadata for debugging deepmerge.always_merger.merge( metadata, { "debug": { - f"{method_metadata_key}": { - "error": str(e), - }, + f"{method_metadata_key}": {"response": agent_message}, } }, ) - # send the response to the conversation - await context.send_messages( - NewConversationMessage( - content=agent_message, - message_type=MessageType.chat, - metadata=metadata, + # send the response to the conversation + await context.send_messages( + NewConversationMessage( + content=agent_message, + message_type=MessageType.chat, + metadata=metadata, + ) ) + + except Exception as e: + logger.exception(f"exception occurred processing guided conversation: {e}") + agent_message = "An error occurred while processing the guided conversation." + deepmerge.always_merger.merge( + metadata, + { + "debug": { + f"{method_metadata_key}": { + "error": str(e), + }, + } + }, ) - # async def draft_outline( - # self, - # config: AssistantConfigModel, - # context: ConversationContext, - # message: ConversationMessage, - # metadata: dict[str, Any] = {}, - # ) -> tuple[str, dict[str, Any]]: - # method_metadata_key = "draft_outline" + return is_conversation_over + @classmethod + async def draft_outline( + cls, + instance, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage, + metadata: dict[str, Any] = {}, + ) -> bool: + method_metadata_key = "draft_outline" + + # get conversation related info + conversation = await context.get_messages(before=message.id) + if message.message_type == MessageType.chat: + conversation.messages.append(message) + participants_list = await context.get_participants(include_inactive=True) + + # get attachments related info + attachment_messages = await instance.attachments_extension.get_completion_messages_for_attachments( + context, config=config.agents_config.attachment_agent + ) + + # get outline related info + outline: str | None = None + if path.exists(storage_directory_for_context(context) / "outline.txt"): + outline = (storage_directory_for_context(context) / "outline.txt").read_text() + + # create chat completion messages + chat_completion_messages: list[ChatCompletionMessageParam] = [] + chat_completion_messages.append(_main_system_message()) + chat_completion_messages.append( + _chat_history_system_message(conversation.messages, participants_list.participants) + ) + chat_completion_messages.extend(attachment_messages) + if outline is not None: + chat_completion_messages.append(_outline_system_message(outline)) + + # make completion call to openai + async with openai_client.create_client(config.service_config) as client: + try: + completion_args = { + "messages": chat_completion_messages, + "model": config.request_config.openai_model, + "response_format": {"type": "text"}, + } + completion = await client.chat.completions.create(**completion_args) + content = completion.choices[0].message.content + _on_success_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, completion) -# -# # get conversation related info -# conversation = await context.get_messages(before=message.id) -# if message.message_type == MessageType.chat: -# conversation.messages.append(message) -# participants_list = await context.get_participants(include_inactive=True) -# -# # get attachments related info -# attachment_messages = await self.attachments_extension.get_completion_messages_for_attachments( -# context, config=config.agents_config.attachment_agent -# ) -# -# # get outline related info -# outline: str | None = None -# if path.exists(storage_directory_for_context(context) / "outline.txt"): -# outline = (storage_directory_for_context(context) / "outline.txt").read_text() -# -# # create chat completion messages -# chat_completion_messages: list[ChatCompletionMessageParam] = [] -# chat_completion_messages.append(_main_system_message()) -# chat_completion_messages.append( -# _chat_history_system_message(conversation.messages, participants_list.participants) -# ) -# chat_completion_messages.extend(attachment_messages) -# if outline is not None: -# chat_completion_messages.append(_outline_system_message(outline)) -# -# # make completion call to openai -# async with openai_client.create_client(config.service_config) as client: -# try: -# completion_args = { -# "messages": chat_completion_messages, -# "model": config.request_config.openai_model, -# "response_format": {"type": "text"}, -# } -# completion = await client.chat.completions.create(**completion_args) -# content = completion.choices[0].message.content -# _on_success_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, completion) -# -# except Exception as e: -# logger.exception(f"exception occurred calling openai chat completion: {e}") -# content = ( -# "An error occurred while calling the OpenAI API. Is it configured correctly?" -# "View the debug inspector for more information." -# ) -# _on_error_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, e) -# -# # store only latest version for now (will keep all versions later as need arises) -# (storage_directory_for_context(context) / "outline.txt").write_text(content) -# -# # send the response to the conversation only if from a command. Otherwise return info to caller. -# message_type = MessageType.chat -# if message.message_type == MessageType.command: -# message_type = MessageType.command -# -# await context.send_messages( -# NewConversationMessage( -# content=content, -# message_type=message_type, -# metadata=metadata, -# ) -# ) -# -# return content, metadata + except Exception as e: + logger.exception(f"exception occurred calling openai chat completion: {e}") + content = ( + "An error occurred while calling the OpenAI API. Is it configured correctly?" + "View the debug inspector for more information." + ) + _on_error_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, e) + + # store only latest version for now (will keep all versions later as need arises) + (storage_directory_for_context(context) / "outline.txt").write_text(content) + + # send the response to the conversation only if from a command. Otherwise return info to caller. + message_type = MessageType.chat + if message.message_type == MessageType.command: + message_type = MessageType.command + + await context.send_messages( + NewConversationMessage( + content=content, + message_type=message_type, + metadata=metadata, + ) + ) + + return True # endregion