From 3ac78aa0f48bc7f458bc2209b60401efd84339a0 Mon Sep 17 00:00:00 2001 From: beckjin <1798410296@qq.com> Date: Fri, 17 Sep 2021 16:28:47 +0800 Subject: [PATCH] Support gRPC streaming --- .gitignore | 1 + .../Controllers/ValuesController.cs | 21 ++ .../Services/GreeterGrpcService.cs | 61 ++++++ sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs | 120 ++++++++++-- .../grpc/SkyApm.Sample.GrpcProto/HelloGrpc.cs | 185 +++++++++++++----- .../grpc/SkyApm.Sample.GrpcProto/hello.proto | 29 +++ .../SkyApm.Sample.GrpcServer/GreeterImpl.cs | 39 +++- .../grpc/SkyApm.Sample.GrpcServer/skyapm.json | 6 +- .../Client/ClientDiagnosticInterceptor.cs | 65 ++++-- .../Server/ServerDiagnosticInterceptor.cs | 60 +++++- 10 files changed, 495 insertions(+), 92 deletions(-) create mode 100644 sample/grpc/SkyApm.Sample.GrpcProto/hello.proto diff --git a/.gitignore b/.gitignore index 0f91eed9..0d6778eb 100644 --- a/.gitignore +++ b/.gitignore @@ -263,6 +263,7 @@ BenchmarkDotNet.Artifacts/ tools/ *.proto +!hello.proto generated/ generated-v3/ diff --git a/sample/SkyApm.Sample.Frontend/Controllers/ValuesController.cs b/sample/SkyApm.Sample.Frontend/Controllers/ValuesController.cs index b053089e..36adaaf0 100644 --- a/sample/SkyApm.Sample.Frontend/Controllers/ValuesController.cs +++ b/sample/SkyApm.Sample.Frontend/Controllers/ValuesController.cs @@ -75,6 +75,27 @@ public async Task SayHelloWithExceptionAsync(string name) return Ok(message); } + [HttpGet("greeter/clientstreaming")] + public async Task SayHelloByClientStreamingAsync() + { + var message = await _greeter.SayHelloByClientStreamingAsync(); + return Ok(message); + } + + [HttpGet("greeter/serverstreaming")] + public async Task SayHelloByServerStreamingAsync() + { + var message = await _greeter.SayHelloByServerStreamingAsync(); + return Ok(message); + } + + [HttpGet("greeter/duplexstreaming")] + public async Task SayHelloByDuplexStreamingAsync() + { + var message = await _greeter.SayHelloByDuplexStreamingAsync(); + return Ok(message); + } + [HttpGet("hellojava")] public async Task HelloJava() { diff --git a/sample/SkyApm.Sample.Frontend/Services/GreeterGrpcService.cs b/sample/SkyApm.Sample.Frontend/Services/GreeterGrpcService.cs index 834bc4a3..e4a17f4d 100644 --- a/sample/SkyApm.Sample.Frontend/Services/GreeterGrpcService.cs +++ b/sample/SkyApm.Sample.Frontend/Services/GreeterGrpcService.cs @@ -43,5 +43,66 @@ public async Task SayHelloWithExceptionAsync(string name) var reply = await _client.SayHelloWithExceptionAsync(new HelloRequest { Name = name }); return reply.Message; } + + public async Task SayHelloByClientStreamingAsync() + { + using (var call = _client.SayHelloByClientStreaming()) + { + for (int i = 0; i < 10; i++) + { + await call.RequestStream.WriteAsync(new HelloRequest + { + Name = $"hello-{i}" + }); + } + await call.RequestStream.CompleteAsync(); + var response = await call.ResponseAsync; + return response.Message; + } + } + + public async Task SayHelloByServerStreamingAsync() + { + using (var call = _client.SayHelloByServerStreaming(new HelloRequest { Name = "hello" })) + { + var iterator = call.ResponseStream; + var list = new List(); + while (await iterator.MoveNext()) + { + list.Add(iterator.Current.Message); + } + return string.Join(",", list); + } + } + + + public async Task SayHelloByDuplexStreamingAsync() + { + using (var call = _client.SayHelloByDuplexStreaming()) + { + var list = new List(); + + var responseTask = Task.Run(async () => + { + var iterator = call.ResponseStream; + while (await iterator.MoveNext()) + { + list.Add(iterator.Current.Message); + } + }); + + for (int i = 0; i < 10; i++) + { + await call.RequestStream.WriteAsync(new HelloRequest + { + Name = $"hello-{i}" + }); + } + await call.RequestStream.CompleteAsync(); + await responseTask; + + return string.Join(",", list); + } + } } } diff --git a/sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs b/sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs index 2eb5d69a..0e1c3562 100644 --- a/sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs +++ b/sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs @@ -24,27 +24,34 @@ public static partial class HelloReflection { static HelloReflection() { byte[] descriptorData = global::System.Convert.FromBase64String( string.Concat( - "CgtoZWxsby5wcm90bxIFR3JlZXQiHAoMSGVsbG9SZXF1ZXN0EgwKBG5hbWUY", - "ASABKAkiHQoKSGVsbG9SZXBseRIPCgdtZXNzYWdlGAEgASgJMn4KB0dyZWV0", - "ZXISMgoIU2F5SGVsbG8SEy5HcmVldC5IZWxsb1JlcXVlc3QaES5HcmVldC5I", - "ZWxsb1JlcGx5Ej8KFVNheUhlbGxvV2l0aEV4Y2VwdGlvbhITLkdyZWV0Lkhl", - "bGxvUmVxdWVzdBoRLkdyZWV0LkhlbGxvUmVwbHlCDqoCC0dycGNHcmVldGVy", - "YgZwcm90bzM=")); + "CgtoZWxsby5wcm90bxILR3JwY0dyZWV0ZXIiHAoMSGVsbG9SZXF1ZXN0EgwK", + "BG5hbWUYASABKAkiHQoKSGVsbG9SZXBseRIPCgdtZXNzYWdlGAEgASgJMpsD", + "CgdHcmVldGVyEkAKCFNheUhlbGxvEhkuR3JwY0dyZWV0ZXIuSGVsbG9SZXF1", + "ZXN0GhcuR3JwY0dyZWV0ZXIuSGVsbG9SZXBseSIAEk0KFVNheUhlbGxvV2l0", + "aEV4Y2VwdGlvbhIZLkdycGNHcmVldGVyLkhlbGxvUmVxdWVzdBoXLkdycGNH", + "cmVldGVyLkhlbGxvUmVwbHkiABJTChlTYXlIZWxsb0J5U2VydmVyU3RyZWFt", + "aW5nEhkuR3JwY0dyZWV0ZXIuSGVsbG9SZXF1ZXN0GhcuR3JwY0dyZWV0ZXIu", + "SGVsbG9SZXBseSIAMAESUwoZU2F5SGVsbG9CeUNsaWVudFN0cmVhbWluZxIZ", + "LkdycGNHcmVldGVyLkhlbGxvUmVxdWVzdBoXLkdycGNHcmVldGVyLkhlbGxv", + "UmVwbHkiACgBElUKGVNheUhlbGxvQnlEdXBsZXhTdHJlYW1pbmcSGS5HcnBj", + "R3JlZXRlci5IZWxsb1JlcXVlc3QaFy5HcnBjR3JlZXRlci5IZWxsb1JlcGx5", + "IgAoATABYgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloRequest), global::GrpcGreeter.HelloRequest.Parser, new[]{ "Name" }, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloReply), global::GrpcGreeter.HelloReply.Parser, new[]{ "Message" }, null, null, null) + new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloRequest), global::GrpcGreeter.HelloRequest.Parser, new[]{ "Name" }, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloReply), global::GrpcGreeter.HelloReply.Parser, new[]{ "Message" }, null, null, null, null) })); } #endregion } #region Messages - /// - /// The request message containing the user's name. - /// - public sealed partial class HelloRequest : pb::IMessage { + public sealed partial class HelloRequest : pb::IMessage + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + , pb::IBufferMessage + #endif + { private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HelloRequest()); private pb::UnknownFieldSet _unknownFields; [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -123,6 +130,9 @@ public override string ToString() { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public void WriteTo(pb::CodedOutputStream output) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + output.WriteRawMessage(this); + #else if (Name.Length != 0) { output.WriteRawTag(10); output.WriteString(Name); @@ -130,8 +140,22 @@ public void WriteTo(pb::CodedOutputStream output) { if (_unknownFields != null) { _unknownFields.WriteTo(output); } + #endif } + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { + if (Name.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Name); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(ref output); + } + } + #endif + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public int CalculateSize() { int size = 0; @@ -157,6 +181,9 @@ public void MergeFrom(HelloRequest other) { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public void MergeFrom(pb::CodedInputStream input) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + input.ReadRawMessage(this); + #else uint tag; while ((tag = input.ReadTag()) != 0) { switch(tag) { @@ -169,14 +196,34 @@ public void MergeFrom(pb::CodedInputStream input) { } } } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); + break; + case 10: { + Name = input.ReadString(); + break; + } + } + } } + #endif } - /// - /// The response message containing the greetings. - /// - public sealed partial class HelloReply : pb::IMessage { + public sealed partial class HelloReply : pb::IMessage + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + , pb::IBufferMessage + #endif + { private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HelloReply()); private pb::UnknownFieldSet _unknownFields; [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -255,6 +302,9 @@ public override string ToString() { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public void WriteTo(pb::CodedOutputStream output) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + output.WriteRawMessage(this); + #else if (Message.Length != 0) { output.WriteRawTag(10); output.WriteString(Message); @@ -262,8 +312,22 @@ public void WriteTo(pb::CodedOutputStream output) { if (_unknownFields != null) { _unknownFields.WriteTo(output); } + #endif } + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { + if (Message.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Message); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(ref output); + } + } + #endif + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public int CalculateSize() { int size = 0; @@ -289,6 +353,9 @@ public void MergeFrom(HelloReply other) { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public void MergeFrom(pb::CodedInputStream input) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + input.ReadRawMessage(this); + #else uint tag; while ((tag = input.ReadTag()) != 0) { switch(tag) { @@ -301,7 +368,26 @@ public void MergeFrom(pb::CodedInputStream input) { } } } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); + break; + case 10: { + Message = input.ReadString(); + break; + } + } + } } + #endif } diff --git a/sample/grpc/SkyApm.Sample.GrpcProto/HelloGrpc.cs b/sample/grpc/SkyApm.Sample.GrpcProto/HelloGrpc.cs index ee5e0b3b..2d45f0c0 100644 --- a/sample/grpc/SkyApm.Sample.GrpcProto/HelloGrpc.cs +++ b/sample/grpc/SkyApm.Sample.GrpcProto/HelloGrpc.cs @@ -8,29 +8,87 @@ using grpc = global::Grpc.Core; namespace GrpcGreeter { - /// - /// The greeting service definition. - /// public static partial class Greeter { - static readonly string __ServiceName = "Greet.Greeter"; + static readonly string __ServiceName = "GrpcGreeter.Greeter"; - static readonly grpc::Marshaller __Marshaller_Greet_HelloRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::GrpcGreeter.HelloRequest.Parser.ParseFrom); - static readonly grpc::Marshaller __Marshaller_Greet_HelloReply = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::GrpcGreeter.HelloReply.Parser.ParseFrom); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context) + { + #if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION + if (message is global::Google.Protobuf.IBufferMessage) + { + context.SetPayloadLength(message.CalculateSize()); + global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter()); + context.Complete(); + return; + } + #endif + context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message)); + } + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static class __Helper_MessageCache + { + public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T)); + } + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static T __Helper_DeserializeMessage(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser parser) where T : global::Google.Protobuf.IMessage + { + #if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION + if (__Helper_MessageCache.IsBufferMessage) + { + return parser.ParseFrom(context.PayloadAsReadOnlySequence()); + } + #endif + return parser.ParseFrom(context.PayloadAsNewBuffer()); + } + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Marshaller __Marshaller_GrpcGreeter_HelloRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::GrpcGreeter.HelloRequest.Parser)); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Marshaller __Marshaller_GrpcGreeter_HelloReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::GrpcGreeter.HelloReply.Parser)); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] static readonly grpc::Method __Method_SayHello = new grpc::Method( grpc::MethodType.Unary, __ServiceName, "SayHello", - __Marshaller_Greet_HelloRequest, - __Marshaller_Greet_HelloReply); + __Marshaller_GrpcGreeter_HelloRequest, + __Marshaller_GrpcGreeter_HelloReply); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] static readonly grpc::Method __Method_SayHelloWithException = new grpc::Method( grpc::MethodType.Unary, __ServiceName, "SayHelloWithException", - __Marshaller_Greet_HelloRequest, - __Marshaller_Greet_HelloReply); + __Marshaller_GrpcGreeter_HelloRequest, + __Marshaller_GrpcGreeter_HelloReply); + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Method __Method_SayHelloByServerStreaming = new grpc::Method( + grpc::MethodType.ServerStreaming, + __ServiceName, + "SayHelloByServerStreaming", + __Marshaller_GrpcGreeter_HelloRequest, + __Marshaller_GrpcGreeter_HelloReply); + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Method __Method_SayHelloByClientStreaming = new grpc::Method( + grpc::MethodType.ClientStreaming, + __ServiceName, + "SayHelloByClientStreaming", + __Marshaller_GrpcGreeter_HelloRequest, + __Marshaller_GrpcGreeter_HelloReply); + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Method __Method_SayHelloByDuplexStreaming = new grpc::Method( + grpc::MethodType.DuplexStreaming, + __ServiceName, + "SayHelloByDuplexStreaming", + __Marshaller_GrpcGreeter_HelloRequest, + __Marshaller_GrpcGreeter_HelloReply); /// Service descriptor public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor @@ -42,22 +100,36 @@ public static partial class Greeter [grpc::BindServiceMethod(typeof(Greeter), "BindService")] public abstract partial class GreeterBase { - /// - /// Sends a greeting - /// - /// The request received from the client. - /// The context of the server-side call handler being invoked. - /// The response to send back to the client (wrapped by a task). + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::System.Threading.Tasks.Task SayHello(global::GrpcGreeter.HelloRequest request, grpc::ServerCallContext context) { throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::System.Threading.Tasks.Task SayHelloWithException(global::GrpcGreeter.HelloRequest request, grpc::ServerCallContext context) { throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::System.Threading.Tasks.Task SayHelloByServerStreaming(global::GrpcGreeter.HelloRequest request, grpc::IServerStreamWriter responseStream, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::System.Threading.Tasks.Task SayHelloByClientStreaming(grpc::IAsyncStreamReader requestStream, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::System.Threading.Tasks.Task SayHelloByDuplexStreaming(grpc::IAsyncStreamReader requestStream, grpc::IServerStreamWriter responseStream, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + } /// Client for Greeter @@ -65,85 +137,100 @@ public partial class GreeterClient : grpc::ClientBase { /// Creates a new client for Greeter /// The channel to use to make remote calls. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public GreeterClient(grpc::ChannelBase channel) : base(channel) { } /// Creates a new client for Greeter that uses a custom CallInvoker. /// The callInvoker to use to make remote calls. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public GreeterClient(grpc::CallInvoker callInvoker) : base(callInvoker) { } /// Protected parameterless constructor to allow creation of test doubles. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] protected GreeterClient() : base() { } /// Protected constructor to allow creation of configured clients. /// The client configuration. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] protected GreeterClient(ClientBaseConfiguration configuration) : base(configuration) { } - /// - /// Sends a greeting - /// - /// The request to send to the server. - /// The initial metadata to send with the call. This parameter is optional. - /// An optional deadline for the call. The call will be cancelled if deadline is hit. - /// An optional token for canceling the call. - /// The response received from the server. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::GrpcGreeter.HelloReply SayHello(global::GrpcGreeter.HelloRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) { return SayHello(request, new grpc::CallOptions(headers, deadline, cancellationToken)); } - /// - /// Sends a greeting - /// - /// The request to send to the server. - /// The options for the call. - /// The response received from the server. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::GrpcGreeter.HelloReply SayHello(global::GrpcGreeter.HelloRequest request, grpc::CallOptions options) { return CallInvoker.BlockingUnaryCall(__Method_SayHello, null, options, request); } - /// - /// Sends a greeting - /// - /// The request to send to the server. - /// The initial metadata to send with the call. This parameter is optional. - /// An optional deadline for the call. The call will be cancelled if deadline is hit. - /// An optional token for canceling the call. - /// The call object. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual grpc::AsyncUnaryCall SayHelloAsync(global::GrpcGreeter.HelloRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) { return SayHelloAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); } - /// - /// Sends a greeting - /// - /// The request to send to the server. - /// The options for the call. - /// The call object. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual grpc::AsyncUnaryCall SayHelloAsync(global::GrpcGreeter.HelloRequest request, grpc::CallOptions options) { return CallInvoker.AsyncUnaryCall(__Method_SayHello, null, options, request); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::GrpcGreeter.HelloReply SayHelloWithException(global::GrpcGreeter.HelloRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) { return SayHelloWithException(request, new grpc::CallOptions(headers, deadline, cancellationToken)); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual global::GrpcGreeter.HelloReply SayHelloWithException(global::GrpcGreeter.HelloRequest request, grpc::CallOptions options) { return CallInvoker.BlockingUnaryCall(__Method_SayHelloWithException, null, options, request); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual grpc::AsyncUnaryCall SayHelloWithExceptionAsync(global::GrpcGreeter.HelloRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) { return SayHelloWithExceptionAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public virtual grpc::AsyncUnaryCall SayHelloWithExceptionAsync(global::GrpcGreeter.HelloRequest request, grpc::CallOptions options) { return CallInvoker.AsyncUnaryCall(__Method_SayHelloWithException, null, options, request); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncServerStreamingCall SayHelloByServerStreaming(global::GrpcGreeter.HelloRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SayHelloByServerStreaming(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncServerStreamingCall SayHelloByServerStreaming(global::GrpcGreeter.HelloRequest request, grpc::CallOptions options) + { + return CallInvoker.AsyncServerStreamingCall(__Method_SayHelloByServerStreaming, null, options, request); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncClientStreamingCall SayHelloByClientStreaming(grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SayHelloByClientStreaming(new grpc::CallOptions(headers, deadline, cancellationToken)); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncClientStreamingCall SayHelloByClientStreaming(grpc::CallOptions options) + { + return CallInvoker.AsyncClientStreamingCall(__Method_SayHelloByClientStreaming, null, options); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncDuplexStreamingCall SayHelloByDuplexStreaming(grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SayHelloByDuplexStreaming(new grpc::CallOptions(headers, deadline, cancellationToken)); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncDuplexStreamingCall SayHelloByDuplexStreaming(grpc::CallOptions options) + { + return CallInvoker.AsyncDuplexStreamingCall(__Method_SayHelloByDuplexStreaming, null, options); + } /// Creates a new instance of client from given ClientBaseConfiguration. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] protected override GreeterClient NewInstance(ClientBaseConfiguration configuration) { return new GreeterClient(configuration); @@ -152,21 +239,29 @@ protected override GreeterClient NewInstance(ClientBaseConfiguration configurati /// Creates service definition that can be registered with a server /// An object implementing the server-side handling logic. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public static grpc::ServerServiceDefinition BindService(GreeterBase serviceImpl) { return grpc::ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_SayHello, serviceImpl.SayHello) - .AddMethod(__Method_SayHelloWithException, serviceImpl.SayHelloWithException).Build(); + .AddMethod(__Method_SayHelloWithException, serviceImpl.SayHelloWithException) + .AddMethod(__Method_SayHelloByServerStreaming, serviceImpl.SayHelloByServerStreaming) + .AddMethod(__Method_SayHelloByClientStreaming, serviceImpl.SayHelloByClientStreaming) + .AddMethod(__Method_SayHelloByDuplexStreaming, serviceImpl.SayHelloByDuplexStreaming).Build(); } /// Register service method with a service binder with or without implementation. Useful when customizing the service binding logic. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. /// Service methods will be bound by calling AddMethod on this object. /// An object implementing the server-side handling logic. + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] public static void BindService(grpc::ServiceBinderBase serviceBinder, GreeterBase serviceImpl) { serviceBinder.AddMethod(__Method_SayHello, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.SayHello)); serviceBinder.AddMethod(__Method_SayHelloWithException, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.SayHelloWithException)); + serviceBinder.AddMethod(__Method_SayHelloByServerStreaming, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod(serviceImpl.SayHelloByServerStreaming)); + serviceBinder.AddMethod(__Method_SayHelloByClientStreaming, serviceImpl == null ? null : new grpc::ClientStreamingServerMethod(serviceImpl.SayHelloByClientStreaming)); + serviceBinder.AddMethod(__Method_SayHelloByDuplexStreaming, serviceImpl == null ? null : new grpc::DuplexStreamingServerMethod(serviceImpl.SayHelloByDuplexStreaming)); } } diff --git a/sample/grpc/SkyApm.Sample.GrpcProto/hello.proto b/sample/grpc/SkyApm.Sample.GrpcProto/hello.proto new file mode 100644 index 00000000..0c16542d --- /dev/null +++ b/sample/grpc/SkyApm.Sample.GrpcProto/hello.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package GrpcGreeter ; + +service Greeter { + + rpc SayHello (HelloRequest) returns (HelloReply) { + } + + rpc SayHelloWithException (HelloRequest) returns (HelloReply) { + } + + rpc SayHelloByServerStreaming (HelloRequest) returns (stream HelloReply) { + } + + rpc SayHelloByClientStreaming (stream HelloRequest) returns (HelloReply) { + } + + rpc SayHelloByDuplexStreaming (stream HelloRequest) returns (stream HelloReply) { + } +} + +message HelloRequest { + string name = 1; +} + +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/sample/grpc/SkyApm.Sample.GrpcServer/GreeterImpl.cs b/sample/grpc/SkyApm.Sample.GrpcServer/GreeterImpl.cs index f1781bfb..5adba259 100644 --- a/sample/grpc/SkyApm.Sample.GrpcServer/GreeterImpl.cs +++ b/sample/grpc/SkyApm.Sample.GrpcServer/GreeterImpl.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Net.Http; -using System.Text; using System.Threading.Tasks; namespace SkyApm.Sample.GrpcServer @@ -24,5 +23,43 @@ public override Task SayHelloWithException(HelloRequest request, Ser { throw new Exception("grpc server throw exception !!!"); } + + public override async Task SayHelloByClientStreaming(IAsyncStreamReader requestStream, ServerCallContext context) + { + var names = new List(); + while (await requestStream.MoveNext()) + { + names.Add(requestStream.Current.Name); + } + return new HelloReply { Message = string.Join(",", names) }; + } + + public override async Task SayHelloByServerStreaming(HelloRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + var count = 10; + while (count > 0) + { + count--; + await responseStream.WriteAsync(new HelloReply + { + Message = $"{request.Name}-{count}" + }); + } + } + + public override async Task SayHelloByDuplexStreaming(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + var httpClient = new HttpClient(); + var result = await httpClient.GetAsync("http://www.baidu.com"); + Console.WriteLine(result.Content.Headers); + + while (await requestStream.MoveNext()) + { + await responseStream.WriteAsync(new HelloReply + { + Message = requestStream.Current.Name + }); + } + } } } diff --git a/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json b/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json index 5527ab90..ab6782cb 100644 --- a/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json +++ b/sample/grpc/SkyApm.Sample.GrpcServer/skyapm.json @@ -3,7 +3,7 @@ "ServiceName": "grpc-greeter-server", "Namespace": "", "HeaderVersions": [ - "sw6" + "sw8" ], "Sampling": { "SamplePer3Secs": -1, @@ -15,11 +15,11 @@ }, "Transport": { "Interval": 3000, - "ProtocolVersion": "v6", + "ProtocolVersion": "v8", "QueueSize": 30000, "BatchSize": 3000, "gRPC": { - "Servers": "192.168.79.132:11800", + "Servers": "localhost:11800", "Timeout": 100000, "ConnectTimeout": 100000, "ReportTimeout": 600000 diff --git a/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticInterceptor.cs b/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticInterceptor.cs index 3310c2ca..603d5873 100644 --- a/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticInterceptor.cs +++ b/src/SkyApm.Diagnostics.Grpc/Client/ClientDiagnosticInterceptor.cs @@ -19,9 +19,6 @@ using Grpc.Core; using Grpc.Core.Interceptors; using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; namespace SkyApm.Diagnostics.Grpc.Client { @@ -36,30 +33,17 @@ public ClientDiagnosticInterceptor(ClientDiagnosticProcessor processor) public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) { - var metadata = _processor.BeginRequest(context); - try + return Call(context, (newContext) => { - var options = context.Options.WithHeaders(metadata); - context = new ClientInterceptorContext(context.Method, context.Host, options); - var response = continuation(request, context); - _processor.EndRequest(); - return response; - } - catch (Exception ex) - { - _processor.DiagnosticUnhandledException(ex); - throw ex; - } + return continuation(request, newContext); + }); } public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) { - var metadata = _processor.BeginRequest(context); - try + return Call(context, (newContext) => { - var options = context.Options.WithHeaders(metadata); - context = new ClientInterceptorContext(context.Method, context.Host, options); - var response = continuation(request, context); + var response = continuation(request, newContext); var responseAsync = response.ResponseAsync.ContinueWith(r => { try @@ -74,6 +58,45 @@ public override AsyncUnaryCall AsyncUnaryCall(TR } }); return new AsyncUnaryCall(responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + }); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + return Call(context, (newContext) => + { + return continuation(newContext); + }); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + return Call(context, (newContext) => + { + return continuation(request, newContext); + }); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + return Call(context, (newContext) => + { + return continuation(newContext); + }); + } + + public T Call(ClientInterceptorContext context, Func, T> func) + where TRequest : class + where TResponse : class + { + var metadata = _processor.BeginRequest(context); + try + { + var options = context.Options.WithHeaders(metadata); + var newContext = new ClientInterceptorContext(context.Method, context.Host, options); + var response = func(newContext); + _processor.EndRequest(); + return response; } catch (Exception ex) { diff --git a/src/SkyApm.Diagnostics.Grpc/Server/ServerDiagnosticInterceptor.cs b/src/SkyApm.Diagnostics.Grpc/Server/ServerDiagnosticInterceptor.cs index 377272b0..bfb61ab2 100644 --- a/src/SkyApm.Diagnostics.Grpc/Server/ServerDiagnosticInterceptor.cs +++ b/src/SkyApm.Diagnostics.Grpc/Server/ServerDiagnosticInterceptor.cs @@ -19,8 +19,6 @@ using Grpc.Core; using Grpc.Core.Interceptors; using System; -using System.Collections.Generic; -using System.Text; using System.Threading.Tasks; namespace SkyApm.Diagnostics.Grpc.Server @@ -35,11 +33,63 @@ public ServerDiagnosticInterceptor(ServerDiagnosticProcessor processor) } public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + { + _processor.BeginRequest(context); + + return await Handler(context, async () => + { + return await continuation(request, context); + }); + } + + public override async Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) + { + return await Handler(context, async () => + { + return await continuation(requestStream, context); + }); + } + + public override async Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) { + await Handler(context, async () => + { + await continuation(request, responseStream, context); + }); + } + + public override async Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) + { + await Handler(context, async () => + { + await continuation(requestStream, responseStream, context); + }); + } + + private async Task Handler(ServerCallContext context, Func func) + where TRequest : class + { _processor.BeginRequest(context); try + { + await func(); + _processor.EndRequest(context); + } + catch (Exception ex) { - var response = await continuation(request, context); + _processor.DiagnosticUnhandledException(ex); + throw ex; + } + } + + private async Task Handler(ServerCallContext context, Func> func) + where TRequest : class + where TResponse : class + { + _processor.BeginRequest(context); + try + { + var response = await func(); _processor.EndRequest(context); return response; } @@ -47,7 +97,7 @@ public override async Task UnaryServerHandler(TR { _processor.DiagnosticUnhandledException(ex); throw ex; - } - } + } + } } }