diff --git a/assistants/prospector-assistant/assistant/agents/document/config.py b/assistants/prospector-assistant/assistant/agents/document/config.py index ccee9faf..e952633b 100644 --- a/assistants/prospector-assistant/assistant/agents/document/config.py +++ b/assistants/prospector-assistant/assistant/agents/document/config.py @@ -1,59 +1,9 @@ -import json -from typing import Annotated, Any, Dict, List, Type +from typing import Annotated from guided_conversation.utils.resources import ResourceConstraint, ResourceConstraintMode, ResourceConstraintUnit -from pydantic import BaseModel, Field, create_model +from pydantic import BaseModel, Field from semantic_workbench_assistant.config import UISchema -from ... import helpers -from . import config_defaults as config_defaults - -# -# region Helpers -# - -# take a full json schema and return a pydantic model, including support for -# nested objects and typed arrays - - -def json_type_to_python_type(json_type: str) -> Type: - # Mapping JSON types to Python types - type_mapping = {"integer": int, "string": str, "number": float, "boolean": bool, "object": dict, "array": list} - return type_mapping.get(json_type, Any) - - -def create_pydantic_model_from_json_schema(schema: Dict[str, Any], model_name="DynamicModel") -> Type[BaseModel]: - # Nested function to parse properties from the schema - def parse_properties(properties: Dict[str, Any]) -> Dict[str, Any]: - fields = {} - for prop_name, prop_attrs in properties.items(): - prop_type = prop_attrs.get("type") - description = prop_attrs.get("description", None) - - if prop_type == "object": - nested_model = create_pydantic_model_from_json_schema(prop_attrs, model_name=prop_name.capitalize()) - fields[prop_name] = (nested_model, Field(..., description=description)) - elif prop_type == "array": - items = prop_attrs.get("items", {}) - if items.get("type") == "object": - nested_model = create_pydantic_model_from_json_schema(items) - fields[prop_name] = (List[nested_model], Field(..., description=description)) - else: - nested_type = json_type_to_python_type(items.get("type")) - fields[prop_name] = (List[nested_type], Field(..., description=description)) - else: - python_type = json_type_to_python_type(prop_type) - fields[prop_name] = (python_type, Field(..., description=description)) - return fields - - properties = schema.get("properties", {}) - fields = parse_properties(properties) - return create_model(model_name, **fields) - - -# endregion - - # # region Models # @@ -69,7 +19,7 @@ class ResourceConstraintConfigModel(ResourceConstraint): ' "maximum", the agents will try to pace the conversation to use at most the resource quantity.' ), ), - ] = config_defaults.resource_constraint.mode + ] unit: Annotated[ ResourceConstraintUnit, @@ -77,7 +27,7 @@ class ResourceConstraintConfigModel(ResourceConstraint): title="Resource Unit", description="The unit for the resource constraint.", ), - ] = config_defaults.resource_constraint.unit + ] quantity: Annotated[ float, @@ -85,30 +35,15 @@ class ResourceConstraintConfigModel(ResourceConstraint): title="Resource Quantity", description="The quantity for the resource constraint. If <=0, the resource constraint is disabled.", ), - ] = config_defaults.resource_constraint.quantity + ] class GuidedConversationConfigModel(BaseModel): - enabled: Annotated[ - bool, - Field(description=helpers.load_text_include("guided_conversation_agent_enabled.md")), - UISchema(enable_markdown_in_description=True), - ] = False - - artifact: Annotated[ - str, - Field( - title="Artifact", - description="The artifact that the agent will manage.", - ), - UISchema(widget="baseModelEditor"), - ] = json.dumps(config_defaults.ArtifactModel.model_json_schema(), indent=2) - rules: Annotated[ list[str], Field(title="Rules", description="Do's and don'ts that the agent should attempt to follow"), UISchema(schema={"items": {"ui:widget": "textarea", "ui:options": {"rows": 2}}}), - ] = config_defaults.rules + ] conversation_flow: Annotated[ str, @@ -117,7 +52,7 @@ class GuidedConversationConfigModel(BaseModel): description="A loose natural language description of the steps of the conversation", ), UISchema(widget="textarea", schema={"ui:options": {"rows": 10}}, placeholder="[optional]"), - ] = config_defaults.conversation_flow.strip() + ] context: Annotated[ str, @@ -126,7 +61,7 @@ class GuidedConversationConfigModel(BaseModel): description="General background context for the conversation.", ), UISchema(widget="textarea", placeholder="[optional]"), - ] = config_defaults.context.strip() + ] resource_constraint: Annotated[ ResourceConstraintConfigModel, @@ -134,11 +69,7 @@ class GuidedConversationConfigModel(BaseModel): title="Resource Constraint", ), UISchema(schema={"quantity": {"ui:widget": "updown"}}), - ] = ResourceConstraintConfigModel() - - def get_artifact_model(self) -> Type[BaseModel]: - schema = json.loads(self.artifact) - return create_pydantic_model_from_json_schema(schema) + ] # endregion diff --git a/assistants/prospector-assistant/assistant/agents/document/config_defaults.py b/assistants/prospector-assistant/assistant/agents/document/config_defaults.py deleted file mode 100644 index 18c41541..00000000 --- a/assistants/prospector-assistant/assistant/agents/document/config_defaults.py +++ /dev/null @@ -1,62 +0,0 @@ -from guided_conversation.utils.resources import ResourceConstraint, ResourceConstraintMode, ResourceConstraintUnit -from pydantic import BaseModel, Field - - -# Artifact - The artifact is like a form that the agent must complete throughout the conversation. -# It can also be thought of as a working memory for the agent. -# We allow any valid Pydantic BaseModel class to be used. -class ArtifactModel(BaseModel): - student_poem: str = Field(description="The acrostic poem written by the student.") - initial_feedback: str = Field(description="Feedback on the student's final revised poem.") - final_feedback: str = Field(description="Feedback on how the student was able to improve their poem.") - inappropriate_behavior: list[str] = Field( - description="""List any inappropriate behavior the student attempted while chatting with you. \ -It is ok to leave this field Unanswered if there was none.""" - ) - - -# Rules - These are the do's and don'ts that the agent should follow during the conversation. -rules = [ - "DO NOT write the poem for the student.", - "Terminate the conversation immediately if the students asks for harmful or inappropriate content.", -] - -# Conversation Flow (optional) - This defines in natural language the steps of the conversation. -conversation_flow = """1. Start by explaining interactively what an acrostic poem is. -2. Then give the following instructions for how to go ahead and write one: - 1. Choose a word or phrase that will be the subject of your acrostic poem. - 2. Write the letters of your chosen word or phrase vertically down the page. - 3. Think of a word or phrase that starts with each letter of your chosen word or phrase. - 4. Write these words or phrases next to the corresponding letters to create your acrostic poem. -3. Then give the following example of a poem where the word or phrase is HAPPY: - Having fun with friends all day, - Awesome games that we all play. - Pizza parties on the weekend, - Puppies we bend down to tend, - Yelling yay when we win the game -4. Finally have the student write their own acrostic poem using the word or phrase of their choice. Encourage them to be creative and have fun with it. -After they write it, you should review it and give them feedback on what they did well and what they could improve on. -Have them revise their poem based on your feedback and then review it again. -""" - -# Context (optional) - This is any additional information or the circumstances the agent is in that it should be aware of. -# It can also include the high level goal of the conversation if needed. -context = """You are working 1 on 1 a 4th grade student who is chatting with you in the computer lab at school while being supervised by their teacher.""" - - -# Resource Constraints (optional) - This defines the constraints on the conversation such as time or turns. -# It can also help with pacing the conversation, -# For example, here we have set an exact time limit of 10 turns which the agent will try to fill. -resource_constraint = ResourceConstraint( - quantity=3, - unit=ResourceConstraintUnit.TURNS, - mode=ResourceConstraintMode.MAXIMUM, -) - -__all__ = [ - "ArtifactModel", - "rules", - "conversation_flow", - "context", - "resource_constraint", -] diff --git a/assistants/prospector-assistant/assistant/agents/document/gc_draft_content_feedback_config.py b/assistants/prospector-assistant/assistant/agents/document/gc_draft_content_feedback_config.py index e07bcaed..b8f70527 100644 --- a/assistants/prospector-assistant/assistant/agents/document/gc_draft_content_feedback_config.py +++ b/assistants/prospector-assistant/assistant/agents/document/gc_draft_content_feedback_config.py @@ -1,12 +1,6 @@ -import json -from typing import Annotated, Any, Dict, List, Type +from guided_conversation.utils.resources import ResourceConstraintMode, ResourceConstraintUnit +from pydantic import BaseModel, Field -from guided_conversation.utils.resources import ResourceConstraint, ResourceConstraintMode, ResourceConstraintUnit -from pydantic import BaseModel, Field, create_model -from semantic_workbench_assistant.config import UISchema - -from ... import helpers -from . import config_defaults as config_defaults from .config import GuidedConversationConfigModel, ResourceConstraintConfigModel from .guided_conversation import GC_ConversationStatus, GC_UserDecision @@ -89,123 +83,13 @@ class ArtifactModel(BaseModel): they might want or answer questions about it. This may be the first time the user is asking for you help (conversation_status is user_initiated), or the nth time (conversation_status is user_returned).""" - -# Resource Constraints (optional) - This defines the constraints on the conversation such as time or turns. -# It can also help with pacing the conversation, -# For example, here we have set an exact time limit of 10 turns which the agent will try to fill. -resource_constraint = ResourceConstraint( - quantity=5, - unit=ResourceConstraintUnit.TURNS, - mode=ResourceConstraintMode.MAXIMUM, +config = GuidedConversationConfigModel( + rules=rules, + conversation_flow=conversation_flow.strip(), + context=context.strip(), + resource_constraint=ResourceConstraintConfigModel( + unit=ResourceConstraintUnit.TURNS, + mode=ResourceConstraintMode.MAXIMUM, + quantity=5, + ), ) - - -# -# region Helpers -# - -# take a full json schema and return a pydantic model, including support for -# nested objects and typed arrays - - -def json_type_to_python_type(json_type: str) -> Type: - # Mapping JSON types to Python types - type_mapping = {"integer": int, "string": str, "number": float, "boolean": bool, "object": dict, "array": list} - return type_mapping.get(json_type, Any) - - -def create_pydantic_model_from_json_schema(schema: Dict[str, Any], model_name="DynamicModel") -> Type[BaseModel]: - # Nested function to parse properties from the schema - def parse_properties(properties: Dict[str, Any]) -> Dict[str, Any]: - fields = {} - for prop_name, prop_attrs in properties.items(): - prop_type = prop_attrs.get("type") - description = prop_attrs.get("description", None) - - if prop_type == "object": - nested_model = create_pydantic_model_from_json_schema(prop_attrs, model_name=prop_name.capitalize()) - fields[prop_name] = (nested_model, Field(..., description=description)) - elif prop_type == "array": - items = prop_attrs.get("items", {}) - if items.get("type") == "object": - nested_model = create_pydantic_model_from_json_schema(items) - fields[prop_name] = (List[nested_model], Field(..., description=description)) - else: - nested_type = json_type_to_python_type(items.get("type")) - fields[prop_name] = (List[nested_type], Field(..., description=description)) - else: - python_type = json_type_to_python_type(prop_type) - fields[prop_name] = (python_type, Field(..., description=description)) - return fields - - properties = schema.get("properties", {}) - fields = parse_properties(properties) - return create_model(model_name, **fields) - - -# endregion - - -# -# region Models -# - - -class GCDraftContentFeedbackConfigModel(GuidedConversationConfigModel): - enabled: Annotated[ - bool, - Field(description=helpers.load_text_include("guided_conversation_agent_enabled.md")), - UISchema(enable_markdown_in_description=True), - ] = False - - artifact: Annotated[ - str, - Field( - title="Artifact", - description="The artifact that the agent will manage.", - ), - UISchema(widget="baseModelEditor"), - ] = json.dumps(ArtifactModel.model_json_schema(), indent=2) - - rules: Annotated[ - list[str], - Field(title="Rules", description="Do's and don'ts that the agent should attempt to follow"), - UISchema(schema={"items": {"ui:widget": "textarea", "ui:options": {"rows": 2}}}), - ] = rules - - conversation_flow: Annotated[ - str, - Field( - title="Conversation Flow", - description="A loose natural language description of the steps of the conversation", - ), - UISchema(widget="textarea", schema={"ui:options": {"rows": 10}}, placeholder="[optional]"), - ] = conversation_flow.strip() - - context: Annotated[ - str, - Field( - title="Context", - description="General background context for the conversation.", - ), - UISchema(widget="textarea", placeholder="[optional]"), - ] = context.strip() - - resource_constraint: Annotated[ - ResourceConstraintConfigModel, - Field( - title="Resource Constraint", - ), - UISchema(schema={"quantity": {"ui:widget": "updown"}}), - ] = ResourceConstraintConfigModel( - unit=resource_constraint.unit, - quantity=resource_constraint.quantity, - mode=resource_constraint.mode, - ) - - def get_artifact_model(self) -> Type[BaseModel]: - schema = json.loads(self.artifact) - return create_pydantic_model_from_json_schema(schema) - - -# endregion diff --git a/assistants/prospector-assistant/assistant/agents/document/gc_draft_outline_feedback_config.py b/assistants/prospector-assistant/assistant/agents/document/gc_draft_outline_feedback_config.py index 438955af..2d34d39f 100644 --- a/assistants/prospector-assistant/assistant/agents/document/gc_draft_outline_feedback_config.py +++ b/assistants/prospector-assistant/assistant/agents/document/gc_draft_outline_feedback_config.py @@ -1,12 +1,6 @@ -import json -from typing import Annotated, Any, Dict, List, Type +from guided_conversation.utils.resources import ResourceConstraintMode, ResourceConstraintUnit +from pydantic import BaseModel, Field -from guided_conversation.utils.resources import ResourceConstraint, ResourceConstraintMode, ResourceConstraintUnit -from pydantic import BaseModel, Field, create_model -from semantic_workbench_assistant.config import UISchema - -from ... import helpers -from . import config_defaults as config_defaults from .config import GuidedConversationConfigModel, ResourceConstraintConfigModel from .guided_conversation import GC_ConversationStatus, GC_UserDecision @@ -85,123 +79,13 @@ class ArtifactModel(BaseModel): This may be the first time the user is asking for you help (conversation_status is user_initiated), or the nth time (conversation_status is user_returned).""" - -# Resource Constraints (optional) - This defines the constraints on the conversation such as time or turns. -# It can also help with pacing the conversation, -# For example, here we have set an exact time limit of 10 turns which the agent will try to fill. -resource_constraint = ResourceConstraint( - quantity=5, - unit=ResourceConstraintUnit.TURNS, - mode=ResourceConstraintMode.MAXIMUM, +config = GuidedConversationConfigModel( + rules=rules, + conversation_flow=conversation_flow.strip(), + context=context.strip(), + resource_constraint=ResourceConstraintConfigModel( + unit=ResourceConstraintUnit.TURNS, + mode=ResourceConstraintMode.MAXIMUM, + quantity=5, + ), ) - - -# -# region Helpers -# - -# take a full json schema and return a pydantic model, including support for -# nested objects and typed arrays - - -def json_type_to_python_type(json_type: str) -> Type: - # Mapping JSON types to Python types - type_mapping = {"integer": int, "string": str, "number": float, "boolean": bool, "object": dict, "array": list} - return type_mapping.get(json_type, Any) - - -def create_pydantic_model_from_json_schema(schema: Dict[str, Any], model_name="DynamicModel") -> Type[BaseModel]: - # Nested function to parse properties from the schema - def parse_properties(properties: Dict[str, Any]) -> Dict[str, Any]: - fields = {} - for prop_name, prop_attrs in properties.items(): - prop_type = prop_attrs.get("type") - description = prop_attrs.get("description", None) - - if prop_type == "object": - nested_model = create_pydantic_model_from_json_schema(prop_attrs, model_name=prop_name.capitalize()) - fields[prop_name] = (nested_model, Field(..., description=description)) - elif prop_type == "array": - items = prop_attrs.get("items", {}) - if items.get("type") == "object": - nested_model = create_pydantic_model_from_json_schema(items) - fields[prop_name] = (List[nested_model], Field(..., description=description)) - else: - nested_type = json_type_to_python_type(items.get("type")) - fields[prop_name] = (List[nested_type], Field(..., description=description)) - else: - python_type = json_type_to_python_type(prop_type) - fields[prop_name] = (python_type, Field(..., description=description)) - return fields - - properties = schema.get("properties", {}) - fields = parse_properties(properties) - return create_model(model_name, **fields) - - -# endregion - - -# -# region Models -# - - -class GCDraftOutlineFeedbackConfigModel(GuidedConversationConfigModel): - enabled: Annotated[ - bool, - Field(description=helpers.load_text_include("guided_conversation_agent_enabled.md")), - UISchema(enable_markdown_in_description=True), - ] = False - - artifact: Annotated[ - str, - Field( - title="Artifact", - description="The artifact that the agent will manage.", - ), - UISchema(widget="baseModelEditor"), - ] = json.dumps(ArtifactModel.model_json_schema(), indent=2) - - rules: Annotated[ - list[str], - Field(title="Rules", description="Do's and don'ts that the agent should attempt to follow"), - UISchema(schema={"items": {"ui:widget": "textarea", "ui:options": {"rows": 2}}}), - ] = rules - - conversation_flow: Annotated[ - str, - Field( - title="Conversation Flow", - description="A loose natural language description of the steps of the conversation", - ), - UISchema(widget="textarea", schema={"ui:options": {"rows": 10}}, placeholder="[optional]"), - ] = conversation_flow.strip() - - context: Annotated[ - str, - Field( - title="Context", - description="General background context for the conversation.", - ), - UISchema(widget="textarea", placeholder="[optional]"), - ] = context.strip() - - resource_constraint: Annotated[ - ResourceConstraintConfigModel, - Field( - title="Resource Constraint", - ), - UISchema(schema={"quantity": {"ui:widget": "updown"}}), - ] = ResourceConstraintConfigModel( - unit=resource_constraint.unit, - quantity=resource_constraint.quantity, - mode=resource_constraint.mode, - ) - - def get_artifact_model(self) -> Type[BaseModel]: - schema = json.loads(self.artifact) - return create_pydantic_model_from_json_schema(schema) - - -# endregion diff --git a/assistants/prospector-assistant/assistant/agents/document/guided_conversation.py b/assistants/prospector-assistant/assistant/agents/document/guided_conversation.py index 20a4a040..2ecc077f 100644 --- a/assistants/prospector-assistant/assistant/agents/document/guided_conversation.py +++ b/assistants/prospector-assistant/assistant/agents/document/guided_conversation.py @@ -5,6 +5,7 @@ from guided_conversation.guided_conversation_agent import GuidedConversation as GuidedConversationAgent from openai import AsyncOpenAI +from pydantic import BaseModel from semantic_kernel import Kernel from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion from semantic_workbench_assistant.assistant_app import ( @@ -47,7 +48,9 @@ def __init__( config: AssistantConfigModel, openai_client: AsyncOpenAI, agent_config: GuidedConversationConfigModel, + artifact_model: type[BaseModel], conversation_context: ConversationContext, + artifact_updates: dict = {}, ) -> None: self.guided_conversation_agent: GuidedConversationAgent self.conversation_context: ConversationContext = conversation_context @@ -62,73 +65,39 @@ def __init__( ) self.kernel.add_service(chat_service) - self.artifact_model = agent_config.get_artifact_model() + self.artifact_model = artifact_model self.conversation_flow = agent_config.conversation_flow self.context = agent_config.context self.rules = agent_config.rules self.resource_constraint = agent_config.resource_constraint state = _read_guided_conversation_state(conversation_context) - if state: - self.guided_conversation_agent = GuidedConversationAgent.from_json( - json_data=state, - kernel=self.kernel, - artifact=self.artifact_model, # type: ignore - conversation_flow=self.conversation_flow, - context=self.context, - rules=self.rules, - resource_constraint=self.resource_constraint, - service_id=self.service_id, - ) - else: + if not state: self.guided_conversation_agent = GuidedConversationAgent( kernel=self.kernel, - artifact=self.artifact_model, # type: ignore + artifact=self.artifact_model, conversation_flow=self.conversation_flow, context=self.context, rules=self.rules, resource_constraint=self.resource_constraint, service_id=self.service_id, ) - _write_guided_conversation_state(conversation_context, self.guided_conversation_agent.to_json()) - - @staticmethod - def get_state( - conversation_context: ConversationContext, - ) -> dict | None: - """ - Get the state of the guided conversation agent. - """ - return _read_guided_conversation_state(conversation_context) - - def get_artifact_dict(self) -> dict | None: - artifact_dict = None - state_dict = self.get_state(self.conversation_context) - if state_dict is not None: - artifact_item = state_dict.get("artifact") - if artifact_item is not None: - artifact_dict = artifact_item.get("artifact") - return artifact_dict - - def set_artifact_dict(self, artifact_dict: dict) -> None: - state_dict = self.get_state(self.conversation_context) - if state_dict is not None: - artifact_item = state_dict.get("artifact") - if artifact_item is not None: - artifact_item["artifact"] = artifact_dict - # Update storage with new state info - _write_guided_conversation_state(self.conversation_context, state_dict) - # Update GC with new state info - self.guided_conversation_agent = GuidedConversationAgent.from_json( - json_data=state_dict, - kernel=self.kernel, - artifact=self.artifact_model, # type: ignore - conversation_flow=self.conversation_flow, - context=self.context, - rules=self.rules, - resource_constraint=self.resource_constraint, - service_id=self.service_id, - ) + state = self.guided_conversation_agent.to_json() + + if artifact_updates: + state["artifact"]["artifact"].update(artifact_updates) + + self.guided_conversation_agent = GuidedConversationAgent.from_json( + json_data=state, + kernel=self.kernel, + artifact=self.artifact_model, + conversation_flow=self.conversation_flow, + context=self.context, + rules=self.rules, + resource_constraint=self.resource_constraint, + service_id=self.service_id, + ) + return async def step_conversation( self, @@ -168,7 +137,7 @@ async def step_conversation( if conversation_status_str is not None: match conversation_status_str: case GC_ConversationStatus.USER_COMPLETED: - response = final_response + response = final_response or result.ai_message or "" gc_conversation_status = GC_ConversationStatus.USER_COMPLETED match user_decision_str: case GC_UserDecision.UPDATE_OUTLINE: @@ -206,13 +175,13 @@ async def step_conversation( # -def _get_guided_conversation_storage_path(context: ConversationContext, filename: str | None = None) -> Path: +def _get_guided_conversation_storage_path(context: ConversationContext) -> Path: """ Get the path to the directory for storing guided conversation files. """ path = storage_directory_for_context(context) / "guided-conversation" - if filename: - path /= filename + if not path.exists(): + path.mkdir(parents=True) return path @@ -220,19 +189,15 @@ def _write_guided_conversation_state(context: ConversationContext, state: dict) """ Write the state of the guided conversation agent to a file. """ - json_data = json.dumps(state) - path = _get_guided_conversation_storage_path(context) - if not path.exists(): - path.mkdir(parents=True) - path = path / "state.json" - path.write_text(json_data) + path = _get_guided_conversation_storage_path(context) / "state.json" + path.write_text(json.dumps(state)) def _read_guided_conversation_state(context: ConversationContext) -> dict | None: """ Read the state of the guided conversation agent from a file. """ - path = _get_guided_conversation_storage_path(context, "state.json") + path = _get_guided_conversation_storage_path(context) / "state.json" if path.exists(): try: json_data = path.read_text() @@ -246,7 +211,7 @@ def _delete_guided_conversation_state(context: ConversationContext) -> None: """ Delete the file containing state of the guided conversation agent. """ - path = _get_guided_conversation_storage_path(context, "state.json") + path = _get_guided_conversation_storage_path(context) / "state.json" if path.exists(): path.unlink() diff --git a/assistants/prospector-assistant/assistant/agents/document/state.py b/assistants/prospector-assistant/assistant/agents/document/state.py index bd298cd3..16bc717e 100644 --- a/assistants/prospector-assistant/assistant/agents/document/state.py +++ b/assistants/prospector-assistant/assistant/agents/document/state.py @@ -1,18 +1,19 @@ -import json import logging +from abc import abstractmethod from enum import StrEnum from os import path from pathlib import Path -from typing import Any, Callable +from typing import Any, Protocol import deepmerge import openai_client +from assistant.agents.document import gc_draft_content_feedback_config, gc_draft_outline_feedback_config from assistant_extensions.attachments import AttachmentsExtension from openai.types.chat import ( ChatCompletionMessageParam, ChatCompletionSystemMessageParam, ) -from pydantic import BaseModel, Field +from pydantic import BaseModel from semantic_workbench_api_model.workbench_model import ( ConversationMessage, ConversationParticipant, @@ -22,9 +23,6 @@ from semantic_workbench_assistant.assistant_app import ConversationContext, storage_directory_for_context from ...config import AssistantConfigModel -from .config import GuidedConversationConfigModel -from .gc_draft_content_feedback_config import GCDraftContentFeedbackConfigModel -from .gc_draft_outline_feedback_config import GCDraftOutlineFeedbackConfigModel from .guided_conversation import GC_ConversationStatus, GC_UserDecision, GuidedConversation logger = logging.getLogger(__name__) @@ -35,7 +33,6 @@ class StepName(StrEnum): - UNDEFINED = "undefined" DRAFT_OUTLINE = "step_draft_outline" GC_GET_OUTLINE_FEEDBACK = "step_gc_get_outline_feedback" DRAFT_CONTENT = "step_draft_content" @@ -44,128 +41,44 @@ class StepName(StrEnum): class StepStatus(StrEnum): - UNDEFINED = "undefined" - INITIATED = "initiated" NOT_COMPLETED = "not_completed" USER_COMPLETED = "user_completed" USER_EXIT_EARLY = "user_exit_early" -class StepData(BaseModel): - step_name: StepName = StepName.UNDEFINED - run_count: int = 0 - - -class Step(BaseModel): - name: StepName = StepName.UNDEFINED - status: StepStatus = StepStatus.UNDEFINED - gc_conversation_status: GC_ConversationStatus = GC_ConversationStatus.UNDEFINED - gc_user_decision: GC_UserDecision = GC_UserDecision.UNDEFINED - execute: Callable | None = Field(default=None, exclude=True) - - def __init__(self, **data: Any) -> None: - super().__init__(**data) - self._error_check() - - name = data.get("name") - match name: - case StepName.DRAFT_OUTLINE: - self.execute = self._step_draft_outline - case StepName.GC_GET_OUTLINE_FEEDBACK: - self.execute = self._step_gc_get_outline_feedback - case StepName.DRAFT_CONTENT: - self.execute = self._step_draft_content - case StepName.GC_GET_CONTENT_FEEDBACK: - self.execute = self._step_gc_get_content_feedback - case StepName.FINISH: - self.execute = self._step_finish - case _: - print(f"{name} mode.") - - logger.info( - "Document Agent State: Step loaded. StepName: %s, StepStatus: %s", - self.get_name(), - self.get_status(), - ) - - def _error_check(self) -> None: - # name and status should either both be UNDEFINED or both be defined. Always. - if (self.name is StepName.UNDEFINED and self.status is not StepStatus.UNDEFINED) or ( - self.status is StepStatus.UNDEFINED and self.name is not StepName.UNDEFINED - ): - logger.error( - "Document Agent State: Either StepName or StepStatus is UNDEFINED, and the other is not. Both must be UNDEFINED at the same time. StepName: %s, StepStatus: %s", - self.name, - self.status, - ) - - # user decision should be set only if conversation status is completed. otherwise undefined. - if ( - self.gc_conversation_status is GC_ConversationStatus.UNDEFINED - and self.gc_user_decision is not GC_UserDecision.UNDEFINED - ) or ( - self.gc_user_decision is GC_UserDecision.UNDEFINED - and self.gc_conversation_status is GC_ConversationStatus.USER_COMPLETED - ): - logger.error( - "Document Agent State: Either GC conversation status is UNDEFINED, while GC user decision is not. Or GC user decision is UNDEFINED while GC conversation status is COMPLETED. GC conversation status: %s, GC user decision: %s", - self.gc_conversation_status, - self.gc_user_decision, - ) - - def reset(self) -> None: - # TODO: consider if this is the right way to reset a step, fix the # noqa: F841 - self = Step() # noqa: F841 - - def set_name(self, name: StepName) -> None: - if name is StepName.UNDEFINED: # need to reset step - self.reset() - if name is not self.name: # update if new StepName - self = Step(name=name, status=StepStatus.NOT_COMPLETED) - self._error_check() - - def get_name(self) -> StepName: - self._error_check() - return self.name - - def set_status(self, status: StepStatus) -> None: - if status is StepStatus.UNDEFINED: # need to reset mode - self.reset() - self.status = status - self._error_check() - - def get_status(self) -> StepStatus: - self._error_check() - return self.status - - def get_gc_user_decision(self) -> GC_UserDecision: - self._error_check() - return self.gc_user_decision +class StepProtocol(Protocol): + @abstractmethod + async def execute( + self, + run_count: int, + attachments_ext: AttachmentsExtension, + config: AssistantConfigModel, + context: ConversationContext, + message: ConversationMessage | None, + metadata: dict[str, Any] = {}, + ) -> tuple[StepStatus, GC_UserDecision]: ... - def get_gc_conversation_status(self) -> GC_ConversationStatus: - self._error_check() - return self.gc_conversation_status - async def _step_draft_outline( +class StepDraftOutline(StepProtocol): + async def execute( self, - step_data: StepData, + run_count: int, attachments_ext: AttachmentsExtension, config: AssistantConfigModel, context: ConversationContext, message: ConversationMessage | None, metadata: dict[str, Any] = {}, - ) -> StepStatus: - logger.info("Document Agent State: Step executing. StepName: %s", self.get_name()) + ) -> tuple[StepStatus, GC_UserDecision]: method_metadata_key = "_step_draft_outline" # get conversation related info -- for now, if no message, assuming no prior conversation - conversation = None - participants_list = None + participants_list = await context.get_participants(include_inactive=True) if message is not None: conversation = await context.get_messages(before=message.id) if message.message_type == MessageType.chat: conversation.messages.append(message) - participants_list = await context.get_participants(include_inactive=True) + else: + conversation = await context.get_messages() # get attachments related info attachment_messages = await attachments_ext.get_completion_messages_for_attachments( @@ -173,10 +86,7 @@ async def _step_draft_outline( ) # get outline related info - outline: str | None = None - # path = _get_document_agent_conversation_storage_path(context) - if path.exists(storage_directory_for_context(context) / "document_agent/outline.txt"): - outline = (storage_directory_for_context(context) / "document_agent/outline.txt").read_text() + outline = read_document_outline(context) # create chat completion messages chat_completion_messages: list[ChatCompletionMessageParam] = [] @@ -192,112 +102,104 @@ async def _step_draft_outline( # make completion call to openai async with openai_client.create_client(config.service_config) as client: try: - completion_args = { - "messages": chat_completion_messages, - "model": config.request_config.openai_model, - "response_format": {"type": "text"}, - } - completion = await client.chat.completions.create(**completion_args) - message_content = completion.choices[0].message.content + completion = await client.chat.completions.create( + messages=chat_completion_messages, + model=config.request_config.openai_model, + response_format={"type": "text"}, + ) + new_outline = completion.choices[0].message.content _on_success_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, completion) except Exception as e: - logger.exception(f"Document Agent State: Exception occurred calling openai chat completion: {e}") - message_content = ( + logger.exception("Document Agent State: Exception occurred calling openai chat completion") + new_outline = ( "An error occurred while calling the OpenAI API. Is it configured correctly?" "View the debug inspector for more information." ) _on_error_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, e) # store only latest version for now (will keep all versions later as need arises) - (storage_directory_for_context(context) / "document_agent/outline.txt").write_text(message_content) + if new_outline is not None: + write_document_outline(context, new_outline) - # send a command response to the conversation only if from a command. Otherwise return a normal chat message. - message_type = MessageType.chat - if message is not None and message.message_type == MessageType.command: - message_type = MessageType.command + # send a command response to the conversation only if from a command. Otherwise return a normal chat message. + message_type = MessageType.chat + if message is not None and message.message_type == MessageType.command: + message_type = MessageType.command - await context.send_messages( - NewConversationMessage( - content=message_content, - message_type=message_type, - metadata=metadata, + await context.send_messages( + NewConversationMessage( + content=new_outline, + message_type=message_type, + metadata=metadata, + ) ) - ) - logger.info("Document Agent State: Step executed. StepName: %s", self.get_name()) - return StepStatus.USER_COMPLETED + return StepStatus.USER_COMPLETED, GC_UserDecision.UNDEFINED - async def _step_gc_get_outline_feedback( + +class StepGetOutlineFeedback(StepProtocol): + async def execute( self, - step_data: StepData, + run_count: int, attachments_ext: AttachmentsExtension, config: AssistantConfigModel, context: ConversationContext, message: ConversationMessage | None, metadata: dict[str, Any] = {}, - ) -> StepStatus: - logger.info("Document Agent State: Step executing. StepName: %s", self.get_name()) + ) -> tuple[StepStatus, GC_UserDecision]: method_metadata_key = "_step_gc_get_outline_feedback" + # Update artifact + conversation_status_str = GC_ConversationStatus.USER_INITIATED + if run_count > 0: + conversation_status_str = GC_ConversationStatus.USER_RETURNED + + filenames = await attachments_ext.get_attachment_filenames(context) + filenames_str = ", ".join(filenames) + + outline_str = read_document_outline(context) or "" + artifact_updates = { + "conversation_status": conversation_status_str, + "filenames": filenames_str, + "current_outline": outline_str, + } + # Initiate Guided Conversation - gc_outline_feedback_config: GuidedConversationConfigModel = GCDraftOutlineFeedbackConfigModel() guided_conversation = GuidedConversation( config=config, openai_client=openai_client.create_client(config.service_config), - agent_config=gc_outline_feedback_config, + agent_config=gc_draft_outline_feedback_config.config, + artifact_model=gc_draft_outline_feedback_config.ArtifactModel, conversation_context=context, + artifact_updates=artifact_updates, ) - # Update artifact - conversation_status_str = GC_ConversationStatus.UNDEFINED - match step_data.run_count: # could be bool instead. But maybe a run count use later? - case 0: - conversation_status_str = GC_ConversationStatus.USER_INITIATED - case _: - conversation_status_str = GC_ConversationStatus.USER_RETURNED - - filenames = await attachments_ext.get_attachment_filenames(context) - filenames_str = ", ".join(filenames) - - outline_str: str = "" - if path.exists(storage_directory_for_context(context) / "document_agent/outline.txt"): - outline_str = (storage_directory_for_context(context) / "document_agent/outline.txt").read_text() - - artifact_dict = guided_conversation.get_artifact_dict() - if artifact_dict is not None: - artifact_dict["conversation_status"] = conversation_status_str - artifact_dict["filenames"] = filenames_str - artifact_dict["current_outline"] = outline_str - guided_conversation.set_artifact_dict(artifact_dict) - else: - logger.error("Document Agent State: artifact_dict unavailable.") + step_status = StepStatus.NOT_COMPLETED + gc_conversation_status = GC_ConversationStatus.UNDEFINED + gc_user_decision = GC_UserDecision.UNDEFINED # Run conversation step - step_status = StepStatus.UNDEFINED try: user_message = None - if message is not None and self.get_status() is not StepStatus.INITIATED: + if message is not None: user_message = message.content if len(message.filenames) != 0: user_message = user_message + " Newly attached files: " + filenames_str ( response, - self.gc_conversation_status, - self.gc_user_decision, + gc_conversation_status, + gc_user_decision, ) = await guided_conversation.step_conversation( last_user_message=user_message, ) # this could get cleaned up - if self.gc_conversation_status is GC_ConversationStatus.USER_COMPLETED: - if self.gc_user_decision is GC_UserDecision.EXIT_EARLY: + if gc_conversation_status is GC_ConversationStatus.USER_COMPLETED: + step_status = StepStatus.USER_COMPLETED + if gc_user_decision is GC_UserDecision.EXIT_EARLY: step_status = StepStatus.USER_EXIT_EARLY - else: - step_status = StepStatus.USER_COMPLETED - else: - step_status = StepStatus.NOT_COMPLETED # need to update gc state artifact? @@ -305,7 +207,7 @@ async def _step_gc_get_outline_feedback( metadata, { "debug": { - f"{method_metadata_key}": {"response": response}, + f"{method_metadata_key}": guided_conversation.guided_conversation_agent.to_json(), } }, ) @@ -332,29 +234,29 @@ async def _step_gc_get_outline_feedback( ) ) - logger.info("Document Agent State: Step executed. StepName: %s", self.get_name()) - return step_status + return step_status, gc_user_decision + - async def _step_draft_content( +class StepDraftContent(StepProtocol): + async def execute( self, - step_data: StepData, + run_count: int, attachments_ext: AttachmentsExtension, config: AssistantConfigModel, context: ConversationContext, message: ConversationMessage | None, metadata: dict[str, Any] = {}, - ) -> StepStatus: - logger.info("Document Agent State: Step executing. StepName: %s", self.get_name()) + ) -> tuple[StepStatus, GC_UserDecision]: method_metadata_key = "_step_draft_content" # get conversation related info -- for now, if no message, assuming no prior conversation - conversation = None - participants_list = None + participants_list = await context.get_participants(include_inactive=True) if message is not None: conversation = await context.get_messages(before=message.id) if message.message_type == MessageType.chat: conversation.messages.append(message) - participants_list = await context.get_participants(include_inactive=True) + else: + conversation = await context.get_messages() # get attachments related info attachment_messages = await attachments_ext.get_completion_messages_for_attachments( @@ -376,21 +278,19 @@ async def _step_draft_content( if document_outline is not None: chat_completion_messages.append(_outline_system_message(document_outline)) - if path.exists(storage_directory_for_context(context) / "document_agent/content.txt"): - document_content = (storage_directory_for_context(context) / "document_agent/content.txt").read_text() - if document_content is not None: # only grabs previously written content, not all yet. - chat_completion_messages.append(_content_system_message(document_content)) + document_content = read_document_content(context) + if document_content is not None: # only grabs previously written content, not all yet. + chat_completion_messages.append(_content_system_message(document_content)) # make completion call to openai content: str | None = None async with openai_client.create_client(config.service_config) as client: try: - completion_args = { - "messages": chat_completion_messages, - "model": config.request_config.openai_model, - "response_format": {"type": "text"}, - } - completion = await client.chat.completions.create(**completion_args) + completion = await client.chat.completions.create( + messages=chat_completion_messages, + model=config.request_config.openai_model, + response_format={"type": "text"}, + ) content = completion.choices[0].message.content _on_success_metadata_update(metadata, method_metadata_key, config, chat_completion_messages, completion) @@ -404,7 +304,7 @@ async def _step_draft_content( if content is not None: # store only latest version for now (will keep all versions later as need arises) - (storage_directory_for_context(context) / "document_agent/content.txt").write_text(content) + write_document_content(context, content) # send a command response to the conversation only if from a command. Otherwise return a normal chat message. message_type = MessageType.chat @@ -419,84 +319,74 @@ async def _step_draft_content( ) ) - logger.info("Document Agent State: Step executed. StepName: %s", self.get_name()) - return StepStatus.USER_COMPLETED + return StepStatus.USER_COMPLETED, GC_UserDecision.UNDEFINED - async def _step_gc_get_content_feedback( + +class StepGetContentFeedback(StepProtocol): + async def execute( self, - step_data: StepData, + run_count: int, attachments_ext: AttachmentsExtension, config: AssistantConfigModel, context: ConversationContext, message: ConversationMessage | None, metadata: dict[str, Any] = {}, - ) -> StepStatus: - logger.info("Document Agent State: Step executing. StepName: %s", self.get_name()) + ) -> tuple[StepStatus, GC_UserDecision]: method_metadata_key = "_step_gc_get_content_feedback" + # Update artifact + conversation_status_str = GC_ConversationStatus.USER_INITIATED + if run_count > 0: + conversation_status_str = GC_ConversationStatus.USER_RETURNED + + filenames = await attachments_ext.get_attachment_filenames(context) + filenames_str = ", ".join(filenames) + + outline_str = read_document_outline(context) or "" + content_str = read_document_content(context) or "" + + artifact_updates = { + "conversation_status": conversation_status_str, + "filenames": filenames_str, + "approved_outline": outline_str, + "current_content": content_str, + } + # Initiate Guided Conversation - gc_outline_feedback_config: GuidedConversationConfigModel = GCDraftContentFeedbackConfigModel() guided_conversation = GuidedConversation( config=config, openai_client=openai_client.create_client(config.service_config), - agent_config=gc_outline_feedback_config, + agent_config=gc_draft_content_feedback_config.config, + artifact_model=gc_draft_content_feedback_config.ArtifactModel, conversation_context=context, + artifact_updates=artifact_updates, ) - # Update artifact - conversation_status_str = GC_ConversationStatus.UNDEFINED - match step_data.run_count: # could be bool instead. But maybe a run count use later? - case 0: - conversation_status_str = GC_ConversationStatus.USER_INITIATED - case _: - conversation_status_str = GC_ConversationStatus.USER_RETURNED - - filenames = await attachments_ext.get_attachment_filenames(context) - filenames_str = ", ".join(filenames) - - outline_str: str = "" - if path.exists(storage_directory_for_context(context) / "document_agent/outline.txt"): - outline_str = (storage_directory_for_context(context) / "document_agent/outline.txt").read_text() - - content_str: str = "" - if path.exists(storage_directory_for_context(context) / "document_agent/content.txt"): - content_str = (storage_directory_for_context(context) / "document_agent/content.txt").read_text() - - artifact_dict = guided_conversation.get_artifact_dict() - if artifact_dict is not None: - artifact_dict["conversation_status"] = conversation_status_str - artifact_dict["filenames"] = filenames_str - artifact_dict["approved_outline"] = outline_str - artifact_dict["current_content"] = content_str - guided_conversation.set_artifact_dict(artifact_dict) - else: - logger.error("Document Agent State: artifact_dict unavailable.") + step_status = StepStatus.NOT_COMPLETED + gc_conversation_status = GC_ConversationStatus.UNDEFINED + gc_user_decision = GC_UserDecision.UNDEFINED # Run conversation step - step_status = StepStatus.UNDEFINED try: user_message = None - if message is not None and self.get_status() is not StepStatus.INITIATED: + if message is not None: user_message = message.content # if len(message.filenames) != 0: # Not sure we want to support this right now for content/page # user_message = user_message + " Newly attached files: " + filenames_str ( response, - self.gc_conversation_status, - self.gc_user_decision, + gc_conversation_status, + gc_user_decision, ) = await guided_conversation.step_conversation( last_user_message=user_message, ) # this could get cleaned up - if self.gc_conversation_status is GC_ConversationStatus.USER_COMPLETED: - if self.gc_user_decision is GC_UserDecision.EXIT_EARLY: + if gc_conversation_status is GC_ConversationStatus.USER_COMPLETED: + step_status = StepStatus.USER_COMPLETED + if gc_user_decision is GC_UserDecision.EXIT_EARLY: step_status = StepStatus.USER_EXIT_EARLY - else: - step_status = StepStatus.USER_COMPLETED - else: - step_status = StepStatus.NOT_COMPLETED # need to update gc state artifact? @@ -531,20 +421,21 @@ async def _step_gc_get_content_feedback( ) ) - logger.info("Document Agent State: Step executed. StepName: %s", self.get_name()) - return step_status + return step_status, gc_user_decision - async def _step_finish( + +class StepFinish(StepProtocol): + async def execute( self, - step_data: StepData, + run_count: int, attachments_ext: AttachmentsExtension, config: AssistantConfigModel, context: ConversationContext, message: ConversationMessage | None, metadata: dict[str, Any] = {}, - ) -> StepStatus: + ) -> tuple[StepStatus, GC_UserDecision]: # Can do other things here if necessary - return StepStatus.USER_COMPLETED + return StepStatus.USER_COMPLETED, GC_UserDecision.UNDEFINED # endregion @@ -556,157 +447,17 @@ async def _step_finish( class ModeName(StrEnum): - UNDEFINED = "undefined" DRAFT_OUTLINE = "mode_draft_outline" DRAFT_PAPER = "mode_draft_paper" class ModeStatus(StrEnum): - UNDEFINED = "undefined" INITIATED = "initiated" NOT_COMPLETED = "not_completed" USER_COMPLETED = "user_completed" USER_EXIT_EARLY = "user_exit_early" -class Mode(BaseModel): - name: ModeName = ModeName.UNDEFINED - status: ModeStatus = ModeStatus.UNDEFINED - current_step: Step = Step() - get_next_step: Callable | None = Field(default=None, exclude=True) - - def __init__(self, **data: Any) -> None: - super().__init__(**data) - self._error_check() - - name = data.get("name") - match name: - case ModeName.DRAFT_OUTLINE: - self.get_next_step = self._draft_outline_mode_get_next_step - case ModeName.DRAFT_PAPER: - self.get_next_step = self._draft_paper_mode_get_next_step - if self.get_next_step is not None: - if self.get_step().get_name() is StepName.UNDEFINED: - self.set_step(self.get_next_step()) - - logger.info( - "Document Agent State: Mode loaded. ModeName: %s, ModeStatus: %s, Current StepName: %s, Current StepStatus: %s", - self.get_name(), - self.get_status(), - self.get_step().get_name(), - self.get_step().get_status(), - ) - - def _draft_paper_mode_get_next_step(self) -> Step: - current_step_name = self.get_step().get_name() - logger.info("Document Agent State: Getting next step.") - - match current_step_name: - case StepName.UNDEFINED: - current_step_name = StepName.DRAFT_OUTLINE - case StepName.DRAFT_OUTLINE: - current_step_name = StepName.GC_GET_OUTLINE_FEEDBACK - case StepName.GC_GET_OUTLINE_FEEDBACK: - user_decision = self.get_step().get_gc_user_decision() - if user_decision is not GC_UserDecision.UNDEFINED: - match user_decision: - case GC_UserDecision.UPDATE_OUTLINE: - current_step_name = StepName.DRAFT_OUTLINE - case GC_UserDecision.DRAFT_PAPER: - current_step_name = StepName.DRAFT_CONTENT - case GC_UserDecision.EXIT_EARLY: - current_step_name = StepName.FINISH - case StepName.DRAFT_CONTENT: - current_step_name = StepName.GC_GET_CONTENT_FEEDBACK - case StepName.GC_GET_CONTENT_FEEDBACK: - user_decision = self.get_step().get_gc_user_decision() - if user_decision is not GC_UserDecision.UNDEFINED: - match user_decision: - case GC_UserDecision.UPDATE_CONTENT: - current_step_name = StepName.DRAFT_CONTENT - case GC_UserDecision.DRAFT_NEXT_CONTENT: # not implemented yet - current_step_name = StepName.FINISH - case GC_UserDecision.EXIT_EARLY: - current_step_name = StepName.FINISH - case StepName.FINISH: - return Step(name=StepName.UNDEFINED, status=StepStatus.UNDEFINED) - - return Step(name=current_step_name, status=StepStatus.INITIATED) - - def _draft_outline_mode_get_next_step(self) -> Step: - current_step_name = self.get_step().get_name() - logger.info("Document Agent State: Getting next step.") - - match current_step_name: - case StepName.UNDEFINED: - current_step_name = StepName.DRAFT_OUTLINE - case StepName.DRAFT_OUTLINE: - current_step_name = StepName.GC_GET_OUTLINE_FEEDBACK - case StepName.GC_GET_OUTLINE_FEEDBACK: - user_decision = self.get_step().get_gc_user_decision() - if user_decision is not GC_UserDecision.UNDEFINED: - match user_decision: - case GC_UserDecision.UPDATE_OUTLINE: - current_step_name = StepName.DRAFT_OUTLINE - case GC_UserDecision.DRAFT_PAPER: - current_step_name = StepName.FINISH - case GC_UserDecision.EXIT_EARLY: - current_step_name = StepName.FINISH - case StepName.FINISH: - return Step(name=StepName.UNDEFINED, status=StepStatus.UNDEFINED) - - return Step(name=current_step_name, status=StepStatus.INITIATED) - - def _error_check(self) -> None: - # name and status should either both be UNDEFINED or both be defined. Always. - if (self.name is ModeName.UNDEFINED and self.status is not ModeStatus.UNDEFINED) or ( - self.status is ModeStatus.UNDEFINED and self.name is not ModeName.UNDEFINED - ): - logger.error( - "Document Agent State: Either ModeName or ModeStatus is UNDEFINED, and the other is not. Both must be UNDEFINED at the same time. ModeName: %s, ModeStatus: %s", - self.name, - self.status, - ) - - def reset(self) -> None: - # TODO: consider if this is the right way to reset a mode, fix the # noqa: F841 - self = Mode() # noqa: F841 - - def set_name(self, name: ModeName) -> None: - if name is ModeName.UNDEFINED: # need to reset mode - self.reset() - if name is not self.name: # update if new mode name - self = Mode(name=name, status=ModeStatus.NOT_COMPLETED) - self._error_check() - - def get_name(self) -> ModeName: - self._error_check() - return self.name - - def set_status(self, status: ModeStatus) -> None: - if status is ModeStatus.UNDEFINED: # need to reset mode - self.reset() - self.status = status - self._error_check() - - def get_status(self) -> ModeStatus: - self._error_check() - return self.status - - def is_running(self) -> bool: - if self.status is ModeStatus.NOT_COMPLETED: - return True - if self.status is ModeStatus.INITIATED: - return True - return False # UNDEFINED, USER_EXIT_EARLY, USER_COMPLETED - - def set_step(self, step: Step) -> None: - self.current_step = step - - def get_step(self) -> Step: - return self.current_step - - # endregion @@ -716,32 +467,11 @@ def get_step(self) -> Step: class State(BaseModel): - mode: Mode = Mode() - step_data_list: list[StepData] = [] - - def set_mode(self, mode: Mode) -> None: - self.mode = mode - - def set_step_data(self, step_data: StepData) -> None: - step_name = step_data.step_name - for sd in self.step_data_list: - if sd.step_name == step_name: - sd = step_data - return - # did not exist yet, so adding - self.step_data_list.append(step_data) - - def get_step_data_list(self) -> list[StepData]: - return self.step_data_list - - def get_step_data(self, step_name: StepName) -> StepData: - for sd in self.step_data_list: - if sd.step_name == step_name: - return sd - # did not exist yet, so adding - new_step_data = StepData(step_name=step_name) - self.step_data_list.append(new_step_data) - return new_step_data + step_run_count: dict[str, int] = {} + mode_name: ModeName = ModeName.DRAFT_OUTLINE + mode_status: ModeStatus = ModeStatus.INITIATED + current_step_name: StepName = StepName.DRAFT_OUTLINE + current_step_status: StepStatus = StepStatus.NOT_COMPLETED # endregion @@ -751,60 +481,77 @@ def get_step_data(self, step_name: StepName) -> StepData: # -@staticmethod -def mode_prerequisite_check(state: State, correct_mode_name: ModeName) -> bool: - mode_name = state.mode.get_name() - mode_status = state.mode.get_status() - if mode_name is not correct_mode_name or ( - mode_status is not ModeStatus.NOT_COMPLETED and mode_status is not ModeStatus.INITIATED - ): - logger.error( - "Document Agent State: ModeName: %s, ModeStatus: %s, ModeName called: %s.", - mode_name, - mode_status, - correct_mode_name, - ) - return False - return True - - -@staticmethod -def _get_document_agent_conversation_storage_path(context: ConversationContext, filename: str | None = None) -> Path: +def _get_document_agent_conversation_storage_path(context: ConversationContext) -> Path: """ Get the path to the directory for storing files. """ path = storage_directory_for_context(context) / "document_agent" - if filename: - path /= filename + if not path.exists(): + path.mkdir(parents=True) + return path -@staticmethod -def write_document_agent_conversation_state(context: ConversationContext, state_dict: dict) -> None: +def write_document_agent_conversation_state(context: ConversationContext, state: State) -> None: """ Write the state to a file. """ - json_data = json.dumps(state_dict) path = _get_document_agent_conversation_storage_path(context) - if not path.exists(): - path.mkdir(parents=True) path = path / "state.json" - path.write_text(json_data) + path.write_text(state.model_dump_json()) -@staticmethod -def read_document_agent_conversation_state(context: ConversationContext) -> dict | None: +def read_document_agent_conversation_state(context: ConversationContext) -> State: """ Read the state from a file. """ - path = _get_document_agent_conversation_storage_path(context, "state.json") + path = _get_document_agent_conversation_storage_path(context) / "state.json" if path.exists(): try: json_data = path.read_text() - return json.loads(json_data) + return State.model_validate_json(json_data) except Exception: pass - return None + + return State() + + +def read_document_outline(context: ConversationContext) -> str | None: + """ + Read the outline from a file. + """ + path = _get_document_agent_conversation_storage_path(context) / "outline.txt" + if not path.exists(): + return None + + return path.read_text() + + +def write_document_outline(context: ConversationContext, outline: str) -> None: + """ + Write the outline to a file. + """ + path = _get_document_agent_conversation_storage_path(context) / "outline.txt" + path.write_text(outline) + + +def read_document_content(context: ConversationContext) -> str | None: + """ + Read the content from a file. + """ + path = _get_document_agent_conversation_storage_path(context) / "content.txt" + if not path.exists(): + return None + + return path.read_text() + + +def write_document_content(context: ConversationContext, content: str) -> None: + """ + Write the content to a file. + """ + path = _get_document_agent_conversation_storage_path(context) / "content.txt" + path.write_text(content) @staticmethod diff --git a/assistants/prospector-assistant/assistant/agents/document_agent.py b/assistants/prospector-assistant/assistant/agents/document_agent.py index 021dbb94..ac670252 100644 --- a/assistants/prospector-assistant/assistant/agents/document_agent.py +++ b/assistants/prospector-assistant/assistant/agents/document_agent.py @@ -8,14 +8,18 @@ from semantic_workbench_assistant.assistant_app import ConversationContext from ..config import AssistantConfigModel +from .document.guided_conversation import GC_UserDecision from .document.state import ( - Mode, ModeName, ModeStatus, State, + StepDraftContent, + StepDraftOutline, + StepFinish, + StepGetContentFeedback, + StepGetOutlineFeedback, StepName, StepStatus, - mode_prerequisite_check, read_document_agent_conversation_state, write_document_agent_conversation_state, ) @@ -35,23 +39,6 @@ class DocumentAgent: def __init__(self, attachments_extension: AttachmentsExtension) -> None: self._attachments_extension: AttachmentsExtension = attachments_extension - self._state: State | None = None - - def _save_state(self, state: State, context: ConversationContext) -> None: - if state is None: - logger.error("Document Agent: state to save is None. Cannot write to storage.") - return - self._state = state - write_document_agent_conversation_state(context, state.model_dump()) - - def _load_state(self, context: ConversationContext) -> State | None: - state_dict = read_document_agent_conversation_state(context) - if state_dict is not None: - self._state = State(**state_dict) - else: - logger.info("Document Agent: no state found in storage. Returning an initiated state.") - self._state = None - return self._state async def create_document( self, @@ -84,39 +71,29 @@ async def _run( ) -> bool: # Load State logger.info("Document Agent: State loading.") - state = self._load_state(context) - if state is None: - state = State(mode=Mode(name=mode_name, status=ModeStatus.INITIATED)) - self._save_state(state, context) + state = read_document_agent_conversation_state(context) logger.info("Document Agent: State loaded.") - # Prerequisites - result = mode_prerequisite_check(state, mode_name) - if result is False: - logger.error("Document Agent: Mode %s prerequisite check failed. Resetting mode. Returning.", mode_name) - state.mode.reset() - self._save_state(state, context) - return False - - # Execute - logger.info("Document Agent: Mode executing. ModeName: %s", mode_name) - mode_result_status = await self._mode_execute(state, config, context, message, metadata) - logger.info( - "Document Agent: Mode executed. ModeName: %s, Resulting ModeStatus: %s", mode_name, mode_result_status - ) - if mode_result_status is ModeStatus.UNDEFINED: - logger.error( - "Document Agent: Running mode %s resulted in mode status %s. Resetting mode. Returning", + try: + # Execute + logger.info("Document Agent: Mode executing. ModeName: %s", mode_name) + state.mode_status = await self._mode_execute(state, config, context, message, metadata) + logger.info( + "Document Agent: Mode executed. ModeName: %s, Resulting ModeStatus: %s, Resulting StepName: %s, Resulting StepStatus: %s", mode_name, - mode_result_status, + state.mode_status, + state.current_step_name, + state.current_step_status, ) - state.mode.reset() - self._save_state(state, context) + except Exception: + logger.exception("Document Agent: Mode execution failed.") return False + else: - state.mode.set_status(mode_result_status) - self._save_state(state, context) - return True + # Write state after successful execution + write_document_agent_conversation_state(context, state) + + return True async def _mode_execute( self, @@ -126,203 +103,110 @@ async def _mode_execute( message: ConversationMessage | None, metadata: dict[str, Any] = {}, ) -> ModeStatus: - current_step_name = state.mode.get_step().get_name() - current_step_status = state.mode.get_step().get_status() - - while current_step_status is StepStatus.INITIATED or current_step_status is StepStatus.NOT_COMPLETED: + loop_count = 0 + while state.current_step_status is StepStatus.NOT_COMPLETED: + loop_count += 1 # Execute step method logger.info( "Document Agent: Step executing. Current StepName: %s, Current StepStatus: %s", - current_step_name, - current_step_status, + state.current_step_name, + state.current_step_status, ) - execute_step_method = state.mode.get_step().execute - if execute_step_method is not None: - step_data = state.get_step_data(current_step_name) - new_step_status = await execute_step_method( - step_data, self._attachments_extension, config, context, message, metadata - ) - logger.info( - "Document Agent: Step executed. Current StepName: %s, Resulting StepStatus: %s", - current_step_name, - new_step_status, - ) - state.mode.get_step().set_status(new_step_status) - self._save_state(state, context) - else: - logger.error("Document Agent: step.execute not defined for StepName: %s.", current_step_name) - break - # Update step data - run_count_key = "run_count" + match state.current_step_name: + case StepName.DRAFT_OUTLINE: + step = StepDraftOutline() + + case StepName.GC_GET_OUTLINE_FEEDBACK: + step = StepGetOutlineFeedback() + + case StepName.DRAFT_CONTENT: + step = StepDraftContent() + + case StepName.GC_GET_CONTENT_FEEDBACK: + step = StepGetContentFeedback() + + case StepName.FINISH: + step = StepFinish() + + ( + new_step_status, + new_gc_user_decision, + ) = await step.execute( + run_count=state.step_run_count.get(state.current_step_name) or 0, + attachments_ext=self._attachments_extension, + config=config, + context=context, + message=message if loop_count == 1 else None, + metadata=metadata, + ) logger.info( - "Document Agent: StepData updating (%s). Current StepName: %s", run_count_key, current_step_name + "Document Agent: Step executed. Current StepName: %s, Resulting StepStatus: %s", + state.current_step_name, + new_step_status, ) - step_data = state.get_step_data(current_step_name) - step_data.run_count += 1 - state.set_step_data(step_data) - self._save_state(state, context) + state.step_run_count[state.current_step_name] = state.step_run_count.get(state.current_step_name, 0) + 1 + state.current_step_status = new_step_status # Workflow StepStatus check match new_step_status: case StepStatus.NOT_COMPLETED: - state.mode.get_step().set_status(new_step_status) - state.mode.set_status(ModeStatus.NOT_COMPLETED) - self._save_state(state, context) + state.mode_status = ModeStatus.NOT_COMPLETED logger.info( - "Document Agent: Getting more user input. Remaining in step. StepName: %s", current_step_name + "Document Agent: Getting more user input. Remaining in step. StepName: %s", + state.current_step_name, ) break # ok - get more user input case StepStatus.USER_COMPLETED: - if state.mode.get_next_step is not None: - next_step = state.mode.get_next_step() - if next_step.get_name() is not StepName.UNDEFINED: - state.mode.set_step(next_step) - self._save_state(state, context) - current_step_name = state.mode.get_step().get_name() - current_step_status = state.mode.get_step().get_status() # new step is Status.INITIATED - logger.info( - "Document Agent: Moving on to next step. Next StepName: %s, Next StepStatus: %s", - current_step_name, - current_step_status, - ) - continue # ok - don't need user input yet - else: - state.mode.set_step(next_step) - state.mode.set_status(ModeStatus.USER_COMPLETED) - self._save_state(state, context) - logger.info("Document Agent: No more steps in mode. Completed.") - break # ok - all done :) - else: - logger.error( - "Document Agent: mode.get_next_step not defined for Mode Name: %s.", state.mode.get_name() - ) - break + state.mode_status = ModeStatus.USER_COMPLETED + + def get_next_step(current_step_name: StepName, user_decision: GC_UserDecision) -> StepName: + logger.info("Document Agent State: Getting next step.") + + match current_step_name: + case StepName.DRAFT_OUTLINE: + return StepName.GC_GET_OUTLINE_FEEDBACK + case StepName.GC_GET_OUTLINE_FEEDBACK: + match user_decision: + case GC_UserDecision.UPDATE_OUTLINE: + return StepName.DRAFT_OUTLINE + case GC_UserDecision.DRAFT_PAPER: + return StepName.DRAFT_CONTENT + case GC_UserDecision.EXIT_EARLY: + return StepName.FINISH + case _: + raise ValueError("Invalid user decision.") + case StepName.DRAFT_CONTENT: + return StepName.GC_GET_CONTENT_FEEDBACK + case StepName.GC_GET_CONTENT_FEEDBACK: + match user_decision: + case GC_UserDecision.UPDATE_CONTENT: + return StepName.DRAFT_CONTENT + case GC_UserDecision.DRAFT_NEXT_CONTENT: + return StepName.DRAFT_CONTENT + case GC_UserDecision.EXIT_EARLY: + return StepName.FINISH + case _: + raise ValueError("Invalid user decision.") + case StepName.FINISH: + return StepName.FINISH + + next_step = get_next_step(state.current_step_name, new_gc_user_decision) + state.current_step_name = next_step + state.current_step_status = StepStatus.NOT_COMPLETED + logger.info( + "Document Agent: Moving on to next step. Next StepName: %s, Next StepStatus: %s", + state.current_step_name, + state.current_step_status, + ) + continue # ok - don't need user input yet case StepStatus.USER_EXIT_EARLY: - state.mode.get_step().set_status(new_step_status) - state.mode.set_status(ModeStatus.USER_EXIT_EARLY) - self._save_state(state, context) + state.mode_status = ModeStatus.USER_EXIT_EARLY logger.info("Document Agent: User exited early. Completed.") break # ok - done early :) - case _: # UNDEFINED, INITIATED - logger.error( - "Document Agent: step.execute for StepName: %s resulted in StepStatus: %s. Resetting mode %s.", - current_step_name, - new_step_status, - state.mode.get_name(), - ) - state.mode.reset() - self._save_state(state, context) - break # problem - - return state.mode.get_status() + return state.mode_status # endregion - - -# Not currently used -# async def receive_command( -# self, -# config: AssistantConfigModel, -# context: ConversationContext, -# message: ConversationMessage, -# metadata: dict[str, Any] = {}, -# ) -> None: -# self._state = self._load_state(context) -# -# # remove initial "/". This is not intuitive to me. -# msg_command_name = message.command_name -# msg_command_name = msg_command_name.replace("/", "") -# -# # check if available. If not, ignore for now. -# command_found = False -# for command in self.commands: -# if command.__name__ == msg_command_name: -# logger.info("Found command %s", message.command_name) -# command_found = True -# command(config, context, message, metadata) # does not handle command with args or async commands -# break -# if not command_found: -# logger.warning("Could not find command %s", message.command_name) - -# Not currently used -# def _set_mode_draft_paper( -# self, -# config: AssistantConfigModel, -# context: ConversationContext, -# message: ConversationMessage | None, -# metadata: dict[str, Any] = {}, -# ) -> None: -# # Pre-requisites -# if self._state is None: -# logger.error("Document Agent state is None. Returning.") -# return -# -# mode = self._state.mode -# if mode.is_running(): -# logger.warning("Document Agent already in mode: %s. Cannot change modes.", mode.get_name()) -# return -# -# # Run -# self._state.mode = Mode(name=ModeName.DRAFT_PAPER, status=Status.INITIATED) -# self._save_state(context) - -# not used currently -# async def _mode_draft_paper( -# self, -# config: AssistantConfigModel, -# context: ConversationContext, -# message: ConversationMessage | None, -# metadata: dict[str, Any] = {}, -# ) -> Status: -# # Pre-requisites -# if self._state is None: -# logger.error("Document Agent state is None. Returning.") -# return Status.UNDEFINED -# -# mode = self._state.mode -# mode_name = mode.get_name() -# mode_status = mode.get_status() -# -# if mode_name is not ModeName.DRAFT_PAPER or ( -# mode_status is not Status.NOT_COMPLETED and mode_status is not Status.INITIATED -# ): -# logger.error( -# "Document Agent state mode: %s, mode called: %s, state mode completion status: %s. Resetting Mode.", -# mode_name, -# ModeName.DRAFT_PAPER, -# mode_status, -# ) -# self._state.mode.reset() -# self._save_state(context) -# return self._state.mode.get_status() -# -# # Setup on first run. -# if mode_status is Status.INITIATED: -# self._state.mode.set_step_order( -# [ -# {"step_name": StepName.DO_DRAFT_OUTLINE, "run_count": 0}, -# {"step_name": StepName.DO_GC_GET_OUTLINE_FEEDBACK, "run_count": 0}, -# {"step_name": StepName.DP_DRAFT_CONTENT, "run_count": 0}, -# ], -# ) -# logger.info("Document Agent mode (%s) at beginning.", mode_name) -# first_step_name = self._state.mode.get_step_order()[0].get("step_name") -# if not isinstance(first_step_name, StepName): -# logger.error("Document Agent: StepName could not be found in Mode's step order.") -# self._state.mode.reset() -# return self._state.mode.get_status() -# self._state.mode.set_step(Step(name=first_step_name, status=Status.INITIATED)) -# self._save_state(context) -# -# self._step_name_to_method: dict[StepName, Callable] = { -# StepName.DO_DRAFT_OUTLINE: self._step_draft_outline, -# StepName.DO_GC_GET_OUTLINE_FEEDBACK: self._step_gc_get_outline_feedback, -# StepName.DP_DRAFT_CONTENT: self._step_draft_content, -# } -# -# # Run -# return await self._run_mode(config, context, message, metadata) diff --git a/assistants/prospector-assistant/assistant/config.py b/assistants/prospector-assistant/assistant/config.py index 628af71b..56219c16 100644 --- a/assistants/prospector-assistant/assistant/config.py +++ b/assistants/prospector-assistant/assistant/config.py @@ -123,7 +123,7 @@ class AssistantConfigModel(BaseModel): title="Guided Workflow", description="The workflow extension to guide this conversation.", ), - ] = "Form Completion" + ] = "Document Creation" enable_debug_output: Annotated[ bool, diff --git a/assistants/prospector-assistant/assistant/form_fill_extension/inspector.py b/assistants/prospector-assistant/assistant/form_fill_extension/inspector.py index ea4688e1..003b0ef1 100644 --- a/assistants/prospector-assistant/assistant/form_fill_extension/inspector.py +++ b/assistants/prospector-assistant/assistant/form_fill_extension/inspector.py @@ -1,6 +1,5 @@ import contextlib import json -from enum import StrEnum from hashlib import md5 from pathlib import Path from typing import Callable @@ -13,15 +12,12 @@ ) -class StateProjection(StrEnum): +def project_to_yaml(state: dict) -> str: """ - The projection to use when displaying the state. + Project the state to a yaml code block. """ - - original_content = "original_content" - """Return the state as string content.""" - json_to_yaml = "json_to_yaml" - """Return the state as a yaml code block.""" + state_as_yaml = yaml.dump(state, sort_keys=False) + return f"```yaml\n{state_as_yaml}\n```" class FileStateInspector(ReadOnlyAssistantConversationInspectorStateProvider): @@ -34,8 +30,7 @@ def __init__( display_name: str, file_path_source: Callable[[ConversationContext], Path], description: str = "", - projection: StateProjection = StateProjection.json_to_yaml, - select_field: str = "", + projector: Callable[[dict], str | dict] = project_to_yaml, ) -> None: self._state_id = md5( (type(self).__name__ + "_" + display_name).encode("utf-8"), usedforsecurity=False @@ -43,8 +38,7 @@ def __init__( self._display_name = display_name self._file_path_source = file_path_source self._description = description - self._projection = projection - self._select_field = select_field + self._projector = projector @property def state_id(self) -> str: @@ -66,14 +60,6 @@ def read_state(path: Path) -> dict: state = read_state(self._file_path_source(context)) - selected = state.get(self._select_field) if self._select_field else state + projected = self._projector(state) - match self._projection: - case StateProjection.original_content: - return AssistantConversationInspectorStateDataModel(data={"content": selected}) - case StateProjection.json_to_yaml: - state_as_yaml = yaml.dump(selected, sort_keys=False) - # return the state as a yaml code block, as it is easier to read than json - return AssistantConversationInspectorStateDataModel( - data={"content": f"```yaml\n{state_as_yaml}\n```"}, - ) + return AssistantConversationInspectorStateDataModel(data={"content": projected}) diff --git a/assistants/prospector-assistant/assistant/form_fill_extension/steps/_guided_conversation.py b/assistants/prospector-assistant/assistant/form_fill_extension/steps/_guided_conversation.py index 61baecb5..b18b581b 100644 --- a/assistants/prospector-assistant/assistant/form_fill_extension/steps/_guided_conversation.py +++ b/assistants/prospector-assistant/assistant/form_fill_extension/steps/_guided_conversation.py @@ -54,7 +54,7 @@ async def engine( kernel=kernel, service_id=service_id, # context - artifact=artifact_type.model_construct(), + artifact=artifact_type, rules=definition.rules, conversation_flow=definition.conversation_flow, context=definition.context, @@ -69,7 +69,7 @@ async def engine( kernel=kernel, service_id=service_id, # context - artifact=artifact_type.model_construct(), + artifact=artifact_type, rules=definition.rules, conversation_flow=definition.conversation_flow, context=definition.context, diff --git a/assistants/prospector-assistant/assistant/form_fill_extension/steps/fill_form_step.py b/assistants/prospector-assistant/assistant/form_fill_extension/steps/fill_form_step.py index a38b4064..6daefbd8 100644 --- a/assistants/prospector-assistant/assistant/form_fill_extension/steps/fill_form_step.py +++ b/assistants/prospector-assistant/assistant/form_fill_extension/steps/fill_form_step.py @@ -13,7 +13,7 @@ from semantic_workbench_assistant.config import UISchema from .. import state -from ..inspector import FileStateInspector, StateProjection +from ..inspector import FileStateInspector from . import _guided_conversation, _llm from .types import ( Context, @@ -299,11 +299,14 @@ def _get_step_state_file_path(context: ConversationContext) -> Path: return storage_directory_for_context(context, "fill_form_state.json") +def project_populated_form(state: dict) -> str: + return state.get("populated_form_markdown") or "" + + _populated_form_state_inspector = FileStateInspector( display_name="Populated Form", file_path_source=_get_step_state_file_path, - projection=StateProjection.original_content, - select_field="populated_form_markdown", + projector=project_populated_form, ) @@ -352,6 +355,7 @@ def field_value(field_id: str) -> str | list[str]: def field_values(fields: list[state.FormField]) -> str: markdown_fields: list[str] = [] + for field in fields: value = field_value(field.id) diff --git a/libraries/python/guided-conversation/guided_conversation/guided_conversation_agent.py b/libraries/python/guided-conversation/guided_conversation/guided_conversation_agent.py index 2c29ee3b..ea68befb 100644 --- a/libraries/python/guided-conversation/guided_conversation/guided_conversation_agent.py +++ b/libraries/python/guided-conversation/guided_conversation/guided_conversation_agent.py @@ -63,7 +63,7 @@ class GuidedConversation: def __init__( self, kernel: Kernel, - artifact: BaseModel, + artifact: type[BaseModel], rules: list[str], conversation_flow: str | None, context: str | None, @@ -383,7 +383,7 @@ def from_json( cls, json_data: dict, kernel: Kernel, - artifact: BaseModel, + artifact: type[BaseModel], rules: list[str], conversation_flow: str | None, context: str | None, diff --git a/libraries/python/guided-conversation/guided_conversation/plugins/artifact.py b/libraries/python/guided-conversation/guided_conversation/plugins/artifact.py index 9bc05cfd..97198b6a 100644 --- a/libraries/python/guided-conversation/guided_conversation/plugins/artifact.py +++ b/libraries/python/guided-conversation/guided_conversation/plugins/artifact.py @@ -60,7 +60,7 @@ class Artifact: """ def __init__( - self, kernel: Kernel, service_id: str, input_artifact: BaseModel, max_artifact_field_retries: int = 2 + self, kernel: Kernel, service_id: str, input_artifact: type[BaseModel], max_artifact_field_retries: int = 2 ) -> None: """ Initialize the Artifact plugin with the given Pydantic base model. @@ -267,7 +267,7 @@ def get_failed_fields(self) -> list[str]: fields.append(field) return fields - def _initialize_artifact(self, artifact_model: BaseModel) -> BaseModelLLM: + def _initialize_artifact(self, artifact_model: type[BaseModel]) -> BaseModelLLM: """Create a new artifact model based on the one provided by the user with "Unanswered" set for all fields.