Skip to content

Commit

Permalink
feat: Implement group chat, message routing, and termination
Browse files Browse the repository at this point in the history
* Implement message handlers for GroupChatManagerBase
* Implement wrapping the runtime for GroupChatBase
* Define the adapter layer between Core and AgentChat
* Definte a basic StopMessage-based termination condition
* Implement de-minimis smoke test
  • Loading branch information
lokitoth committed Feb 7, 2025
1 parent 74dd993 commit 2406012
Show file tree
Hide file tree
Showing 12 changed files with 899 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public struct AgentName

private static readonly Regex AgentNameRegex = new Regex($"^{IdStartClass}{IdContinueClass}*$", RegexOptions.Compiled | RegexOptions.Singleline);

public string Name { get; }
public string Value { get; }

public AgentName(string name)
{
AgentName.CheckValid(name);

this.Name = name;
this.Value = name;
}

public static bool IsValid(string name) => AgentNameRegex.IsMatch(name);
Expand All @@ -110,7 +110,7 @@ public static void CheckValid(string name)
}

// Implicit cast to string
public static implicit operator string(AgentName agentName) => agentName.Name;
public static implicit operator string(AgentName agentName) => agentName.Value;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// ChatAgentRouter.cs

using Microsoft.AutoGen.AgentChat.Abstractions;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.Logging;

namespace Microsoft.AutoGen.AgentChat.GroupChat;

public struct AgentChatConfig(IChatAgent chatAgent, string parentTopicType, string outputTopicType)
{
public string ParticipantTopicType => this.Name;
public string ParentTopicType { get; } = parentTopicType;
public string OutputTopicType { get; } = outputTopicType;

public IChatAgent ChatAgent { get; } = chatAgent;

public string Name => this.ChatAgent.Name;
public string Description => this.ChatAgent.Description;
}

internal sealed class ChatAgentRouter : HostableAgentAdapter,
IHandle<GroupChatStart>,
IHandle<GroupChatAgentResponse>,
IHandle<GroupChatRequestPublish>,
IHandle<GroupChatReset>
{
private readonly TopicId parentTopic;
private readonly TopicId outputTopic;
private readonly IChatAgent agent;

public ChatAgentRouter(AgentInstantiationContext agentCtx, AgentChatConfig config, ILogger<BaseAgent>? logger = null) : base(agentCtx, config.Description, logger)
{
this.parentTopic = new TopicId(config.ParentTopicType, this.Id.Key);
this.outputTopic = new TopicId(config.OutputTopicType, this.Id.Key);

this.agent = config.ChatAgent;
}

public List<ChatMessage> MessageBuffer { get; private set; } = new();

public ValueTask HandleAsync(GroupChatStart item, MessageContext messageContext)
{
if (item.Messages != null)
{
this.MessageBuffer.AddRange(item.Messages);
}

return ValueTask.CompletedTask;
}

public ValueTask HandleAsync(GroupChatAgentResponse item, MessageContext messageContext)
{
this.MessageBuffer.Add(item.AgentResponse.Message);

return ValueTask.CompletedTask;
}

public async ValueTask HandleAsync(GroupChatRequestPublish item, MessageContext messageContext)
{
Response? response = null;

// TODO: Is there a better abstraction here than IAsyncEnumerable? Though the akwardness mainly comes from
// the lack of real type unions in C#, which is why we need to create the StreamingFrame type in the first
// place.
await foreach (ChatStreamFrame frame in this.agent.StreamAsync(this.MessageBuffer, messageContext.CancellationToken))
{
// TODO: call publish message
switch (frame.Type)
{
case ChatStreamFrame.FrameType.Response:
await this.PublishMessageAsync(new GroupChatMessage { Message = frame.Response!.Message }, this.outputTopic);
response = frame.Response;
break;
case ChatStreamFrame.FrameType.InternalMessage:
await this.PublishMessageAsync(new GroupChatMessage { Message = frame.InternalMessage! }, this.outputTopic);
break;
}
}

if (response == null)
{
throw new InvalidOperationException("The agent did not produce a final response. Check the agent's on_messages_stream method.");
}

this.MessageBuffer.Clear();

await this.PublishMessageAsync(new GroupChatAgentResponse { AgentResponse = response }, this.parentTopic);
}

public ValueTask HandleAsync(GroupChatReset item, MessageContext messageContext)
{
this.MessageBuffer.Clear();
return this.agent.ResetAsync(messageContext.CancellationToken);
}
}

222 changes: 210 additions & 12 deletions dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatBase.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,235 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// GroupChatBase.cs

using System.Diagnostics;
using System.Reflection;
using System.Runtime.CompilerServices;
using Microsoft.AutoGen.AgentChat.Abstractions;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;

namespace Microsoft.AutoGen.AgentChat.GroupChat;

internal static class AgentsRuntimeExtensions
{
public static async ValueTask<AgentType> RegisterChatAgentAsync(this IAgentRuntime runtime, AgentChatConfig config)
{
AgentType type = config.Name;

AgentType resultType = await runtime.RegisterAgentFactoryAsync(type,
(id, runtime) =>
{
AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime);
return ValueTask.FromResult<IHostableAgent>(new ChatAgentRouter(agentContext, config));
});

await runtime.AddSubscriptionAsync(new TypeSubscription(config.ParticipantTopicType, type));
await runtime.AddSubscriptionAsync(new TypeSubscription(config.ParentTopicType, type));

return resultType;
}

public static async ValueTask<AgentType> RegisterGroupChatManagerAsync<TManager>(this IAgentRuntime runtime, GroupChatOptions options, string teamId, Func<GroupChatOptions, TManager> factory)
where TManager : GroupChatManagerBase
{
AgentType type = GroupChatBase<TManager>.GroupChatManagerTopicType;
AgentId expectedId = new AgentId(type, teamId);

AgentType resultType = await runtime.RegisterAgentFactoryAsync(type,
(id, runtime) =>
{
Debug.Assert(expectedId == id, $"Expecting the AgentId {expectedId} to be the teamId {id}");

AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime);
TManager gcm = factory(options); // TODO: Should we allow this to be async?

return ValueTask.FromResult<IHostableAgent>(new GroupChatHandlerRouter<TManager>(agentContext, gcm));
});

await runtime.AddSubscriptionAsync(new TypeSubscription(GroupChatBase<TManager>.GroupChatManagerTopicType, resultType));
await runtime.AddSubscriptionAsync(new TypeSubscription(options.GroupChatTopicType, resultType));

return resultType;
}

public static async ValueTask<AgentType> RegisterOutputCollectorAsync(this IAgentRuntime runtime, IOutputCollectionSink sink, string outputTopicType)
{
AgentType type = GroupChatBase<GroupChatManagerBase>.CollectorAgentType;
AgentType resultType = await runtime.RegisterAgentFactoryAsync(type,
(id, runtime) =>
{
AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime);
return ValueTask.FromResult<IHostableAgent>(new OutputCollectorAgent(agentContext, sink));
});

await runtime.AddSubscriptionAsync(new TypeSubscription(outputTopicType, type));

return resultType;
}
}

public abstract class GroupChatBase<TManager> : ITeam where TManager : GroupChatManagerBase
{
public GroupChatBase(List<IChatAgent> participants, ITerminationCondition? terminationCondition = null, int? maxTurns = null)
// TODO: Where do these come from?
internal const string GroupTopicType = "group_topic";
internal const string OutputTopicType = "output_topic";
internal const string GroupChatManagerTopicType = "group_chat_manager";
internal const string CollectorAgentType = "collect_output_messages";

private GroupChatOptions GroupChatOptions { get; }

private readonly List<AgentMessage> messageThread = new();
private Dictionary<string, AgentChatConfig> Participants { get; } = new();

protected GroupChatBase(List<IChatAgent> participants, ITerminationCondition? terminationCondition = null, int? maxTurns = null)
{
this.TeamId = Guid.NewGuid();
this.GroupChatOptions = new GroupChatOptions(GroupTopicType, OutputTopicType)
{
TerminationCondition = terminationCondition,
MaxTurns = maxTurns,
};

foreach (var participant in participants)
{
AgentChatConfig config = new AgentChatConfig(participant, GroupTopicType, OutputTopicType);
this.Participants[participant.Name] = config;
this.GroupChatOptions.Participants[participant.Name] = (config.ParticipantTopicType, participant.Description);
}

this.messageThread = new List<AgentMessage>(); // TODO: Allow injecting this

this.TeamId = Guid.NewGuid().ToString().ToLowerInvariant();
}

/// <summary>
/// Gets the team id.
/// </summary>
public Guid TeamId
public string TeamId
{
get;
private set;
}

/// <inheritdoc cref="ITeam.ResetAsync(CancellationToken)"/>/>/>
public virtual TManager CreateChatManager(GroupChatOptions options)
{
try
{
if (Activator.CreateInstance(typeof(TManager), options) is TManager result)
{
return result;
};
}
catch (TargetInvocationException tie)
{
throw new Exception("Could not create chat manager", tie.InnerException);
}
catch (Exception ex)
{
throw new Exception("Could not create chat manager", ex);
}

throw new Exception("Could not create chat manager; make sure that it contains a ctor() or ctor(GroupChatOptions), or override the CreateChatManager method");
}

// TODO: Turn this into an IDisposable-based utility
private int running; // = 0
private bool EnsureSingleRun()
{
return Interlocked.CompareExchange(ref running, 1, 0) == 0;
}

private void EndRun()
{
this.running = 0;
}

public IAsyncEnumerable<TaskFrame> StreamAsync(string task, CancellationToken cancellationToken)
{
if (String.IsNullOrEmpty(task))
{
throw new ArgumentNullException(nameof(task));
}

// TODO: Send this on
TextMessage taskStart = new()
{
Content = task,
Source = "user"
};

return this.StreamAsync(taskStart, cancellationToken);
}

public ValueTask ResetAsync(CancellationToken cancel)
{
throw new NotImplementedException();
return ValueTask.CompletedTask;
}

/// <inheritdoc cref="ITaskRunner.StreamAsync(ChatMessage?, CancellationToken)"/>/>/>
public IAsyncEnumerable<TaskFrame> StreamAsync(ChatMessage? task, CancellationToken cancellationToken = default)
public async IAsyncEnumerable<TaskFrame> StreamAsync(ChatMessage? task, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
task = task ?? throw new ArgumentNullException(nameof(task));
if (task == null)
{
throw new ArgumentNullException(nameof(task));
}

if (!this.EnsureSingleRun())
{
throw new InvalidOperationException("The task is already running.");
}

// TODO: How do we allow the user to configure this?
//AgentsAppBuilder builder = new AgentsAppBuilder().UseInProcessRuntime();
InProcessRuntime runtime = new InProcessRuntime();

foreach (AgentChatConfig config in this.Participants.Values)
{
await runtime.RegisterChatAgentAsync(config);
}

await runtime.RegisterGroupChatManagerAsync(this.GroupChatOptions, this.TeamId, this.CreateChatManager);

OutputSink outputSink = new OutputSink();
await runtime.RegisterOutputCollectorAsync(outputSink, this.GroupChatOptions.OutputTopicType);

await runtime.StartAsync();

Task shutdownTask = Task.CompletedTask;

try
{
// TODO: Protos
GroupChatStart taskMessage = new GroupChatStart
{
Messages = [task]
};

List<AgentMessage> runMessages = new();

AgentId chatManagerId = new AgentId(GroupChatManagerTopicType, this.TeamId);
await runtime.SendMessageAsync(taskMessage, chatManagerId, cancellationToken: cancellationToken);

shutdownTask = Task.Run(runtime.RunUntilIdleAsync);

while (true)
{
OutputSink.SinkFrame frame = await outputSink.WaitForDataAsync(cancellationToken);
runMessages.AddRange(frame.Messages);

foreach (AgentMessage message in frame.Messages)
{
yield return new TaskFrame(message);
}

if (frame.IsTerminal)
{
TaskResult result = new TaskResult(runMessages);
yield return new TaskFrame(result);
break;
}
}
}
finally
{
this.EndRun();

return this.StreamAsync(task, cancellationToken);
await shutdownTask;
}
}
}
Loading

0 comments on commit 2406012

Please sign in to comment.