Skip to content

Commit

Permalink
Workflow lite (#256)
Browse files Browse the repository at this point in the history
Early merge of workflow-lite experiment. This helped expose an issue
with SSE event listeners, so merging this to go fix that separately.
Will further develop this experimental feature after the event fix and
early usage.
  • Loading branch information
bkrabach authored Nov 22, 2024
1 parent bd4d47c commit 6cf0d4f
Show file tree
Hide file tree
Showing 20 changed files with 1,093 additions and 222 deletions.
13 changes: 9 additions & 4 deletions assistants/explorer-assistant/assistant/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from assistant_extensions.artifacts import ArtifactsExtension
from assistant_extensions.artifacts._model import ArtifactsConfigModel
from assistant_extensions.attachments import AttachmentsExtension
from assistant_extensions.workflows import WorkflowsConfigModel, WorkflowsExtension
from content_safety.evaluators import CombinedContentSafetyEvaluator
from openai.types.chat import (
ChatCompletion,
Expand Down Expand Up @@ -79,12 +80,17 @@ async def content_evaluator_factory(context: ConversationContext) -> ContentSafe
)


async def artifact_config_provider(context: AssistantContext) -> ArtifactsConfigModel:
async def artifacts_config_provider(context: AssistantContext) -> ArtifactsConfigModel:
return (await assistant_config.get(context)).extensions_config.artifacts


artifacts_extension = ArtifactsExtension(assistant, artifact_config_provider)
async def workflows_config_provider(context: AssistantContext) -> WorkflowsConfigModel:
return (await assistant_config.get(context)).extensions_config.workflows


artifacts_extension = ArtifactsExtension(assistant, artifacts_config_provider)
attachments_extension = AttachmentsExtension(assistant)
workflows_extension = WorkflowsExtension(assistant, "content_safety", workflows_config_provider)

#
# create the FastAPI app instance
Expand Down Expand Up @@ -134,7 +140,7 @@ async def on_message_created(
metadata: dict[str, Any] = {"debug": {"content_safety": event.data.get(content_safety.metadata_key, {})}}

# Prospector assistant response
await respond_to_conversation(context, config, message, metadata)
await respond_to_conversation(context, config, metadata)


@assistant.events.conversation.on_created
Expand Down Expand Up @@ -172,7 +178,6 @@ async def on_conversation_created(context: ConversationContext) -> None:
async def respond_to_conversation(
context: ConversationContext,
config: AssistantConfigModel,
message: ConversationMessage,
metadata: dict[str, Any] = {},
) -> None:
"""
Expand Down
9 changes: 9 additions & 0 deletions assistants/explorer-assistant/assistant/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import openai_client
from assistant_extensions.artifacts import ArtifactsConfigModel
from assistant_extensions.attachments import AttachmentsConfigModel
from assistant_extensions.workflows import WorkflowsConfigModel
from content_safety.evaluators import CombinedContentSafetyEvaluatorConfig
from pydantic import BaseModel, ConfigDict, Field
from semantic_workbench_assistant.config import UISchema
Expand All @@ -25,6 +26,14 @@


class ExtensionsConfigModel(BaseModel):
workflows: Annotated[
WorkflowsConfigModel,
Field(
title="Workflows Extension Configuration",
description="Configuration for the workflows extension.",
),
] = WorkflowsConfigModel()

attachments: Annotated[
AttachmentsConfigModel,
Field(
Expand Down
215 changes: 115 additions & 100 deletions assistants/explorer-assistant/uv.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions assistants/prospector-assistant/assistant/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def send_error_message_on_exception(context: ConversationContext):
# endregion

#
# region Form fill extension helpers
# region Form Fill Extension Helpers
#


Expand Down Expand Up @@ -262,7 +262,7 @@ async def get(filename: str) -> str:


#
# region document agent extension helpers
# region Document Extension Helpers
#


Expand Down
272 changes: 174 additions & 98 deletions assistants/prospector-assistant/uv.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions libraries/python/assistant-extensions/.vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"**/__pycache__": true
},
"cSpell.words": [
"deepmerge",
"DMAIC",
"endregion",
"Excalidraw",
"openai",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async def _get_attachment_for_file(
content = ""
error = ""
# process the file to create an attachment
async with context.set_status(f"updating attachment {file.filename} ..."):
async with context.set_status(f"updating attachment {file.filename}..."):
try:
# read the content of the file
file_bytes = await _read_conversation_file(context, file)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ._model import WorkflowsConfigModel
from ._workflows import WorkflowsExtension, WorkflowsProcessingErrorHandler

__all__ = ["WorkflowsExtension", "WorkflowsConfigModel", "WorkflowsProcessingErrorHandler"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field
from semantic_workbench_assistant.config import UISchema


class UserProxyWorkflowDefinition(BaseModel):
class Config:
json_schema_extra = {
"required": ["command", "name", "description", "user_messages"],
}

workflow_type: Annotated[
Literal["user_proxy"],
Field(
description="The type of workflow.",
),
UISchema(widget="hidden"),
] = "user_proxy"
command: Annotated[
str,
Field(
description="The command that will trigger the workflow. The command should be unique and not conflict with other commands and should only include alphanumeric characters and underscores.",
),
] = ""
name: Annotated[
str,
Field(
description="The name of the workflow, to be displayed in the help, logs, and status messages.",
),
] = ""
description: Annotated[
str,
Field(
description="A description of the workflow that will be displayed in the help.",
),
UISchema(widget="textarea"),
] = ""
user_messages: Annotated[
list[str],
Field(
description="A list of user messages that will be sequentially sent to the assistant during the workflow.",
),
UISchema(schema={"items": {"widget": "textarea"}}),
] = []


WorkflowDefinition = Union[UserProxyWorkflowDefinition]


class WorkflowsConfigModel(BaseModel):
enabled: Annotated[
bool,
Field(
description="Enable the workflows feature.",
),
] = False

workflow_definitions: Annotated[
list[WorkflowDefinition],
Field(
description="A list of workflow definitions.",
),
] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import asyncio
import logging
from typing import Any, Awaitable, Callable

import deepmerge
from semantic_workbench_api_model.workbench_model import (
ConversationEvent,
ConversationMessage,
MessageSender,
MessageType,
NewConversationMessage,
)
from semantic_workbench_assistant.assistant_app import AssistantAppProtocol, AssistantContext, ConversationContext

from assistant_extensions.workflows.runners._user_proxy import UserProxyRunner

from ._model import WorkflowsConfigModel

logger = logging.getLogger(__name__)

WorkflowsProcessingErrorHandler = Callable[[ConversationContext, str, Exception], Awaitable]


trigger_command = "workflow"


async def log_and_send_message_on_error(context: ConversationContext, filename: str, e: Exception) -> None:
"""
Default error handler for attachment processing, which logs the exception and sends
a message to the conversation.
"""

logger.exception("exception occurred processing attachment", exc_info=e)
await context.send_messages(
NewConversationMessage(
content=f"There was an error processing the attachment ({filename}): {e}",
message_type=MessageType.notice,
metadata={"attribution": "workflows"},
)
)


class WorkflowsExtension:
def __init__(
self,
assistant: AssistantAppProtocol,
content_safety_metadata_key: str,
config_provider: Callable[[AssistantContext], Awaitable[WorkflowsConfigModel]],
error_handler: WorkflowsProcessingErrorHandler = log_and_send_message_on_error,
) -> None:
"""
WorkflowsExtension enables the assistant to execute pre-configured workflows. Current workflows act
as an auto-proxy for a series of user messages. Future workflows may include more complex interactions.
"""

self._error_handler = error_handler
self._user_proxy_runner = UserProxyRunner(config_provider, error_handler)

@assistant.events.conversation.message.command.on_created
async def on_command_message_created(
context: ConversationContext, event: ConversationEvent, message: ConversationMessage
) -> None:
config = await config_provider(context.assistant)
metadata: dict[str, Any] = {"debug": {"content_safety": event.data.get(content_safety_metadata_key, {})}}

if not config.enabled or message.command_name != f"/{trigger_command}":
return

if len(message.command_args) > 0:
await self.on_command(config, context, message, metadata)
else:
await self.on_help(config, context, metadata)

async def on_help(
self,
config: WorkflowsConfigModel,
context: ConversationContext,
metadata: dict[str, Any] = {},
) -> None:
# Iterate over the workflow definitions and create a list of commands in markdown format
content = "Available workflows:\n"
for workflow in config.workflow_definitions:
content += f"- `{workflow.command}`: {workflow.description}\n"

# send the message
await context.send_messages(
NewConversationMessage(
content=content,
message_type=MessageType.command_response,
metadata=deepmerge.always_merger.merge(
metadata,
{"attribution": "workflows"},
),
)
)

async def on_command(
self,
config: WorkflowsConfigModel,
context: ConversationContext,
message: ConversationMessage,
metadata: dict[str, Any] = {},
) -> None:
# find the workflow definition
workflow_command = message.command_args.split(" ")[0]
workflow_definition = None
for workflow in config.workflow_definitions:
if workflow.command == workflow_command:
workflow_definition = workflow
break

if workflow_definition is None:
await self.on_help(config, context, metadata)
return

# run the workflow in the background
asyncio.create_task(self.run_workflow(context, workflow_definition, message.sender, metadata))

async def run_workflow(
self,
context: ConversationContext,
workflow_definition: Any,
send_as: MessageSender,
metadata: dict[str, Any] = {},
) -> None:
try:
await self._user_proxy_runner.run(context, workflow_definition, send_as, metadata)
except Exception as e:
await self._error_handler(context, workflow_definition.command, e)
Loading

0 comments on commit 6cf0d4f

Please sign in to comment.