diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/Abstractions/ChatAgent.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/Abstractions/ChatAgent.cs index 9fd5d5c1e8b0..6cad4f88dfe2 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentChat/Abstractions/ChatAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/Abstractions/ChatAgent.cs @@ -90,13 +90,13 @@ public struct AgentName private static readonly Regex AgentNameRegex = new Regex($"^{IdStartClass}{IdContinueClass}*$", RegexOptions.Compiled | RegexOptions.Singleline); - public string Name { get; } + public string Value { get; } public AgentName(string name) { AgentName.CheckValid(name); - this.Name = name; + this.Value = name; } public static bool IsValid(string name) => AgentNameRegex.IsMatch(name); @@ -110,7 +110,7 @@ public static void CheckValid(string name) } // Implicit cast to string - public static implicit operator string(AgentName agentName) => agentName.Name; + public static implicit operator string(AgentName agentName) => agentName.Value; } /// diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/ChatAgentRouter.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/ChatAgentRouter.cs new file mode 100644 index 000000000000..a195cd4d6743 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/ChatAgentRouter.cs @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ChatAgentRouter.cs + +using Microsoft.AutoGen.AgentChat.Abstractions; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.AgentChat.GroupChat; + +public struct AgentChatConfig(IChatAgent chatAgent, string parentTopicType, string outputTopicType) +{ + public string ParticipantTopicType => this.Name; + public string ParentTopicType { get; } = parentTopicType; + public string OutputTopicType { get; } = outputTopicType; + + public IChatAgent ChatAgent { get; } = chatAgent; + + public string Name => this.ChatAgent.Name; + public string Description => this.ChatAgent.Description; +} + +internal sealed class ChatAgentRouter : HostableAgentAdapter, + IHandle, + IHandle, + IHandle, + IHandle +{ + private readonly TopicId parentTopic; + private readonly TopicId outputTopic; + private readonly IChatAgent agent; + + public ChatAgentRouter(AgentInstantiationContext agentCtx, AgentChatConfig config, ILogger? logger = null) : base(agentCtx, config.Description, logger) + { + this.parentTopic = new TopicId(config.ParentTopicType, this.Id.Key); + this.outputTopic = new TopicId(config.OutputTopicType, this.Id.Key); + + this.agent = config.ChatAgent; + } + + public List MessageBuffer { get; private set; } = new(); + + public ValueTask HandleAsync(GroupChatStart item, MessageContext messageContext) + { + if (item.Messages != null) + { + this.MessageBuffer.AddRange(item.Messages); + } + + return ValueTask.CompletedTask; + } + + public ValueTask HandleAsync(GroupChatAgentResponse item, MessageContext messageContext) + { + this.MessageBuffer.Add(item.AgentResponse.Message); + + return ValueTask.CompletedTask; + } + + public async ValueTask HandleAsync(GroupChatRequestPublish item, MessageContext messageContext) + { + Response? response = null; + + // TODO: Is there a better abstraction here than IAsyncEnumerable? Though the akwardness mainly comes from + // the lack of real type unions in C#, which is why we need to create the StreamingFrame type in the first + // place. + await foreach (ChatStreamFrame frame in this.agent.StreamAsync(this.MessageBuffer, messageContext.CancellationToken)) + { + // TODO: call publish message + switch (frame.Type) + { + case ChatStreamFrame.FrameType.Response: + await this.PublishMessageAsync(new GroupChatMessage { Message = frame.Response!.Message }, this.outputTopic); + response = frame.Response; + break; + case ChatStreamFrame.FrameType.InternalMessage: + await this.PublishMessageAsync(new GroupChatMessage { Message = frame.InternalMessage! }, this.outputTopic); + break; + } + } + + if (response == null) + { + throw new InvalidOperationException("The agent did not produce a final response. Check the agent's on_messages_stream method."); + } + + this.MessageBuffer.Clear(); + + await this.PublishMessageAsync(new GroupChatAgentResponse { AgentResponse = response }, this.parentTopic); + } + + public ValueTask HandleAsync(GroupChatReset item, MessageContext messageContext) + { + this.MessageBuffer.Clear(); + return this.agent.ResetAsync(messageContext.CancellationToken); + } +} + diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatBase.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatBase.cs index 9a1fd536703c..bfc2faa1b422 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatBase.cs +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatBase.cs @@ -1,37 +1,235 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // GroupChatBase.cs +using System.Diagnostics; +using System.Reflection; +using System.Runtime.CompilerServices; using Microsoft.AutoGen.AgentChat.Abstractions; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; namespace Microsoft.AutoGen.AgentChat.GroupChat; +internal static class AgentsRuntimeExtensions +{ + public static async ValueTask RegisterChatAgentAsync(this IAgentRuntime runtime, AgentChatConfig config) + { + AgentType type = config.Name; + + AgentType resultType = await runtime.RegisterAgentFactoryAsync(type, + (id, runtime) => + { + AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime); + return ValueTask.FromResult(new ChatAgentRouter(agentContext, config)); + }); + + await runtime.AddSubscriptionAsync(new TypeSubscription(config.ParticipantTopicType, type)); + await runtime.AddSubscriptionAsync(new TypeSubscription(config.ParentTopicType, type)); + + return resultType; + } + + public static async ValueTask RegisterGroupChatManagerAsync(this IAgentRuntime runtime, GroupChatOptions options, string teamId, Func factory) + where TManager : GroupChatManagerBase + { + AgentType type = GroupChatBase.GroupChatManagerTopicType; + AgentId expectedId = new AgentId(type, teamId); + + AgentType resultType = await runtime.RegisterAgentFactoryAsync(type, + (id, runtime) => + { + Debug.Assert(expectedId == id, $"Expecting the AgentId {expectedId} to be the teamId {id}"); + + AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime); + TManager gcm = factory(options); // TODO: Should we allow this to be async? + + return ValueTask.FromResult(new GroupChatHandlerRouter(agentContext, gcm)); + }); + + await runtime.AddSubscriptionAsync(new TypeSubscription(GroupChatBase.GroupChatManagerTopicType, resultType)); + await runtime.AddSubscriptionAsync(new TypeSubscription(options.GroupChatTopicType, resultType)); + + return resultType; + } + + public static async ValueTask RegisterOutputCollectorAsync(this IAgentRuntime runtime, IOutputCollectionSink sink, string outputTopicType) + { + AgentType type = GroupChatBase.CollectorAgentType; + AgentType resultType = await runtime.RegisterAgentFactoryAsync(type, + (id, runtime) => + { + AgentInstantiationContext agentContext = new AgentInstantiationContext(id, runtime); + return ValueTask.FromResult(new OutputCollectorAgent(agentContext, sink)); + }); + + await runtime.AddSubscriptionAsync(new TypeSubscription(outputTopicType, type)); + + return resultType; + } +} + public abstract class GroupChatBase : ITeam where TManager : GroupChatManagerBase { - public GroupChatBase(List participants, ITerminationCondition? terminationCondition = null, int? maxTurns = null) + // TODO: Where do these come from? + internal const string GroupTopicType = "group_topic"; + internal const string OutputTopicType = "output_topic"; + internal const string GroupChatManagerTopicType = "group_chat_manager"; + internal const string CollectorAgentType = "collect_output_messages"; + + private GroupChatOptions GroupChatOptions { get; } + + private readonly List messageThread = new(); + private Dictionary Participants { get; } = new(); + + protected GroupChatBase(List participants, ITerminationCondition? terminationCondition = null, int? maxTurns = null) { - this.TeamId = Guid.NewGuid(); + this.GroupChatOptions = new GroupChatOptions(GroupTopicType, OutputTopicType) + { + TerminationCondition = terminationCondition, + MaxTurns = maxTurns, + }; + + foreach (var participant in participants) + { + AgentChatConfig config = new AgentChatConfig(participant, GroupTopicType, OutputTopicType); + this.Participants[participant.Name] = config; + this.GroupChatOptions.Participants[participant.Name] = (config.ParticipantTopicType, participant.Description); + } + + this.messageThread = new List(); // TODO: Allow injecting this + + this.TeamId = Guid.NewGuid().ToString().ToLowerInvariant(); } - /// - /// Gets the team id. - /// - public Guid TeamId + public string TeamId { get; private set; } - /// />/> + public virtual TManager CreateChatManager(GroupChatOptions options) + { + try + { + if (Activator.CreateInstance(typeof(TManager), options) is TManager result) + { + return result; + }; + } + catch (TargetInvocationException tie) + { + throw new Exception("Could not create chat manager", tie.InnerException); + } + catch (Exception ex) + { + throw new Exception("Could not create chat manager", ex); + } + + throw new Exception("Could not create chat manager; make sure that it contains a ctor() or ctor(GroupChatOptions), or override the CreateChatManager method"); + } + + // TODO: Turn this into an IDisposable-based utility + private int running; // = 0 + private bool EnsureSingleRun() + { + return Interlocked.CompareExchange(ref running, 1, 0) == 0; + } + + private void EndRun() + { + this.running = 0; + } + + public IAsyncEnumerable StreamAsync(string task, CancellationToken cancellationToken) + { + if (String.IsNullOrEmpty(task)) + { + throw new ArgumentNullException(nameof(task)); + } + + // TODO: Send this on + TextMessage taskStart = new() + { + Content = task, + Source = "user" + }; + + return this.StreamAsync(taskStart, cancellationToken); + } + public ValueTask ResetAsync(CancellationToken cancel) { - throw new NotImplementedException(); + return ValueTask.CompletedTask; } - /// />/> - public IAsyncEnumerable StreamAsync(ChatMessage? task, CancellationToken cancellationToken = default) + public async IAsyncEnumerable StreamAsync(ChatMessage? task, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - task = task ?? throw new ArgumentNullException(nameof(task)); + if (task == null) + { + throw new ArgumentNullException(nameof(task)); + } + + if (!this.EnsureSingleRun()) + { + throw new InvalidOperationException("The task is already running."); + } + + // TODO: How do we allow the user to configure this? + //AgentsAppBuilder builder = new AgentsAppBuilder().UseInProcessRuntime(); + InProcessRuntime runtime = new InProcessRuntime(); + + foreach (AgentChatConfig config in this.Participants.Values) + { + await runtime.RegisterChatAgentAsync(config); + } + + await runtime.RegisterGroupChatManagerAsync(this.GroupChatOptions, this.TeamId, this.CreateChatManager); + + OutputSink outputSink = new OutputSink(); + await runtime.RegisterOutputCollectorAsync(outputSink, this.GroupChatOptions.OutputTopicType); + + await runtime.StartAsync(); + + Task shutdownTask = Task.CompletedTask; + + try + { + // TODO: Protos + GroupChatStart taskMessage = new GroupChatStart + { + Messages = [task] + }; + + List runMessages = new(); + + AgentId chatManagerId = new AgentId(GroupChatManagerTopicType, this.TeamId); + await runtime.SendMessageAsync(taskMessage, chatManagerId, cancellationToken: cancellationToken); + + shutdownTask = Task.Run(runtime.RunUntilIdleAsync); + + while (true) + { + OutputSink.SinkFrame frame = await outputSink.WaitForDataAsync(cancellationToken); + runMessages.AddRange(frame.Messages); + + foreach (AgentMessage message in frame.Messages) + { + yield return new TaskFrame(message); + } + + if (frame.IsTerminal) + { + TaskResult result = new TaskResult(runMessages); + yield return new TaskFrame(result); + break; + } + } + } + finally + { + this.EndRun(); - return this.StreamAsync(task, cancellationToken); + await shutdownTask; + } } } diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatHandlerRouter.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatHandlerRouter.cs new file mode 100644 index 000000000000..9855dcafb7ea --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatHandlerRouter.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GroupChatHandlerRouter.cs + +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.AgentChat.GroupChat; + +internal delegate ValueTask MessagePublishServicer(GroupChatEventBase event_, string topicType, CancellationToken cancellation = default); + +internal interface IGroupChatHandler : IHandle, IHandle, IHandle +{ + void AttachMessagePublishServicer(MessagePublishServicer? servicer = null); + void DetachMessagePublishServicer() => this.AttachMessagePublishServicer(null); +} + +internal sealed class GroupChatHandlerRouter : HostableAgentAdapter, + IHandle, + IHandle, + IHandle + + where TManager : GroupChatManagerBase, IGroupChatHandler +{ + public const string DefaultDescription = "Group chat manager"; + + private TManager ChatManager { get; } + + public GroupChatHandlerRouter(AgentInstantiationContext agentCtx, TManager chatManager, ILogger? logger = null) : base(agentCtx, DefaultDescription, logger) + { + this.ChatManager = chatManager; + this.ChatManager.AttachMessagePublishServicer(PublishMessageServicer); + } + + private ValueTask PublishMessageServicer(GroupChatEventBase event_, string topicType, CancellationToken cancellation = default) + { + return this.PublishMessageAsync(event_, new TopicId(topicType, this.Id.Key), cancellationToken: cancellation); + } + + public ValueTask HandleAsync(GroupChatStart item, MessageContext messageContext) + => this.ChatManager.HandleAsync(item, messageContext); + + public ValueTask HandleAsync(GroupChatAgentResponse item, MessageContext messageContext) + => this.ChatManager.HandleAsync(item, messageContext); + + public ValueTask HandleAsync(object item, MessageContext messageContext) + => this.ChatManager.HandleAsync(item, messageContext); +} diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatManagerBase.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatManagerBase.cs index 46165931f2d9..ecd1130fe516 100644 --- a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatManagerBase.cs +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatManagerBase.cs @@ -2,24 +2,54 @@ // GroupChatManagerBase.cs using Microsoft.AutoGen.AgentChat.Abstractions; +using Microsoft.AutoGen.Contracts; namespace Microsoft.AutoGen.AgentChat.GroupChat; -public class GroupChatOptions +public abstract class GroupChatManagerBase : IGroupChatHandler { - public ITerminationCondition? TerminationCondition { get; set; } -} + private GroupChatOptions options; -public abstract class GroupChatManagerBase : IHandleChat, - IHandleChat, - IHandleDefault -{ - public string Description => "Group chat manager"; + // TODO: We may be able to abstract this out at the Core level + private MessagePublishServicer? PublishServicer { get; set; } - // It is kind of annoying that all child classes need to be aware of all of these objects to go up the stack - // TODO: Should we replace all of these with IServiceCollection? - public GroupChatManagerBase() + // It would be so awesome if we could avoid passing GroupChatOptions to the constructor + // and use something like Python's context manager mechanism to pick up the options from + // the logical stack. But that's very difficult in C#, because the user code could do all + // sorts of weird things like shunt the execution to a different thread. We cannot even + // assume that we are in an async context (much less that we are in the same async context) + public GroupChatManagerBase(GroupChatOptions options) : base() { + this.options = options; + + this.MessageThread = new List(); + } + + protected string GroupChatTopicType => this.options.GroupChatTopicType; + protected string OutputTopicType => this.options.OutputTopicType; + + protected Dictionary Participants => this.options.Participants; + + protected ITerminationCondition? TerminationCondition => this.options.TerminationCondition; + protected int? MaxTurns => this.options.MaxTurns; + + private int CurrentTurn { get; set; } + + protected List MessageThread; + + void IGroupChatHandler.AttachMessagePublishServicer(MessagePublishServicer? servicer) + { + this.PublishServicer = servicer; + } + + private ValueTask PublishMessageAsync(GroupChatEventBase message, string topicType, CancellationToken cancellation = default) + { + return this.PublishServicer?.Invoke(message, topicType, cancellation) ?? ValueTask.CompletedTask; + } + + protected ValueTask PublishMessageAsync(ChatMessage message, string topicType, CancellationToken cancellation = default) + { + return this.PublishMessageAsync(new GroupChatMessage { Message = message }, topicType, cancellation); } protected virtual async ValueTask ValidateGroupState(List? messages) @@ -28,17 +58,129 @@ protected virtual async ValueTask ValidateGroupState(List? messages public abstract ValueTask SelectSpeakerAsync(List thread); - public ValueTask HandleAsync(GroupChatStart wireItem, CancellationToken cancellationToken) + public async ValueTask HandleAsync(GroupChatStart item, MessageContext messageContext) { - return ValueTask.CompletedTask; + if (this.TerminationCondition != null && this.TerminationCondition.IsTerminated) + { + // skipReset is used here to match the Python code + await this.TerminateAsync("The chat has already terminated", skipReset: true); + + StopMessage earlyStop = new StopMessage + { + Content = "The chat has already terminated", + Source = GroupChatBase.GroupChatManagerTopicType + }; + + await this.PublishMessageAsync(new GroupChatTermination { Message = earlyStop }, this.OutputTopicType); + + return; + } + + if (item.Messages != null) + { + this.MessageThread.AddRange(item.Messages); + } + + await this.ValidateGroupState(item.Messages); + + if (item.Messages != null) + { + await this.PublishMessageAsync(item, this.OutputTopicType); + await this.PublishMessageAsync(item, this.GroupChatTopicType); + + // item.Messages is IList but we need IList + // Unfortunately, IList does not support type variance, so we have to do this rather ugly thing + // TODO: Check if we really need to have AgentMessage on the interface of ITerminationCondition + List converted = [.. item.Messages.Cast()]; + + if (await this.TerminateIfNeededAsync(converted)) + { + return; + } + } + + await this.ProcessNextSpeakerAsync(); + } + + public async ValueTask HandleAsync(GroupChatAgentResponse item, MessageContext messageContext) + { + List delta = new List(); + + if (item.AgentResponse.InnerMessages != null) + { + this.MessageThread.AddRange(item.AgentResponse.InnerMessages); + delta.AddRange(item.AgentResponse.InnerMessages); + } + + this.MessageThread.Add(item.AgentResponse.Message); + delta.Add(item.AgentResponse.Message); + + if (await this.TerminateIfNeededAsync(delta)) + { + return; + } + + this.CurrentTurn++; + if (this.MaxTurns.HasValue && this.MaxTurns.Value <= this.CurrentTurn) + { + await this.TerminateAsync($"Maximum number of turns ({this.MaxTurns.Value}) reached."); + return; + } + + await this.ProcessNextSpeakerAsync(); + } + + private ValueTask TerminateAsync(string message, bool skipReset = false) + { + StopMessage stopMessage = new StopMessage + { + Content = message, + Source = GroupChatBase.GroupChatManagerTopicType + }; + + return this.TerminateAsync(stopMessage, skipReset); + } + + private async ValueTask TerminateAsync(StopMessage stopMessage, bool skipReset = false) + { + await this.PublishMessageAsync(new GroupChatTermination { Message = stopMessage }, this.OutputTopicType); + + if (!skipReset) + { + this.TerminationCondition?.Reset(); + this.CurrentTurn = 0; + } + } + + private async ValueTask TerminateIfNeededAsync(params IList incomingMessages) + { + if (this.TerminationCondition == null) + { + return false; + } + + StopMessage? stopMessage = await this.TerminationCondition.CheckAndUpdateAsync(incomingMessages); + if (stopMessage != null) + { + await this.TerminateAsync(stopMessage); + + return true; + } + + return false; } - public ValueTask HandleAsync(GroupChatAgentResponse wireItem, CancellationToken cancellationToken) + // TODO: Figure out how to route this to the right method + //private ValueTask ProcessNextSpeakerAsync(params IList incomingMessages) + // => this.ProcessNextSpeakerAsync(incomingMessages); + + private async ValueTask ProcessNextSpeakerAsync() { - return ValueTask.CompletedTask; + string nextSpeakerTopic = await this.SelectSpeakerAsync(this.MessageThread); + await this.PublishMessageAsync(new GroupChatRequestPublish { }, nextSpeakerTopic); } - public ValueTask HandleAsync(object item, CancellationToken cancellationToken) + public ValueTask HandleAsync(object item, MessageContext messageContext) { throw new InvalidOperationException($"Unhandled message in group chat manager: {item.GetType()}"); } diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatOptions.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatOptions.cs new file mode 100644 index 000000000000..58ba1b428131 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/GroupChatOptions.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// GroupChatOptions.cs + +using Microsoft.AutoGen.AgentChat.Abstractions; + +namespace Microsoft.AutoGen.AgentChat.GroupChat; + +public class GroupChatOptions(string groupTopicType, string outputTopicType) +{ + public string GroupChatTopicType { get; } = groupTopicType; + public string OutputTopicType { get; } = outputTopicType; + + public ITerminationCondition? TerminationCondition { get; set; } + public int? MaxTurns { get; set; } + + public Dictionary Participants { get; } = new Dictionary(); +} diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/HostableAgentAdapter.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/HostableAgentAdapter.cs new file mode 100644 index 000000000000..86f20607cc9d --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/HostableAgentAdapter.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// HostableAgentAdapter.cs + +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.AgentChat.GroupChat; + +public class AgentInstantiationContext(AgentId id, IAgentRuntime runtime) +{ + public AgentId Id { get; } = id; + public IAgentRuntime Runtime { get; } = runtime; +} + +internal class HostableAgentAdapter : BaseAgent +{ + public HostableAgentAdapter(AgentId id, IAgentRuntime runtime, string description, ILogger? logger = null) : base(id, runtime, description, logger) + { + } + + public HostableAgentAdapter(AgentInstantiationContext agentCtx, string description, ILogger? logger = null) : base(agentCtx.Id, agentCtx.Runtime, description, logger) + { + } +} + diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/OutputCollectorAgent.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/OutputCollectorAgent.cs new file mode 100644 index 000000000000..d17fa266b6aa --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/OutputCollectorAgent.cs @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// OutputCollectorAgent.cs + +using System.Diagnostics; +using Microsoft.AutoGen.AgentChat.GroupChat; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.AgentChat.Abstractions; + +internal interface IOutputCollectionSink +{ + void CollectMessage(AgentMessage message); + void Terminate(StopMessage message); +} + +internal sealed class OutputSink : IOutputCollectionSink +{ + public sealed class SinkFrame + { + public StopMessage? Termination { get; set; } + public List Messages { get; } = new(); + + public bool IsTerminal => this.Termination != null; + } + + private readonly object sync = new(); + private SemaphoreSlim semapohre = new SemaphoreSlim(1, 1); + + private SinkFrame? receivingSinkFrame; + + private void RunSynchronized(Action frameAction) + { + // Make sure we do not overlap with Terminate + lock (this.sync) + { + if (this.receivingSinkFrame == null) + { + this.receivingSinkFrame = new SinkFrame(); + } + + frameAction(this.receivingSinkFrame); + } + + semapohre.Release(); + } + + public void CollectMessage(AgentMessage message) + { + this.RunSynchronized( + frame => + { + frame.Messages.Add(message); + }); + } + + public void Terminate(StopMessage message) + { + this.RunSynchronized( + frame => + { + frame.Termination = message; + }); + } + + public async Task WaitForDataAsync(CancellationToken cancellation) + { + while (true) + { + SinkFrame? lastFrame; + lock (this.sync) + { + lastFrame = Interlocked.Exchange(ref this.receivingSinkFrame, null); + + if (lastFrame != null) + { + return lastFrame; + } + } + + await this.semapohre.WaitAsync(cancellation); + } + } +} + +// TODO: Abstract the core logic of this out into the equivalent of ClosureAgent, because that seems like a +// useful facility to have in Core +internal sealed class OutputCollectorAgent : BaseAgent, + IHandle, + IHandle, + IHandle +{ + private IOutputCollectionSink Sink { get; } + + public OutputCollectorAgent(AgentInstantiationContext ctx, IOutputCollectionSink sink, ILogger? logger = null) : base(ctx.Id, ctx.Runtime, string.Empty, logger) + { + this.Sink = sink; + } + + private void ForwardMessageInternal(ChatMessage message, CancellationToken cancel = default) + { + if (!cancel.IsCancellationRequested) + { + this.Sink.CollectMessage(message); + } + } + + public ValueTask HandleAsync(GroupChatStart item, MessageContext context) + { + item.Messages?.ForEach(item => this.ForwardMessageInternal(item, context.CancellationToken)); + + return ValueTask.CompletedTask; + } + + public ValueTask HandleAsync(GroupChatMessage item, MessageContext context) + { + Debug.Assert(item.Message is ChatMessage, "We should never receive internal messages into the output queue?"); + if (item.Message is ChatMessage chatMessage) + { + this.ForwardMessageInternal(chatMessage, context.CancellationToken); + } + + return ValueTask.CompletedTask; + } + + public ValueTask HandleAsync(GroupChatTermination item, MessageContext context) + { + this.Sink.Terminate(item.Message); + + return ValueTask.CompletedTask; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/RoundRobinGroupChat.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/RoundRobinGroupChat.cs new file mode 100644 index 000000000000..daa715063096 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/GroupChat/RoundRobinGroupChat.cs @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// RoundRobinGroupChat.cs + +using Microsoft.AutoGen.AgentChat.Abstractions; + +namespace Microsoft.AutoGen.AgentChat.GroupChat; + +/// +/// A group chat manager that selects the next speaker in a round-robin fashion. +/// +public class RoundRobinGroupChatManager : GroupChatManagerBase +{ + private readonly List participantTopicTypes; + private int nextSpeakerIndex; + + public RoundRobinGroupChatManager(GroupChatOptions options) : base(options) + { + this.participantTopicTypes = [.. from candidateTopic in options.Participants.Values + select candidateTopic.TopicType]; + this.nextSpeakerIndex = 0; + } + + public override ValueTask SelectSpeakerAsync(List thread) + { + string result = this.participantTopicTypes[this.nextSpeakerIndex].ToString(); + + this.nextSpeakerIndex = (this.nextSpeakerIndex + 1) % this.participantTopicTypes.Count; + + return ValueTask.FromResult(result); + } +} + +/// +/// A team that runs a group chat with a participants taking turns in a round-robin fashion to publish +/// a message to all. +/// +/// If a single participant is in the team, the participant will be the only speaker. +/// +public class RoundRobinGroupChat : GroupChatBase +{ + /// + /// Initializes a new round-robin group chat. + /// + /// The participants in the group chat. + /// + /// The termination condition for the group chat. Defaults to null. Without a termination + /// condition, the group chat will run indefinitely. + /// + /// + /// The maximum number of turns for the group chat. Defaults to null, meaning no limit. + /// Note that the gets first priority for checking the termination + /// if both are provided. + /// + public RoundRobinGroupChat(List participants, ITerminationCondition? terminationCondition = null, int? maxTurns = null) : base(participants, terminationCondition, maxTurns) + { + } +} diff --git a/dotnet/src/Microsoft.AutoGen/AgentChat/Terminations/StopMessageTermination.cs b/dotnet/src/Microsoft.AutoGen/AgentChat/Terminations/StopMessageTermination.cs new file mode 100644 index 000000000000..194fe1afe289 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/AgentChat/Terminations/StopMessageTermination.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// StopMessageTermination.cs + +using Microsoft.AutoGen.AgentChat.Abstractions; + +namespace Microsoft.AutoGen.AgentChat.Terminations; + +/// +/// Terminate a conversation if a is received. +/// +public class StopMessageTermination : ITerminationCondition +{ + public bool IsTerminated { get; private set; } + + public ValueTask CheckAndUpdateAsync(IList messages) + { + if (this.IsTerminated) + { + throw new TerminatedException(); + } + + foreach (AgentMessage item in messages) + { + if (item is StopMessage) + { + this.IsTerminated = true; + + StopMessage result = new() { Content = "Stop message received", Source = nameof(StopMessageTermination) }; + return ValueTask.FromResult(result); + } + } + + return ValueTask.FromResult(null); + } + + public void Reset() + { + this.IsTerminated = false; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Contracts/MessageContext.cs b/dotnet/src/Microsoft.AutoGen/Contracts/MessageContext.cs index e99f0c42d941..58b580fee506 100644 --- a/dotnet/src/Microsoft.AutoGen/Contracts/MessageContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Contracts/MessageContext.cs @@ -9,6 +9,9 @@ namespace Microsoft.AutoGen.Contracts; /// public class MessageContext(string messageId, CancellationToken cancellationToken) { + public MessageContext(CancellationToken cancellation) : this(Guid.NewGuid().ToString(), cancellation) + { } + /// /// Gets or sets the unique identifier for this message. /// diff --git a/dotnet/test/Microsoft.AutoGen.AgentChat.Tests/AgentChatSmokeTest.cs b/dotnet/test/Microsoft.AutoGen.AgentChat.Tests/AgentChatSmokeTest.cs new file mode 100644 index 000000000000..018f80cae38a --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.AgentChat.Tests/AgentChatSmokeTest.cs @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentChatSmokeTest.cs + +using Microsoft.AutoGen.AgentChat.Abstractions; +using Microsoft.AutoGen.AgentChat.Agents; +using Microsoft.AutoGen.AgentChat.GroupChat; +using Microsoft.AutoGen.AgentChat.Terminations; +using Xunit; + +namespace Microsoft.AutoGen.AgentChat.Tests; + +public class AgentChatSmokeTest +{ + public class SpeakMessageAgent : ChatAgentBase + { + public SpeakMessageAgent(string name, string description, string content) : base(name, description) + { + this.Content = content; + } + + public string Content { get; private set; } + + public override IEnumerable ProducedMessageTypes => [typeof(HandoffMessage)]; + + public override ValueTask HandleAsync(IEnumerable item, CancellationToken cancellationToken) + { + Response result = new() + { + Message = new TextMessage { Content = this.Content, Source = this.Name } + }; + + return ValueTask.FromResult(result); + } + + public override ValueTask ResetAsync(CancellationToken cancellationToken) + { + return ValueTask.CompletedTask; + } + } + + public class TerminatingAgent : ChatAgentBase + { + public List? IncomingMessages { get; private set; } + + public TerminatingAgent(string name, string description) : base(name, description) + { + } + + public override IEnumerable ProducedMessageTypes => [typeof(StopMessage)]; + + public override ValueTask HandleAsync(IEnumerable item, CancellationToken cancellationToken) + { + this.IncomingMessages = item.ToList(); + + string content = "Terminating"; + if (item.Any()) + { + ChatMessage lastMessage = item.Last(); + + switch (lastMessage) + { + case TextMessage textMessage: + content = $"Terminating; got: {textMessage.Content}"; + break; + case HandoffMessage handoffMessage: + content = $"Terminating; got handoff: {handoffMessage.Context}"; + break; + } + } + + Response result = new() + { + Message = new StopMessage { Content = content, Source = this.Name } + }; + + return ValueTask.FromResult(result); + } + + public override ValueTask ResetAsync(CancellationToken cancellationToken) + { + this.IncomingMessages = null; + + return ValueTask.CompletedTask; + } + } + + [Fact] + public async Task Test_RoundRobin_SpeakAndTerminating() + { + TerminatingAgent terminatingAgent = new("Terminate", "Terminate"); + + ITeam chat = new RoundRobinGroupChat( + [ + new SpeakMessageAgent("Speak", "Speak", "Hello"), + terminatingAgent + ], + terminationCondition: new StopMessageTermination()); + + TaskResult result = await chat.RunAsync(""); + + Assert.Equal(3, result.Messages.Count); + Assert.Equal("", Assert.IsType(result.Messages[0]).Content); + Assert.Equal("Hello", Assert.IsType(result.Messages[1]).Content); + Assert.Equal("Terminating; got: Hello", Assert.IsType(result.Messages[2]).Content); + } +}