Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gRPC streaming #448

Merged
merged 1 commit into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ BenchmarkDotNet.Artifacts/
tools/

*.proto
!hello.proto

generated/
generated-v3/
Expand Down
21 changes: 21 additions & 0 deletions sample/SkyApm.Sample.Frontend/Controllers/ValuesController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ public async Task<IActionResult> SayHelloWithExceptionAsync(string name)
return Ok(message);
}

[HttpGet("greeter/clientstreaming")]
public async Task<IActionResult> SayHelloByClientStreamingAsync()
{
var message = await _greeter.SayHelloByClientStreamingAsync();
return Ok(message);
}

[HttpGet("greeter/serverstreaming")]
public async Task<IActionResult> SayHelloByServerStreamingAsync()
{
var message = await _greeter.SayHelloByServerStreamingAsync();
return Ok(message);
}

[HttpGet("greeter/duplexstreaming")]
public async Task<IActionResult> SayHelloByDuplexStreamingAsync()
{
var message = await _greeter.SayHelloByDuplexStreamingAsync();
return Ok(message);
}

[HttpGet("hellojava")]
public async Task<IActionResult> HelloJava()
{
Expand Down
61 changes: 61 additions & 0 deletions sample/SkyApm.Sample.Frontend/Services/GreeterGrpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,66 @@ public async Task<string> SayHelloWithExceptionAsync(string name)
var reply = await _client.SayHelloWithExceptionAsync(new HelloRequest { Name = name });
return reply.Message;
}

public async Task<string> SayHelloByClientStreamingAsync()
{
using (var call = _client.SayHelloByClientStreaming())
{
for (int i = 0; i < 10; i++)
{
await call.RequestStream.WriteAsync(new HelloRequest
{
Name = $"hello-{i}"
});
}
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
return response.Message;
}
}

public async Task<string> SayHelloByServerStreamingAsync()
{
using (var call = _client.SayHelloByServerStreaming(new HelloRequest { Name = "hello" }))
{
var iterator = call.ResponseStream;
var list = new List<string>();
while (await iterator.MoveNext())
{
list.Add(iterator.Current.Message);
}
return string.Join(",", list);
}
}


public async Task<string> SayHelloByDuplexStreamingAsync()
{
using (var call = _client.SayHelloByDuplexStreaming())
{
var list = new List<string>();

var responseTask = Task.Run(async () =>
{
var iterator = call.ResponseStream;
while (await iterator.MoveNext())
{
list.Add(iterator.Current.Message);
}
});

for (int i = 0; i < 10; i++)
{
await call.RequestStream.WriteAsync(new HelloRequest
{
Name = $"hello-{i}"
});
}
await call.RequestStream.CompleteAsync();
await responseTask;

return string.Join(",", list);
}
}
}
}
120 changes: 103 additions & 17 deletions sample/grpc/SkyApm.Sample.GrpcProto/Hello.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,34 @@ public static partial class HelloReflection {
static HelloReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"CgtoZWxsby5wcm90bxIFR3JlZXQiHAoMSGVsbG9SZXF1ZXN0EgwKBG5hbWUY",
"ASABKAkiHQoKSGVsbG9SZXBseRIPCgdtZXNzYWdlGAEgASgJMn4KB0dyZWV0",
"ZXISMgoIU2F5SGVsbG8SEy5HcmVldC5IZWxsb1JlcXVlc3QaES5HcmVldC5I",
"ZWxsb1JlcGx5Ej8KFVNheUhlbGxvV2l0aEV4Y2VwdGlvbhITLkdyZWV0Lkhl",
"bGxvUmVxdWVzdBoRLkdyZWV0LkhlbGxvUmVwbHlCDqoCC0dycGNHcmVldGVy",
"YgZwcm90bzM="));
"CgtoZWxsby5wcm90bxILR3JwY0dyZWV0ZXIiHAoMSGVsbG9SZXF1ZXN0EgwK",
"BG5hbWUYASABKAkiHQoKSGVsbG9SZXBseRIPCgdtZXNzYWdlGAEgASgJMpsD",
"CgdHcmVldGVyEkAKCFNheUhlbGxvEhkuR3JwY0dyZWV0ZXIuSGVsbG9SZXF1",
"ZXN0GhcuR3JwY0dyZWV0ZXIuSGVsbG9SZXBseSIAEk0KFVNheUhlbGxvV2l0",
"aEV4Y2VwdGlvbhIZLkdycGNHcmVldGVyLkhlbGxvUmVxdWVzdBoXLkdycGNH",
"cmVldGVyLkhlbGxvUmVwbHkiABJTChlTYXlIZWxsb0J5U2VydmVyU3RyZWFt",
"aW5nEhkuR3JwY0dyZWV0ZXIuSGVsbG9SZXF1ZXN0GhcuR3JwY0dyZWV0ZXIu",
"SGVsbG9SZXBseSIAMAESUwoZU2F5SGVsbG9CeUNsaWVudFN0cmVhbWluZxIZ",
"LkdycGNHcmVldGVyLkhlbGxvUmVxdWVzdBoXLkdycGNHcmVldGVyLkhlbGxv",
"UmVwbHkiACgBElUKGVNheUhlbGxvQnlEdXBsZXhTdHJlYW1pbmcSGS5HcnBj",
"R3JlZXRlci5IZWxsb1JlcXVlc3QaFy5HcnBjR3JlZXRlci5IZWxsb1JlcGx5",
"IgAoATABYgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloRequest), global::GrpcGreeter.HelloRequest.Parser, new[]{ "Name" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloReply), global::GrpcGreeter.HelloReply.Parser, new[]{ "Message" }, null, null, null)
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloRequest), global::GrpcGreeter.HelloRequest.Parser, new[]{ "Name" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::GrpcGreeter.HelloReply), global::GrpcGreeter.HelloReply.Parser, new[]{ "Message" }, null, null, null, null)
}));
}
#endregion

}
#region Messages
/// <summary>
/// The request message containing the user's name.
/// </summary>
public sealed partial class HelloRequest : pb::IMessage<HelloRequest> {
public sealed partial class HelloRequest : pb::IMessage<HelloRequest>
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
, pb::IBufferMessage
#endif
{
private static readonly pb::MessageParser<HelloRequest> _parser = new pb::MessageParser<HelloRequest>(() => new HelloRequest());
private pb::UnknownFieldSet _unknownFields;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
Expand Down Expand Up @@ -123,15 +130,32 @@ public override string ToString() {

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void WriteTo(pb::CodedOutputStream output) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
output.WriteRawMessage(this);
#else
if (Name.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Name);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
#endif
}

#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
if (Name.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Name);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
}
#endif

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int CalculateSize() {
int size = 0;
Expand All @@ -157,6 +181,9 @@ public void MergeFrom(HelloRequest other) {

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(pb::CodedInputStream input) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
input.ReadRawMessage(this);
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
Expand All @@ -169,14 +196,34 @@ public void MergeFrom(pb::CodedInputStream input) {
}
}
}
#endif
}

#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
case 10: {
Name = input.ReadString();
break;
}
}
}
}
#endif

}

/// <summary>
/// The response message containing the greetings.
/// </summary>
public sealed partial class HelloReply : pb::IMessage<HelloReply> {
public sealed partial class HelloReply : pb::IMessage<HelloReply>
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
, pb::IBufferMessage
#endif
{
private static readonly pb::MessageParser<HelloReply> _parser = new pb::MessageParser<HelloReply>(() => new HelloReply());
private pb::UnknownFieldSet _unknownFields;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
Expand Down Expand Up @@ -255,15 +302,32 @@ public override string ToString() {

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void WriteTo(pb::CodedOutputStream output) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
output.WriteRawMessage(this);
#else
if (Message.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Message);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
#endif
}

#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
if (Message.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Message);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
}
#endif

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int CalculateSize() {
int size = 0;
Expand All @@ -289,6 +353,9 @@ public void MergeFrom(HelloReply other) {

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(pb::CodedInputStream input) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
input.ReadRawMessage(this);
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
Expand All @@ -301,7 +368,26 @@ public void MergeFrom(pb::CodedInputStream input) {
}
}
}
#endif
}

#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
case 10: {
Message = input.ReadString();
break;
}
}
}
}
#endif

}

Expand Down
Loading