Skip to content

Commit

Permalink
Merge branch 'microsoft:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
bkrabach authored Feb 19, 2025
2 parents e1c6fef + dbdb441 commit e9c7da4
Show file tree
Hide file tree
Showing 30 changed files with 813 additions and 429 deletions.
192 changes: 99 additions & 93 deletions assistants/skill-assistant/assistant/skill_assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
ConversationContext,
)
from skill_library import Engine
from skill_library.skills.common.common_skill import CommonSkill, CommonSkillConfig
from skill_library.skills.posix.posix_skill import PosixSkill, PosixSkillConfig
from skill_library.usage import get_routine_usage
from skill_library.skills.common import CommonSkill, CommonSkillConfig
from skill_library.skills.eval import EvalSkill, EvalSkillConfig
from skill_library.skills.meta import MetaSkill, MetaSkillConfig
from skill_library.skills.posix import PosixSkill, PosixSkillConfig
from skill_library.skills.research import ResearchSkill, ResearchSkillConfig

from assistant.skill_event_mapper import SkillEventMapper
from assistant.workbench_helpers import WorkbenchMessageProvider
Expand Down Expand Up @@ -81,31 +83,15 @@ async def content_evaluator_factory(conversation_context: ConversationContext) -
app = assistant_service.fastapi_app()


# The AssistantApp class provides a set of decorators for adding event handlers
# to respond to conversation events. In VS Code, typing "@assistant." (or the
# name of your AssistantApp instance) will show available events and methods.
#
# See the semantic-workbench-assistant AssistantApp class for more information
# on available events and methods. Examples:
# - @assistant.events.conversation.on_created (event triggered when the
# assistant is added to a conversation)
# - @assistant.events.conversation.participant.on_created (event triggered when
# a participant is added)
# - @assistant.events.conversation.message.on_created (event triggered when a
# new message of any type is created)
# - @assistant.events.conversation.message.chat.on_created (event triggered when
# a new chat message is created)

# This assistant registry is used to manage the assistants for this service and
# This engine registry is used to manage the skill engines for this service and
# to register their event subscribers so we can map events to the workbench.
#
# NOTE: Currently, the skill assistant library doesn't have the notion of
# "conversations" so we map a skill library assistant to a particular
# conversation in the workbench. This means if you have a different conversation
# with the same "skill assistant" it will appear as a different assistant in the
# skill assistant library. We can improve this in the future by adding a
# conversation ID to the skill assistant library and mapping it to a
# conversation in the workbench.
# NOTE: Currently, the skill library doesn't have the notion of "conversations"
# so we map a skill library engine to a particular conversation in the
# workbench. This means if you have a different conversation with the same
# "skill assistant" it will appear as a different engine in the skill assistant
# library. We can improve this in the future by adding a conversation ID to the
# skill library and mapping it to a conversation in the workbench.
engine_registry = SkillEngineRegistry()


Expand All @@ -116,7 +102,7 @@ async def on_conversation_created(conversation_context: ConversationContext) ->
Handle the event triggered when the assistant is added to a conversation.
"""

# send a welcome message to the conversation
# Send a welcome message to the conversation.
config = await assistant_config.get(conversation_context.assistant)
welcome_message = config.welcome_message
await conversation_context.send_messages(
Expand All @@ -128,52 +114,16 @@ async def on_conversation_created(conversation_context: ConversationContext) ->
)


@assistant_service.events.conversation.message.chat.on_created
async def on_message_created(
conversation_context: ConversationContext, event: ConversationEvent, message: ConversationMessage
) -> None:
"""Handle new chat messages"""
logger.debug("Message received", extra_data({"content": message.content}))

config = await assistant_config.get(conversation_context.assistant)
engine = await get_or_register_skill_engine(conversation_context, config)

# Check if routine is running
if engine.is_routine_running():
try:
logger.debug("Resuming routine with message", extra_data({"message": message.content}))
resume_task = asyncio.create_task(engine.resume_routine(message.content))
resume_task.add_done_callback(
lambda t: logger.debug("Routine resumed", extra_data({"success": not t.exception()}))
)
except Exception as e:
logger.error(f"Failed to resume routine: {e}")
finally:
return

# Use a chat driver to respond.
async with conversation_context.set_status("thinking..."):
chat_driver_config = ChatDriverConfig(
openai_client=openai_client.create_client(config.service_config),
model=config.chat_driver_config.openai_model,
instructions=config.chat_driver_config.instructions,
message_provider=WorkbenchMessageProvider(conversation_context.id, conversation_context),
functions=ChatFunctions(engine).list_functions(),
)
chat_driver = ChatDriver(chat_driver_config)
chat_functions = ChatFunctions(engine)
chat_driver_config.functions = [chat_functions.list_routines]

metadata: dict[str, Any] = {"debug": {"content_safety": event.data.get(content_safety.metadata_key, {})}}
await chat_driver.respond(message.content, metadata=metadata or {})


@assistant_service.events.conversation.message.command.on_created
async def on_command_message_created(
conversation_context: ConversationContext, event: ConversationEvent, message: ConversationMessage
) -> None:
"""
Handle the event triggered when a new command message is created in the conversation.
Handle the event triggered when a new command message is created in the
conversation. Commands in the skill assistant currently are oriented around
running skills manually. We will update this in the future to add a few more
commands that we'll register to the chat driver so we can call them
conversationally.
"""

config = await assistant_config.get(conversation_context.assistant)
Expand All @@ -188,7 +138,8 @@ async def on_command_message_created(
```markdown
- __/help__: Display this help message.
- __/list_routines__: List all routines.
- __/run("<name>", ...args)__: Run a routine.
- __/run__("<name>", ...args): Run a routine.
- __/reset__: Reset the assistant.
```
""").strip()
await conversation_context.send_messages(
Expand All @@ -198,6 +149,10 @@ async def on_command_message_created(
),
)
case _:
"""
For every other command we receive, we're going to try to map it to
one of the registered ChatFunctions below and execute the command.
"""
try:
function_string, args, kwargs = ToolFunctions.parse_fn_string(command_string)
if not function_string:
Expand Down Expand Up @@ -225,16 +180,61 @@ async def on_command_message_created(
await conversation_context.send_messages(
NewConversationMessage(
content=str(result),
message_type=MessageType.notice,
message_type=MessageType.note,
),
)


# Get or register an assistant for the conversation.
@assistant_service.events.conversation.message.chat.on_created
async def on_message_created(
conversation_context: ConversationContext, event: ConversationEvent, message: ConversationMessage
) -> None:
"""Handle new chat messages"""
logger.debug("Message received", extra_data({"content": message.content}))

config = await assistant_config.get(conversation_context.assistant)
engine = await get_or_register_skill_engine(conversation_context, config)

# Check if routine is running.
if engine.is_routine_running():
try:
logger.debug("Resuming routine with message", extra_data({"message": message.content}))
resume_task = asyncio.create_task(engine.resume_routine(message.content))
resume_task.add_done_callback(
lambda t: logger.debug("Routine resumed", extra_data({"success": not t.exception()}))
)
except Exception as e:
logger.error(f"Failed to resume routine: {e}")
finally:
return

# Use a chat driver to respond.
async with conversation_context.set_status("thinking..."):
chat_driver_config = ChatDriverConfig(
openai_client=openai_client.create_client(config.service_config),
model=config.chat_driver_config.openai_model,
instructions=config.chat_driver_config.instructions,
message_provider=WorkbenchMessageProvider(conversation_context.id, conversation_context),
functions=ChatFunctions(engine).list_functions(),
)
chat_driver = ChatDriver(chat_driver_config)
chat_functions = ChatFunctions(engine)
chat_driver_config.functions = [chat_functions.list_routines]

metadata: dict[str, Any] = {"debug": {"content_safety": event.data.get(content_safety.metadata_key, {})}}
await chat_driver.respond(message.content, metadata=metadata or {})


async def get_or_register_skill_engine(
conversation_context: ConversationContext, config: AssistantConfigModel
) -> Engine:
# Get an assistant from the registry.
"""
Get or register a skill engine for the conversation. This is used to manage
the skill engines for this service and to register their event subscribers
so we can map events to the workbench.
"""

# Get an engine from the registry.
engine_id = conversation_context.id
engine = engine_registry.get_engine(engine_id)

Expand All @@ -246,12 +246,18 @@ async def get_or_register_skill_engine(
language_model = openai_client.create_client(config.service_config)
message_provider = WorkbenchMessageProvider(engine_id, conversation_context)

# Create the engine and register it. This is where we configure which
# skills the engine can use and their configuration.
engine = Engine(
engine_id=conversation_context.id,
message_history_provider=message_provider.get_history,
drive_root=assistant_drive_root,
metadata_drive_root=assistant_metadata_drive_root,
skills=[
(
MetaSkill,
MetaSkillConfig(name="meta", language_model=language_model, drive=assistant_drive.subdrive("meta")),
),
(
CommonSkill,
CommonSkillConfig(
Expand All @@ -260,6 +266,14 @@ async def get_or_register_skill_engine(
drive=assistant_drive.subdrive("common"),
),
),
(
EvalSkill,
EvalSkillConfig(
name="eval",
language_model=language_model,
drive=assistant_drive.subdrive("eval"),
),
),
(
PosixSkill,
PosixSkillConfig(
Expand All @@ -268,12 +282,14 @@ async def get_or_register_skill_engine(
mount_dir="/mnt/data",
),
),
# GuidedConversationSkillDefinition(
# name="guided_conversation",
# language_model=language_model,
# drive=assistant_drive.subdrive("guided_conversation"),
# chat_driver_config=chat_driver_config,
# ),
(
ResearchSkill,
ResearchSkillConfig(
name="research",
language_model=language_model,
drive=assistant_drive.subdrive("research"),
),
),
],
)

Expand All @@ -284,35 +300,25 @@ async def get_or_register_skill_engine(

class ChatFunctions:
"""
These functions provide usage context and output markdown. It's a layer
closer to the assistant.
These are functions that can be run from the chat.
"""

def __init__(self, engine: Engine) -> None:
self.engine = engine

async def clear_stack(self) -> str:
"""Clears the assistant's routine stack and event queue."""
async def reset(self) -> str:
"""Resets the skill engine run state. Useful for troubleshooting."""
await self.engine.clear(include_drives=False)
return "Assistant stack cleared."

async def list_routines(self) -> str:
"""Lists all the routines available in the assistant."""

routines: list[str] = []
for skill_name, skill in self.engine._skills.items():
for routine_name in skill.list_routines():
routine = skill.get_routine(routine_name)
if not routine:
continue
usage = get_routine_usage(routine, f"{skill_name}.{routine_name}")
routines.append(f"- {usage.to_markdown()}")

routines = self.engine.routines_usage()
if not routines:
return "No routines available."

routine_string = "```markdown\n" + "\n".join(routines) + "\n```"
return routine_string
return "```markdown\n" + routines + "\n```"

async def run(self, designation: str, *args, **kwargs) -> str:
try:
Expand Down
5 changes: 5 additions & 0 deletions libraries/python/openai-client/openai_client/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ async def execute_function_string(self, function_string: str, string_response: b

@staticmethod
def parse_fn_string(function_string: str) -> tuple[str | None, list[Any], dict[str, Any]]:
"""
Parse a string representing a function call into its name, positional
arguments, and keyword arguments.
"""

# As a convenience, remove any leading slashes.
function_string = function_string.lstrip("/")

Expand Down
Loading

0 comments on commit e9c7da4

Please sign in to comment.