Skip to content

Commit

Permalink
feat: rest server streaming support
Browse files Browse the repository at this point in the history
  • Loading branch information
viacheslav-rostovtsev authored and jskeet committed Nov 22, 2022
1 parent b3e2cf9 commit a7f122a
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 15 deletions.
166 changes: 166 additions & 0 deletions Google.Api.Gax.Grpc.Tests/Rest/PartialDecodingStreamReaderTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2022 Google LLC
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Xunit;

namespace Google.Api.Gax.Grpc.Rest.Tests
{
public class PartialDecodingStreamReaderTest
{
private static readonly string ArrayOfObjectsJson = @"
[
{
""foo"": 1
},
{
""bar"": 2
}
]
";

private static readonly string IncompleteArrayOfObjectsJson = @"
[
{
""foo"": 1
},";

/// <summary>
/// Test coarse split data.
/// </summary>
[Fact]
public async void DecodingStreamReaderTestByLine()
{
StreamReader reader = new ReplayingStreamReader(ArrayOfObjectsJson.Split(new []{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries));
var decodingReader = new PartialDecodingStreamReader<JObject>(Task.FromResult(reader), JObject.Parse);

var result = await decodingReader.MoveNext(CancellationToken.None);
Assert.True(result);
Assert.NotNull(decodingReader.Current);
Assert.Equal(decodingReader.Current["foo"], 1);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.True(result);
Assert.NotNull(decodingReader.Current);
Assert.Equal(decodingReader.Current["bar"], 2);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);
}

/// <summary>
/// Test data split by characters.
/// </summary>
[Fact]
public async void DecodingStreamReaderTestByChar()
{
StreamReader reader = new ReplayingStreamReader(ArrayOfObjectsJson.Select(c => c.ToString()));
var decodingReader = new PartialDecodingStreamReader<JObject>(Task.FromResult(reader), JObject.Parse);

var result = await decodingReader.MoveNext(CancellationToken.None);
Assert.True(result);
Assert.NotNull(decodingReader.Current);
Assert.Equal(decodingReader.Current["foo"], 1);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.True(result);
Assert.NotNull(decodingReader.Current);
Assert.Equal(decodingReader.Current["bar"], 2);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);
}

/// <summary>
/// Test when data breaks off unexpectedly.
/// </summary>
[Fact]
public async void DecodingStreamReaderTestIncomplete()
{
StreamReader reader = new ReplayingStreamReader(IncompleteArrayOfObjectsJson.Split(new []{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries));
var decodingReader = new PartialDecodingStreamReader<JObject>(Task.FromResult(reader), JObject.Parse);

var result = await decodingReader.MoveNext(CancellationToken.None);
Assert.True(result);
Assert.NotNull(decodingReader.Current);
Assert.Equal(decodingReader.Current["foo"], 1);

var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
await decodingReader.MoveNext(CancellationToken.None));
Assert.Contains("Closing `]` bracket not received after iterating through the stream.", ex.Message);
}

/// <summary>
/// Test when data is empty array.
/// </summary>
[Fact]
public async void DecodingStreamReaderTestEmpty()
{
StreamReader reader = new ReplayingStreamReader(new[] {"[]"});
var decodingReader = new PartialDecodingStreamReader<JObject>(Task.FromResult(reader), JObject.Parse);

var result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);

result = await decodingReader.MoveNext(CancellationToken.None);
Assert.False(result);
}
}

/// <summary>
/// A fake of a StreamReader emitting given strings
/// </summary>
internal class ReplayingStreamReader : StreamReader
{
private readonly Queue<string> _queue;

/// <summary>
/// Cannot override EndOfStream, so have to nudge
/// the base class to do this.
/// Initialize it with a non-empty stream and later read
/// that one to end.
/// </summary>
/// <param name="strings"></param>
public ReplayingStreamReader(IEnumerable<string> strings) : base(new MemoryStream(new byte[1]))
{
_queue = new Queue<string>(strings);
}

public override Task<int> ReadAsync(char[] buffer, int index, int count)
{
if (_queue.Count <= 0)
{
base.ReadToEnd(); // EndOfStream starts to return true
return Task.FromResult(0);
}

var nextString = _queue.Dequeue();

Assert.True(count > nextString.Length);

for (int i = 0; i < nextString.Length; i++)
{
buffer[index + i] = nextString[i];
}

return Task.FromResult(nextString.Length);
}
}
}
152 changes: 152 additions & 0 deletions Google.Api.Gax.Grpc/Rest/PartialDecodingStreamReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2022 Google LLC
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Newtonsoft.Json.Linq;

namespace Google.Api.Gax.Grpc.Rest;

/// <summary>
/// An IAsyncStreamReader implementation that reads an array of messages
/// from HTTP stream as they arrive in (partial) JSON chunks.
/// </summary>
/// <typeparam name="TResponse">Type of proto messages in the stream</typeparam>
internal class PartialDecodingStreamReader<TResponse> : IAsyncStreamReader<TResponse>
{
private readonly Task<StreamReader> _streamReaderTask;
private readonly Func<string, TResponse> _responseConverter;

private readonly Queue<TResponse> _readyResults;
private readonly StringBuilder _currentBuffer;

private StreamReader _streamReader;
private bool _arrayClosed;

/// <summary>
/// Creates the StreamReader
/// </summary>
/// <param name="streamReaderTask">A stream reader returning partial JSON chunks</param>
/// <param name="responseConverter">A function to transform a well-formed JSON object into the proto message.</param>
public PartialDecodingStreamReader(Task<StreamReader> streamReaderTask, Func<string, TResponse> responseConverter)
{
_streamReaderTask = streamReaderTask;
_responseConverter = responseConverter;

_readyResults = new Queue<TResponse>();
_currentBuffer = new StringBuilder();

_streamReader = null;
_arrayClosed = false;
}

/// <inheritdoc />
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
_streamReader ??= await _streamReaderTask.ConfigureAwait(false);

if (_readyResults.Count > 0)
{
Current = _readyResults.Dequeue();
return true;
}

if (_streamReader.EndOfStream)
{
return false;
}

var buffer = new char[8000];
while (_readyResults.Count == 0)
{
var taskRead = _streamReader.ReadAsync(buffer, 0, buffer.Length);
var cancellationTask = Task.Delay(-1, cancellationToken);
var resultTask = await Task.WhenAny(taskRead, cancellationTask).ConfigureAwait(false);

if (resultTask == cancellationTask)
{
// If the cancellationTask "wins" `Task.WhenAny` by being cancelled, the following await will throw TaskCancelledException.
await cancellationTask.ConfigureAwait(false);
}

var readLen = await taskRead.ConfigureAwait(false);
if (readLen == 0)
{
if (!_arrayClosed)
{
var errorText = "Closing `]` bracket not received after iterating through the stream. " +
"This means that streaming ended without all objects transmitted. " +
"It is likely a result of server or network error.";
throw new InvalidOperationException(errorText);
}

return false;
}

var readChars = buffer.Take(readLen);
foreach (var c in readChars)
{
// Closing bracket for the top-level array
if (_currentBuffer.Length == 0 && c == ']')
{
// TODO[virost, jskeet, 11/2022] Fix with tokenizer:
// it's possible to receive more data after the closing `]`
_arrayClosed = true;
continue;
}

// Between-objects commas and spaces, as well as an opening bracket
// for the top-level array.
if (_currentBuffer.Length == 0 && c != '{')
{
continue;
}

_currentBuffer.Append(c);
if (c != '}')
{
continue;
}

var currentStr = _currentBuffer.ToString();
try
{
// This will throw unless the characters in the _currentBuffer
// add up to a correct JSON and since the _currentBuffer always
// starts with an opening `{` bracket from one of the
// top-level array's element's,
// this will throw unless _currentBuffer contains one message.
// TODO[virost, jskeet, 11/2022] Use a JSON tokenizer instead
JObject.Parse(currentStr);
}
catch (Newtonsoft.Json.JsonReaderException)
{
// Tried to parse a partial json because the `}` was a part of
// a string or a child inner object.
continue;
}

TResponse responseElement = _responseConverter(currentStr);
_readyResults.Enqueue(responseElement);
_currentBuffer.Clear();
}
}

Current = _readyResults.Dequeue();
return true;
}

/// <inheritdoc />
public TResponse Current { get; private set; }
}
7 changes: 5 additions & 2 deletions Google.Api.Gax.Grpc/Rest/ReadHttpResponseMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.ExceptionServices;

namespace Google.Api.Gax.Grpc.Rest
Expand Down Expand Up @@ -63,11 +64,13 @@ internal ReadHttpResponseMessage(HttpResponseMessage response, ExceptionDispatch
// We'll bubble up the _readException instead.
(OriginalResponseMessage, _readException) = (response, readException);

internal Metadata GetHeaders()
internal Metadata GetHeaders() => ReadHeaders(OriginalResponseMessage.Headers);

internal static Metadata ReadHeaders(HttpResponseHeaders headers)
{
// TODO: This could be very wrong. I don't know what headers we should really return, and I don't know about semi-colon joining.
var metadata = new Metadata();
foreach (var header in OriginalResponseMessage.Headers)
foreach (var header in headers)
{
metadata.Add(header.Key, string.Join(";", header.Value));
}
Expand Down
2 changes: 1 addition & 1 deletion Google.Api.Gax.Grpc/Rest/RestCallInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami

/// <inheritdoc />
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
throw new NotSupportedException("Streaming methods are not supported by the hybrid REST/gRPC mode");
_channel.AsyncServerStreamingCall(method, host, options, request);

/// <inheritdoc />
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
Expand Down
Loading

0 comments on commit a7f122a

Please sign in to comment.