Skip to content

Commit

Permalink
HTTP/3: Use new QuicStream.ReadsCompleted property in transport (#35482)
Browse files Browse the repository at this point in the history
Co-authored-by: James Newton-King <[email protected]>
  • Loading branch information
github-actions[bot] and JamesNK authored Aug 20, 2021
1 parent 1329a6f commit 6b4cf67
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,30 @@ private async Task DoReceive()

input.Advance(bytesReceived);

var flushTask = input.FlushAsync();
ValueTask<FlushResult> flushTask;

if (_stream.ReadsCompleted)
{
// If the data returned from ReadAsync is the final chunk on the stream then
// flush data and end pipe together with CompleteAsync.
//
// Getting data and complete together is important for HTTP/3 when parsing headers.
// It is important that it knows that there is no body after the headers.
var completeTask = input.CompleteAsync(ResolveCompleteReceiveException(error));
if (completeTask.IsCompletedSuccessfully)
{
// Fast path. CompleteAsync completed immediately.
flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true));
}
else
{
flushTask = AwaitCompleteTaskAsync(completeTask);
}
}
else
{
flushTask = input.FlushAsync();
}

var paused = !flushTask.IsCompleted;

Expand Down Expand Up @@ -244,12 +267,23 @@ private async Task DoReceive()
finally
{
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error);
Input.Complete(ResolveCompleteReceiveException(error));

FireStreamClosed();

await _waitForConnectionClosedTcs.Task;
}

async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTask)
{
await completeTask;
return new FlushResult(isCanceled: false, isCompleted: true);
}
}

private Exception? ResolveCompleteReceiveException(Exception? error)
{
return _shutdownReadReason ?? _shutdownReason ?? error;
}

private void FireStreamClosed()
Expand Down
32 changes: 32 additions & 0 deletions src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,38 @@ public async Task ClientToServerUnidirectionalStream_ClientAbort_ServerReceivesA
await closedTcs.Task.DefaultTimeout();
}

[ConditionalFact]
[MsQuicSupported]
public async Task ClientToServerUnidirectionalStream_CompleteWrites_PipeProvidesDataAndCompleteTogether()
{
// Arrange
await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory);

var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint);
using var quicConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options);
await quicConnection.ConnectAsync().DefaultTimeout();

await using var serverConnection = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout();

// Act
await using var clientStream = quicConnection.OpenUnidirectionalStream();
await clientStream.WriteAsync(TestData).DefaultTimeout();

await using var serverStream = await serverConnection.AcceptAsync().DefaultTimeout();
var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout();
serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End);

var readResultTask = serverStream.Transport.Input.ReadAsync();

await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout();

// Assert
var completeReadResult = await readResultTask.DefaultTimeout();

Assert.Equal(TestData, completeReadResult.Buffer.ToArray());
Assert.True(completeReadResult.IsCompleted);
}

[ConditionalFact]
[MsQuicSupported]
public async Task ServerToClientUnidirectionalStream_ServerWritesDataAndCompletes_GracefullyClosed()
Expand Down

0 comments on commit 6b4cf67

Please sign in to comment.