From ff9ed4d9c511ba5816b1e015823a80f6147c931a Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Wed, 12 Feb 2025 10:49:45 -0500 Subject: [PATCH 1/2] fix: gRPC Agent Runtime Serialization Registration We were registering the serializers when we already had a concrete type on the way into Publish or Send on the .NET side. However, in a xLang scenario, messages could originate from e.g. Python before being sent/published from .NET, resulting in no serializer being found. This change adds a second-change registration and lookup when the agent is instantiated based on the IHandle implementations. --- .../Core.Grpc/AgentExtensions.cs | 38 +++++++++++++++++++ .../Core.Grpc/GrpcAgentRuntime.cs | 20 ++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs new file mode 100644 index 000000000000..816f2625c5f3 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/AgentExtensions.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AgentExtensions.cs + +using System.Reflection; +using Google.Protobuf; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core.Grpc; + +namespace Microsoft.AutoGen.Core; + +internal static partial class AgentExtensions +{ + private static readonly Type ProtobufIMessage = typeof(IMessage<>); + private static bool IsProtobufType(this Type type) + { + // TODO: Support the non-generic IMessage as well + Type specializedIMessageType = ProtobufIMessage.MakeGenericType(type); + + // type T needs to derive from IMessage + return specializedIMessageType.IsAssignableFrom(type); + } + + public static void RegisterHandledMessageTypes(this IHostableAgent agent, IProtoSerializationRegistry registry) + { + Type agentRuntimeType = agent.GetType(); + + MethodInfo[] messageHandlers = agentRuntimeType.GetHandlers(); + + foreach (MethodInfo handler in messageHandlers) + { + Type messageType = handler.GetParameters().First().ParameterType; + if (messageType.IsProtobufType() && registry.GetSerializer(messageType) == null) + { + registry.RegisterSerializer(messageType); + } + } + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs index 83c77c5ef770..c3fce75b75fd 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs @@ -11,9 +11,10 @@ namespace Microsoft.AutoGen.Core.Grpc; -internal sealed class AgentsContainer(IAgentRuntime hostingRuntime) +internal sealed class AgentsContainer(IAgentRuntime hostingRuntime, IProtoSerializationRegistry serializationRegistry) { private readonly IAgentRuntime hostingRuntime = hostingRuntime; + private readonly IProtoSerializationRegistry serializationRegistry = serializationRegistry; private Dictionary agentInstances = new(); public Dictionary Subscriptions = new(); @@ -29,6 +30,10 @@ public async ValueTask EnsureAgentAsync(Contracts.AgentId agentI } agent = await factoryFunc(agentId, this.hostingRuntime); + + // Just-in-Time register the message types so we can deserialize them + agent.RegisterHandledMessageTypes(this.serializationRegistry); + this.agentInstances.Add(agentId, agent); } @@ -92,7 +97,7 @@ public GrpcAgentRuntime(AgentRpc.AgentRpcClient client, this._shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping); this._messageRouter = new GrpcMessageRouter(client, this, _clientId, logger, this._shutdownCts.Token); - this._agentsContainer = new AgentsContainer(this); + this._agentsContainer = new AgentsContainer(this, this.SerializationRegistry); this.ServiceProvider = serviceProvider; } @@ -231,8 +236,6 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella var messageId = evt.Id; var typeName = evt.Attributes[Constants.DATA_SCHEMA_ATTR].CeString; - var serializer = SerializationRegistry.GetSerializer(typeName) ?? throw new Exception(); - var message = serializer.Deserialize(evt.ProtoData); var messageContext = new MessageContext(messageId, cancellationToken) { @@ -241,6 +244,10 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella IsRpc = false }; + // We may not have a Serializer registered yet, if this is the first time we are instantiating the agent + IProtobufMessageSerializer? serializer = SerializationRegistry.GetSerializer(typeName); + object? message = serializer?.Deserialize(evt.ProtoData); + // Iterate over subscriptions values to find receiving agents foreach (var subscription in this._agentsContainer.Subscriptions.Values) { @@ -248,6 +255,11 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella { var recipient = subscription.MapToAgent(topic); var agent = await this._agentsContainer.EnsureAgentAsync(recipient); + + // give the serializer a second chance to have been registered + serializer ??= SerializationRegistry.GetSerializer(typeName) ?? throw new Exception($"Could not find a serializer for message of type {typeName}"); + message ??= serializer.Deserialize(evt.ProtoData); + await agent.OnMessageAsync(message, messageContext); } } From 51ed705eb3bf1fa3e628061a916bf7dfa480414b Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Wed, 12 Feb 2025 11:55:11 -0500 Subject: [PATCH 2/2] fix: switch temp sdk version to less restrictive --- .github/workflows/dotnet-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/dotnet-build.yml b/.github/workflows/dotnet-build.yml index ca55a6f142c7..a485f9ddd799 100644 --- a/.github/workflows/dotnet-build.yml +++ b/.github/workflows/dotnet-build.yml @@ -169,7 +169,7 @@ jobs: dotnet-version: '9.0.x' - name: Install Temp Global.JSON run: | - echo "{\"sdk\": {\"version\": \"9.0.101\"}}" > global.json + echo "{\"sdk\": {\"version\": \"9.0\"}}" > global.json - name: Install .NET Aspire workload run: dotnet workload install aspire - name: Install dev certs