Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUIC] QuicStream add ReadsCompleted #57492

Merged
merged 6 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ internal QuicStream() { }
public override bool CanTimeout { get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public bool ReadsCompleted { get { throw null; } }
public long StreamId { get { throw null; } }
public void AbortRead(long errorCode) { }
public void AbortWrite(long errorCode) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ internal override int WriteTimeout

internal override bool CanRead => !_disposed && ReadStreamBuffer is not null;

internal override bool ReadsCompleted => ReadStreamBuffer?.IsComplete ?? false;

internal override int Read(Span<byte> buffer)
{
CheckDisposed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Quic.Implementations.MsQuic.Internal;
Expand Down Expand Up @@ -50,6 +49,7 @@ private sealed class State
public QuicBuffer[] ReceiveQuicBuffers = Array.Empty<QuicBuffer>();
public int ReceiveQuicBuffersCount;
public int ReceiveQuicBuffersTotalBytes;
public bool ReceiveIsFinal;

// set when ReadState.PendingRead:
public Memory<byte> ReceiveUserBuffer;
Expand Down Expand Up @@ -193,6 +193,8 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

internal override bool CanWrite => _disposed == 0 && _canWrite;

internal override bool ReadsCompleted => _state.ReadState == ReadState.ReadsCompleted;

internal override bool CanTimeout => true;

private int _readTimeout = Timeout.Infinite;
Expand Down Expand Up @@ -415,13 +417,13 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
initialReadState = _state.ReadState;
abortError = _state.ReadErrorCode;

// Failure scenario: pre-canceled token. Transition: any -> Aborted
// Failure scenario: pre-canceled token. Transition: Any non-final -> Aborted
// PendingRead state indicates there is another concurrent read operation in flight
// which is forbidden, so it is handled separately
if (initialReadState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
{
initialReadState = ReadState.Aborted;
_state.ReadState = ReadState.Aborted;
CleanupReadStateAndCheckPending(_state, ReadState.Aborted);
preCanceled = true;
}

Expand All @@ -442,16 +444,14 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio

if (cancellationToken.CanBeCanceled)
{
// Failure scenario: cancellation. Transition: Any non-final -> Aborted
_state.ReceiveCancellationRegistration = cancellationToken.UnsafeRegister(static (obj, token) =>
{
var state = (State)obj!;
bool completePendingRead;
lock (state)
{
completePendingRead = state.ReadState == ReadState.PendingRead;
state.Stream = null;
state.ReceiveUserBuffer = null;
state.ReadState = ReadState.Aborted;
completePendingRead = CleanupReadStateAndCheckPending(state, ReadState.Aborted);
}

if (completePendingRead)
Expand All @@ -468,7 +468,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
return _state.ReceiveResettableCompletionSource.GetValueTask();
}

// Success scenario: data already available, completing synchronously. Transition IndividualReadComplete->None
// Success scenario: data already available, completing synchronously.
// Transition IndividualReadComplete->None, or IndividualReadComplete->ReadsCompleted, if it was the last message and we fully consumed it
if (initialReadState == ReadState.IndividualReadComplete)
{
_state.ReadState = ReadState.None;
Expand All @@ -481,6 +482,11 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
// Need to re-enable receives because MsQuic will pause them when we don't consume the entire buffer.
EnableReceive();
}
else if (_state.ReceiveIsFinal)
{
// This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
_state.ReadState = ReadState.ReadsCompleted;
}

return new ValueTask<int>(taken);
}
Expand Down Expand Up @@ -512,7 +518,10 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
/// <returns>The number of bytes copied.</returns>
private static unsafe int CopyMsQuicBuffersToUserBuffer(ReadOnlySpan<QuicBuffer> sourceBuffers, Span<byte> destinationBuffer)
{
Debug.Assert(sourceBuffers.Length != 0);
if (sourceBuffers.Length == 0)
{
return 0;
}

int originalDestinationLength = destinationBuffer.Length;
QuicBuffer nativeBuffer;
Expand Down Expand Up @@ -543,16 +552,7 @@ internal override void AbortRead(long errorCode)
bool shouldComplete = false;
lock (_state)
{
if (_state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
_state.Stream = null;
_state.ReceiveUserBuffer = null;
}
if (_state.ReadState < ReadState.ReadsCompleted)
{
_state.ReadState = ReadState.Aborted;
}
shouldComplete = CleanupReadStateAndCheckPending(_state, ReadState.Aborted);
}

if (shouldComplete)
Expand Down Expand Up @@ -754,9 +754,7 @@ private void Dispose(bool disposing)
if (_state.ReadState < ReadState.ReadsCompleted || _state.ReadState == ReadState.Aborted)
{
abortRead = true;
completeRead = _state.ReadState == ReadState.PendingRead;
_state.Stream = null;
_state.ReadState = ReadState.Aborted;
completeRead = CleanupReadStateAndCheckPending(_state, ReadState.Aborted);
}

if (_state.ShutdownState == ShutdownState.None)
Expand Down Expand Up @@ -881,11 +879,9 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
{
ref StreamEventDataReceive receiveEvent = ref evt.Data.Receive;

if (receiveEvent.BufferCount == 0)
if (NetEventSource.Log.IsEnabled())
{
// This is a 0-length receive that happens once reads are finished (via abort or otherwise).
// State changes for this are handled in PEER_SEND_SHUTDOWN / PEER_SEND_ABORT / SHUTDOWN_COMPLETE event handlers.
return MsQuicStatusCodes.Success;
NetEventSource.Info(state, $"{state.TraceId} Stream received {receiveEvent.TotalBufferLength} bytes{(receiveEvent.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN) ? " with FIN flag" : "")}");
}

int readLength;
Expand Down Expand Up @@ -922,8 +918,27 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)

state.ReceiveQuicBuffersCount = (int)receiveEvent.BufferCount;
state.ReceiveQuicBuffersTotalBytes = checked((int)receiveEvent.TotalBufferLength);
state.ReadState = ReadState.IndividualReadComplete;
return MsQuicStatusCodes.Pending;
state.ReceiveIsFinal = receiveEvent.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN);

// 0-length receive can happens once reads are finished (gracefully or otherwise).
if (state.ReceiveQuicBuffersTotalBytes == 0)
{
if (state.ReceiveIsFinal)
{
// We can complete the state without waiting for PEER_SEND_SHUTDOWN
state.ReadState = ReadState.ReadsCompleted;
}

// if it was not a graceful shutdown, we defer aborting to PEER_SEND_ABORT event handler
return MsQuicStatusCodes.Success;
}
else
{
// Normal RECEIVE - data will be buffered until user calls ReadAsync() and no new event will be issued until EnableReceive()
state.ReadState = ReadState.IndividualReadComplete;
return MsQuicStatusCodes.Pending;
}

case ReadState.PendingRead:
// There is a pending ReadAsync().

Expand All @@ -933,8 +948,17 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
state.ReadState = ReadState.None;

readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<QuicBuffer>(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span);

// This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
if (receiveEvent.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN) && (uint)readLength == receiveEvent.TotalBufferLength)
{
state.ReadState = ReadState.ReadsCompleted;
}
// Else, if this was a final message, but we haven't consumed it fully, FIN flag will arrive again in the next RECEIVE event

state.ReceiveUserBuffer = null;
break;

default:
Debug.Assert(state.ReadState is ReadState.Aborted or ReadState.ConnectionClosed, $"Unexpected {nameof(ReadState)} '{state.ReadState}' in {nameof(HandleEventRecv)}.");

Expand Down Expand Up @@ -1008,16 +1032,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
// This event won't occur within the middle of a receive.
if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(state, $"{state.TraceId} Stream completing resettable event source.");

if (state.ReadState == ReadState.PendingRead)
{
shouldReadComplete = true;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
if (state.ReadState < ReadState.ReadsCompleted)
{
state.ReadState = ReadState.ReadsCompleted;
}
shouldReadComplete = CleanupReadStateAndCheckPending(state, ReadState.ReadsCompleted);

if (state.ShutdownState == ShutdownState.None)
{
Expand Down Expand Up @@ -1051,13 +1066,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
bool shouldComplete = false;
lock (state)
{
if (state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
state.ReadState = ReadState.Aborted;
shouldComplete = CleanupReadStateAndCheckPending(state, ReadState.Aborted);
state.ReadErrorCode = (long)evt.Data.PeerSendAborted.ErrorCode;
}

Expand All @@ -1079,16 +1088,7 @@ private static uint HandleEventPeerSendShutdown(State state)
// This event won't occur within the middle of a receive.
if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(state, $"{state.TraceId} Stream completing resettable event source.");

if (state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
if (state.ReadState < ReadState.ReadsCompleted)
{
state.ReadState = ReadState.ReadsCompleted;
}
shouldComplete = CleanupReadStateAndCheckPending(state, ReadState.ReadsCompleted);
}

if (shouldComplete)
Expand Down Expand Up @@ -1378,11 +1378,7 @@ private static uint HandleEventConnectionClose(State state)

lock (state)
{
shouldCompleteRead = state.ReadState == ReadState.PendingRead;
if (state.ReadState < ReadState.ReadsCompleted)
{
state.ReadState = ReadState.ConnectionClosed;
}
shouldCompleteRead = CleanupReadStateAndCheckPending(state, ReadState.ConnectionClosed);

if (state.SendState == SendState.None || state.SendState == SendState.Pending)
{
Expand Down Expand Up @@ -1428,15 +1424,47 @@ private static uint HandleEventConnectionClose(State state)
private static Exception GetConnectionAbortedException(State state) =>
ThrowHelper.GetConnectionAbortedException(state.ConnectionState.AbortErrorCode);

private static bool CleanupReadStateAndCheckPending(State state, ReadState finalState)
{
Debug.Assert(finalState >= ReadState.ReadsCompleted, $"Expected final read state, got {finalState}");
Debug.Assert(Monitor.IsEntered(state));

bool shouldComplete = false;
if (state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
state.Stream = null;
state.ReceiveUserBuffer = null;
state.ReceiveCancellationRegistration.Unregister();
}
if (state.ReadState < ReadState.ReadsCompleted)
{
state.ReadState = finalState;
}
return shouldComplete;
}

// Read state transitions:
//
// None --(data arrives in event RECV)-> IndividualReadComplete --(user calls ReadAsync() & completes syncronously)-> None
// None --(user calls ReadAsync() & waits)-> PendingRead --(data arrives in event RECV & completes user's ReadAsync())-> None
// None --(data arrives in event RECV)-> IndividualReadComplete
// None --(data arrives in event RECV with FIN flag)-> IndividualReadComplete(+FIN)
// None --(0-byte data arrives in event RECV with FIN flag)-> ReadsCompleted
// None --(user calls ReadAsync() & waits)-> PendingRead
//
// IndividualReadComplete --(user calls ReadAsync())-> None
// IndividualReadComplete(+FIN) --(user calls ReadAsync() & consumes only partial data)-> None
// IndividualReadComplete(+FIN) --(user calls ReadAsync() & consumes full data)-> ReadsCompleted
//
// PendingRead --(data arrives in event RECV & completes user's ReadAsync())-> None
// PendingRead --(data arrives in event RECV with FIN flag & completes user's ReadAsync() with only partial data)-> None
// PendingRead --(data arrives in event RECV with FIN flag & completes user's ReadAsync() with full data)-> ReadsCompleted
//
// Any non-final state --(event PEER_SEND_SHUTDOWN or SHUTDOWN_COMPLETED with ConnectionClosed=false)-> ReadsCompleted
// Any non-final state --(event PEER_SEND_ABORT)-> Aborted
// Any non-final state --(user calls AbortRead())-> Aborted
// Any state --(CancellationToken's cancellation for ReadAsync())-> Aborted (TODO: should it be only for non-final as others?)
// Any non-final state --(CancellationToken's cancellation for ReadAsync())-> Aborted
// Any non-final state --(event SHUTDOWN_COMPLETED with ConnectionClosed=true)-> ConnectionClosed
//
// Closed - no transitions, set for Unidirectional write-only streams
private enum ReadState
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable

internal abstract bool CanRead { get; }

internal abstract bool ReadsCompleted { get; }

internal abstract int ReadTimeout { get; set; }

internal abstract int Read(Span<byte> buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

public override bool CanRead => _provider.CanRead;

public bool ReadsCompleted => _provider.ReadsCompleted;

public override int Read(Span<byte> buffer) => _provider.Read(buffer);

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => _provider.ReadAsync(buffer, cancellationToken);
Expand Down
Loading