From 8f7c2109f6c0e365e13ec0e4075e1d7a220b1c83 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 15:54:13 +0200 Subject: [PATCH 01/24] Early draft of Open(Uni|Bidi)rectionalStreamAsync --- .../System.Net.Quic/ref/System.Net.Quic.cs | 8 ++-- .../Implementations/Mock/MockConnection.cs | 46 ++++++++++++++++++ .../MsQuic/Interop/MsQuicEnums.cs | 1 + .../MsQuic/MsQuicConnection.cs | 25 ++++++++-- .../Implementations/MsQuic/MsQuicStream.cs | 48 +++++++++++++++++-- .../Implementations/QuicConnectionProvider.cs | 5 +- .../src/System/Net/Quic/QuicConnection.cs | 13 +++++ .../tests/FunctionalTests/MsQuicTests.cs | 1 - 8 files changed, 134 insertions(+), 13 deletions(-) diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index 0d4e4997cc4894..9f4061e86d8886 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -31,13 +31,15 @@ public void Dispose() { } public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; } public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; } public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; } + public System.Threading.Tasks.ValueTask OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Threading.Tasks.ValueTask OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } } public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException { - public QuicConnectionAbortedException(string message, long errorCode) : base (default(string)) { } + public QuicConnectionAbortedException(string message, long errorCode) : base(default(string)) { } public long ErrorCode { get { throw null; } } } public partial class QuicException : System.Exception @@ -71,7 +73,7 @@ public QuicListenerOptions() { } } public partial class QuicOperationAbortedException : System.Net.Quic.QuicException { - public QuicOperationAbortedException(string message) : base (default(string)) { } + public QuicOperationAbortedException(string message) : base(default(string)) { } } public partial class QuicOptions { @@ -125,7 +127,7 @@ public override void WriteByte(byte value) { } } public partial class QuicStreamAbortedException : System.Net.Quic.QuicException { - public QuicStreamAbortedException(string message, long errorCode) : base (default(string)) { } + public QuicStreamAbortedException(string message, long errorCode) : base(default(string)) { } public long ErrorCode { get { throw null; } } } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs index 5ccb653f7b9938..6f37644ae88150 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs @@ -230,6 +230,52 @@ internal override QuicStreamProvider OpenBidirectionalStream() return OpenStream(streamId, true); } + internal async override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken) + { + PeerStreamLimit? streamLimit = RemoteStreamLimit; + if (streamLimit is null) + { + throw new InvalidOperationException("Not connected"); + } + + while (!streamLimit.Unidirectional.TryIncrement()) + { + await WaitForAvailableUnidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false); + } + + long streamId; + lock (_syncObject) + { + streamId = _nextOutboundUnidirectionalStream; + _nextOutboundUnidirectionalStream += 4; + } + + return OpenStream(streamId, false); + } + + internal async override ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken) + { + PeerStreamLimit? streamLimit = RemoteStreamLimit; + if (streamLimit is null) + { + throw new InvalidOperationException("Not connected"); + } + + while (!streamLimit.Bidirectional.TryIncrement()) + { + await WaitForAvailableBidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false); + } + + long streamId; + lock (_syncObject) + { + streamId = _nextOutboundBidirectionalStream; + _nextOutboundBidirectionalStream += 4; + } + + return OpenStream(streamId, true); + } + internal MockStream OpenStream(long streamId, bool bidirectional) { CheckDisposed(); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs index 956c425828c46c..8745dd5f57b9ac 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint SEND_SHUTDOWN_COMPLETE = 6, SHUTDOWN_COMPLETE = 7, IDEAL_SEND_BUFFER_SIZE = 8, + PEER_ACCEPTED = 9 } #if SOCKADDR_HAS_LENGTH diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 99dc15398e9f04..052fa1f54813f6 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -582,7 +582,7 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); } - internal override QuicStreamProvider OpenUnidirectionalStream() + private QuicStreamProvider OpenStream(QUIC_STREAM_OPEN_FLAGS flags) { ThrowIfDisposed(); if (!Connected) @@ -590,10 +590,20 @@ internal override QuicStreamProvider OpenUnidirectionalStream() throw new InvalidOperationException(SR.net_quic_not_connected); } - return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); + var stream = new MsQuicStream(_state, flags); + + // should complete synchronously + ValueTask startTask = stream.StartAsync(QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, default); + startTask.AsTask().GetAwaiter().GetResult(); + Debug.Assert(startTask.IsCompleted); + + return stream; } - internal override QuicStreamProvider OpenBidirectionalStream() + internal override QuicStreamProvider OpenUnidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); + internal override QuicStreamProvider OpenBidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.NONE); + + private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); if (!Connected) @@ -601,9 +611,16 @@ internal override QuicStreamProvider OpenBidirectionalStream() throw new InvalidOperationException(SR.net_quic_not_connected); } - return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE); + var stream = new MsQuicStream(_state, flags); + + await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false); + + return stream; } + internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); + internal override ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.NONE, cancellationToken); + internal override int GetRemoteAvailableUnidirectionalStreamCount() { Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index baf9b70bf84528..3f7b87715f4ec1 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -76,6 +76,9 @@ private sealed class State // Set once writes have been shutdown. public readonly TaskCompletionSource ShutdownWriteCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // Set once stream has been started and within peer's advertised stream limits + public readonly TaskCompletionSource StartCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + public ShutdownState ShutdownState; // The value makes sure that we release the handles only once. public int ShutdownDone; @@ -182,10 +185,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F } QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer."); - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); } catch { @@ -888,6 +887,31 @@ private void EnableReceive() QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed."); } + internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationToken cancellationToken) + { + Debug.Assert(!Monitor.IsEntered(_state)); + + try + { + cancellationToken.ThrowIfCancellationRequested(); + using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => + { + ((State)s!).StartCompletionSource.TrySetCanceled(token); + }, _state); + + uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags); + QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); + + await _state.StartCompletionSource.Task.ConfigureAwait(false); + } + catch + { + _state.Handle?.Dispose(); + _state.StateGCHandle.Free(); + throw; + } + } + /// /// Callback calls for a single instance of a stream are serialized by msquic. /// They happen on a msquic thread and shouldn't take too long to not to block msquic. @@ -944,6 +968,8 @@ private static uint HandleEvent(State state, ref StreamEvent evt) // Shutdown for both sending and receiving is completed. case QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE: return HandleEventShutdownComplete(state, ref evt); + case QUIC_STREAM_EVENT_TYPE.PEER_ACCEPTED: + return HandleEventPeerAccepted(state); default: return MsQuicStatusCodes.Success; } @@ -1111,6 +1137,14 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) { // Store the start status code and check it when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL in StreamStart. state.StartStatus = evt.Data.StartComplete.Status; + if (state.StartStatus != MsQuicStatusCodes.Success) + { + state.StartCompletionSource.TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + } + else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) + { + state.StartCompletionSource.TrySetResult(); + } return MsQuicStatusCodes.Success; } @@ -1223,6 +1257,12 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt return MsQuicStatusCodes.Success; } + private static uint HandleEventPeerAccepted(State state) + { + state.StartCompletionSource.TrySetResult(); + return MsQuicStatusCodes.Success; + } + private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt) { bool shouldComplete = false; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs index 38c0e27a2ab41a..c0cddcb6910cbe 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs @@ -24,6 +24,9 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract QuicStreamProvider OpenBidirectionalStream(); + internal abstract ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default); + internal abstract ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default); + internal abstract int GetRemoteAvailableUnidirectionalStreamCount(); internal abstract int GetRemoteAvailableBidirectionalStreamCount(); @@ -32,7 +35,7 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get; } - internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get ; } + internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get; } internal abstract ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index f94ee81bfacd4d..8a4b667a2ef943 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -94,6 +94,19 @@ internal QuicConnection(QuicConnectionProvider provider) /// public QuicStream OpenBidirectionalStream() => new QuicStream(_provider.OpenBidirectionalStream()); + /// + /// Create an outbound unidirectional stream. + /// + /// + public async ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenUnidirectionalStreamAsync(cancellationToken).ConfigureAwait(false)); + + /// + /// Create an outbound bidirectional stream. + /// + /// + public async ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false)); + + /// /// Accept an incoming stream. /// diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 9cd0dad78c32cb..64f6059fe49bcf 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -415,7 +415,6 @@ public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() } [Fact] - [ActiveIssue("https://github.com/dotnet/runtime/issues/67302")] public async Task WaitForAvailableBidirectionStreamsAsyncWorks() { QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); From 3c2b1f7413729f394124e8ede0b9a066cbb81490 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 16:13:27 +0200 Subject: [PATCH 02/24] Improve exception handling --- .../MsQuic/MsQuicConnection.cs | 23 +++++++++++++++---- .../Implementations/MsQuic/MsQuicStream.cs | 23 ++++++------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 052fa1f54813f6..28e425434b0d74 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -592,10 +592,15 @@ private QuicStreamProvider OpenStream(QUIC_STREAM_OPEN_FLAGS flags) var stream = new MsQuicStream(_state, flags); - // should complete synchronously - ValueTask startTask = stream.StartAsync(QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, default); - startTask.AsTask().GetAwaiter().GetResult(); - Debug.Assert(startTask.IsCompleted); + try + { + stream.StartAsync(QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, default).AsTask().GetAwaiter().GetResult(); + } + catch + { + stream.Dispose(); + throw; + } return stream; } @@ -613,7 +618,15 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA var stream = new MsQuicStream(_state, flags); - await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false); + try + { + await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false); + } + catch + { + stream.Dispose(); + throw; + } return stream; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 3f7b87715f4ec1..4d0c595641996a 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -891,25 +891,16 @@ internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationT { Debug.Assert(!Monitor.IsEntered(_state)); - try + cancellationToken.ThrowIfCancellationRequested(); + using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => { - cancellationToken.ThrowIfCancellationRequested(); - using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => - { - ((State)s!).StartCompletionSource.TrySetCanceled(token); - }, _state); + ((State)s!).StartCompletionSource.TrySetCanceled(token); + }, _state); - uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); + uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags); + QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); - await _state.StartCompletionSource.Task.ConfigureAwait(false); - } - catch - { - _state.Handle?.Dispose(); - _state.StateGCHandle.Free(); - throw; - } + await _state.StartCompletionSource.Task.ConfigureAwait(false); } /// From ed05be754a6998f145702a4c16a1d97e5fbb164a Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 18:49:12 +0200 Subject: [PATCH 03/24] Remove non-Async versions --- .../System.Net.Quic/ref/System.Net.Quic.cs | 2 - .../Implementations/Mock/MockConnection.cs | 46 ------------------- .../MsQuic/MsQuicConnection.cs | 26 ----------- .../Implementations/QuicConnectionProvider.cs | 4 -- .../src/System/Net/Quic/QuicConnection.cs | 12 ----- .../tests/FunctionalTests/MsQuicTests.cs | 29 ++++++------ .../FunctionalTests/QuicConnectionTests.cs | 6 +-- ...icStreamConnectedStreamConformanceTests.cs | 2 +- .../tests/FunctionalTests/QuicStreamTests.cs | 40 ++++++++-------- .../tests/FunctionalTests/QuicTestBase.cs | 4 +- 10 files changed, 39 insertions(+), 132 deletions(-) diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index 9f4061e86d8886..7a33daf11c96ee 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -29,8 +29,6 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { } public void Dispose() { } public int GetRemoteAvailableBidirectionalStreamCount() { throw null; } public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; } - public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; } - public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; } public System.Threading.Tasks.ValueTask OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs index 6f37644ae88150..250d1c59f7f316 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs @@ -184,52 +184,6 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken); } - internal override QuicStreamProvider OpenUnidirectionalStream() - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - if (!streamLimit.Unidirectional.TryIncrement()) - { - throw new QuicException("No available unidirectional stream"); - } - - long streamId; - lock (_syncObject) - { - streamId = _nextOutboundUnidirectionalStream; - _nextOutboundUnidirectionalStream += 4; - } - - return OpenStream(streamId, false); - } - - internal override QuicStreamProvider OpenBidirectionalStream() - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - if (!streamLimit.Bidirectional.TryIncrement()) - { - throw new QuicException("No available bidirectional stream"); - } - - long streamId; - lock (_syncObject) - { - streamId = _nextOutboundBidirectionalStream; - _nextOutboundBidirectionalStream += 4; - } - - return OpenStream(streamId, true); - } - internal async override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken) { PeerStreamLimit? streamLimit = RemoteStreamLimit; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 28e425434b0d74..6c4a35d114f6b7 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -582,32 +582,6 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); } - private QuicStreamProvider OpenStream(QUIC_STREAM_OPEN_FLAGS flags) - { - ThrowIfDisposed(); - if (!Connected) - { - throw new InvalidOperationException(SR.net_quic_not_connected); - } - - var stream = new MsQuicStream(_state, flags); - - try - { - stream.StartAsync(QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, default).AsTask().GetAwaiter().GetResult(); - } - catch - { - stream.Dispose(); - throw; - } - - return stream; - } - - internal override QuicStreamProvider OpenUnidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); - internal override QuicStreamProvider OpenBidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.NONE); - private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs index c0cddcb6910cbe..72ee5f0d38fe58 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs @@ -20,10 +20,6 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default); - internal abstract QuicStreamProvider OpenUnidirectionalStream(); - - internal abstract QuicStreamProvider OpenBidirectionalStream(); - internal abstract ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default); internal abstract ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 8a4b667a2ef943..bd72aec8d12570 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -82,18 +82,6 @@ internal QuicConnection(QuicConnectionProvider provider) /// public ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) => _provider.WaitForAvailableBidirectionalStreamsAsync(cancellationToken); - /// - /// Create an outbound unidirectional stream. - /// - /// - public QuicStream OpenUnidirectionalStream() => new QuicStream(_provider.OpenUnidirectionalStream()); - - /// - /// Create an outbound bidirectional stream. - /// - /// - public QuicStream OpenBidirectionalStream() => new QuicStream(_provider.OpenBidirectionalStream()); - /// /// Create an outbound unidirectional stream. /// diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 64f6059fe49bcf..f182254ce460b3 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -389,7 +389,6 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate) } [Fact] - [ActiveIssue("https://github.com/dotnet/runtime/issues/67302")] public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() { QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); @@ -400,10 +399,10 @@ public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() Assert.True(clientConnection.WaitForAvailableUnidirectionalStreamsAsync().IsCompletedSuccessfully); // Open one stream, should wait till it closes. - QuicStream stream = clientConnection.OpenUnidirectionalStream(); + QuicStream stream = await clientConnection.OpenUnidirectionalStreamAsync(); ValueTask waitTask = clientConnection.WaitForAvailableUnidirectionalStreamsAsync(); Assert.False(waitTask.IsCompleted); - Assert.Throws(() => clientConnection.OpenUnidirectionalStream()); + // Close the streams, the waitTask should finish as a result. stream.Dispose(); QuicStream newStream = await serverConnection.AcceptStreamAsync(); @@ -425,15 +424,15 @@ public async Task WaitForAvailableBidirectionStreamsAsyncWorks() Assert.True(clientConnection.WaitForAvailableBidirectionalStreamsAsync().IsCompletedSuccessfully); // Open one stream, should wait till it closes. - QuicStream stream = clientConnection.OpenBidirectionalStream(); + QuicStream stream = await clientConnection.OpenBidirectionalStreamAsync(); ValueTask waitTask = clientConnection.WaitForAvailableBidirectionalStreamsAsync(); Assert.False(waitTask.IsCompleted); - Assert.Throws(() => clientConnection.OpenBidirectionalStream()); - // Close the streams, the waitTask should finish as a result. + // Close the streams, the waitTask and nextStreamTask should finish as a result. stream.Dispose(); QuicStream newStream = await serverConnection.AcceptStreamAsync(); newStream.Dispose(); + await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); clientConnection.Dispose(); serverConnection.Dispose(); @@ -461,7 +460,7 @@ public async Task WriteTests(int[][] writes, WriteType writeType) await RunClientServer( async clientConnection => { - await using QuicStream stream = clientConnection.OpenUnidirectionalStream(); + await using QuicStream stream = await clientConnection.OpenUnidirectionalStreamAsync(); foreach (int[] bufferLengths in writes) { @@ -554,7 +553,7 @@ public async Task CallDifferentWriteMethodsWorks() ReadOnlySequence ros = CreateReadOnlySequenceFromBytes(helloWorld.ToArray()); Assert.False(ros.IsSingleSegment); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); ValueTask writeTask = clientStream.WriteAsync(ros); using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -700,7 +699,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int pos = 0; pos < data.Length; pos += writeSize) { @@ -728,7 +727,7 @@ async Task GetStreamIdWithoutStartWorks() { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer @@ -748,7 +747,7 @@ async Task GetStreamIdWithoutStartWorks() { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // Dispose all connections before the streams; @@ -770,7 +769,7 @@ await Task.Run(async () => { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - await using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await clientStream.WriteAsync(new byte[1]); await using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -791,7 +790,7 @@ await Task.Run(async () => { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - await using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await clientStream.WriteAsync(new byte[1]); await using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -816,7 +815,7 @@ public async Task BigWrite_SmallRead_Success(bool closeWithData) { byte[] buffer = new byte[1] { 42 }; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Task t = serverConnection.AcceptStreamAsync().AsTask(); await TaskTimeoutExtensions.WhenAllOrAnyFailed(clientStream.WriteAsync(buffer).AsTask(), t, PassingTestTimeoutMilliseconds); QuicStream serverStream = t.Result; @@ -880,7 +879,7 @@ await RunClientServer( }, clientFunction: async connection => { - using QuicStream stream = connection.OpenBidirectionalStream(); + using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); Assert.False(stream.ReadsCompleted); await stream.WriteAsync(s_data, endStream: true); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index d4d1a89f11fa48..ac0adc90cd16b0 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -42,7 +42,7 @@ public async Task TestConnect() private static async Task OpenAndUseStreamAsync(QuicConnection c) { - QuicStream s = c.OpenBidirectionalStream(); + QuicStream s = await c.OpenBidirectionalStreamAsync(); // This will pend await s.ReadAsync(new byte[1]); @@ -199,7 +199,7 @@ public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOpera await RunClientServer( async clientConnection => { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await DoWrites(clientStream, writesBeforeClose); // Wait for peer to receive data @@ -246,7 +246,7 @@ public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationA await RunClientServer( async clientConnection => { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await DoWrites(clientStream, writesBeforeClose); // Wait for peer to receive data diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs index 5df77a4ced54af..5b58ee229496ec 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs @@ -95,7 +95,7 @@ await WhenAllOrAnyFailed( listener.ListenEndPoint, GetSslClientAuthenticationOptions()); await connection2.ConnectAsync(); - stream2 = connection2.OpenBidirectionalStream(); + stream2 = await connection2.OpenBidirectionalStreamAsync(); // OpenBidirectionalStream only allocates ID. We will force stream opening // by Writing there and receiving data on the other side. await stream2.WriteAsync(buffer); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index d20027f2d09ba6..73a307f92ed510 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -38,7 +38,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(s_data, endStream: true); @@ -89,7 +89,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int i = 0; i < sendCount; i++) { @@ -135,8 +135,8 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); - await using QuicStream stream2 = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); + await using QuicStream stream2 = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(s_data, endStream: true); await stream2.WriteAsync(s_data, endStream: true); @@ -178,7 +178,7 @@ public async Task MultipleConcurrentStreamsOnSingleConnection() static async Task MakeStreams(QuicConnection clientConnection, QuicConnection serverConnection) { byte[] buffer = new byte[64]; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); ValueTask writeTask = clientStream.WriteAsync(Encoding.UTF8.GetBytes("PING"), endStream: true); ValueTask acceptTask = serverConnection.AcceptStreamAsync(); await new Task[] { writeTask.AsTask(), acceptTask.AsTask() }.WhenAllOrAnyFailed(PassingTestTimeoutMilliseconds); @@ -195,7 +195,7 @@ public async Task GetStreamIdWithoutStartWorks() using (clientConnection) using (serverConnection) { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer @@ -233,7 +233,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int pos = 0; pos < data.Length; pos += writeSize) { @@ -275,7 +275,7 @@ public async Task TestStreams() private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, QuicConnection c2) { - using QuicStream s1 = c1.OpenBidirectionalStream(); + using QuicStream s1 = await c1.OpenBidirectionalStreamAsync(); Assert.True(s1.CanRead); Assert.True(s1.CanWrite); @@ -289,7 +289,7 @@ private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, Qu private static async Task CreateAndTestUnidirectionalStream(QuicConnection c1, QuicConnection c2) { - using QuicStream s1 = c1.OpenUnidirectionalStream(); + using QuicStream s1 = await c1.OpenUnidirectionalStreamAsync(); Assert.False(s1.CanRead); Assert.True(s1.CanWrite); @@ -383,7 +383,7 @@ public async Task ReadWrite_Random_Success(int readSize, int writeSize) await RunClientServer( async clientConnection => { - await using QuicStream clientStream = clientConnection.OpenUnidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenUnidirectionalStreamAsync(); ReadOnlyMemory sendBuffer = testBuffer; while (sendBuffer.Length != 0) @@ -511,7 +511,7 @@ public async Task ReadOutstanding_ReadAborted_Throws() byte[] buffer = new byte[1] { 42 }; const int ExpectedErrorCode = 0xfffffff; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Task t = serverConnection.AcceptStreamAsync().AsTask(); await TaskTimeoutExtensions.WhenAllOrAnyFailed(clientStream.WriteAsync(buffer).AsTask(), t, PassingTestTimeoutMilliseconds); QuicStream serverStream = t.Result; @@ -563,7 +563,7 @@ public async Task WriteAbortedWithoutWriting_ReadThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); stream.AbortWrite(expectedErrorCode); }, serverFunction: async connection => @@ -589,7 +589,7 @@ public async Task ReadAbortedWithoutReading_WriteThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); stream.AbortRead(expectedErrorCode); }, serverFunction: async connection => @@ -613,7 +613,7 @@ public async Task WritePreCanceled_Throws() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); CancellationTokenSource cts = new CancellationTokenSource(); cts.Cancel(); @@ -655,7 +655,7 @@ public async Task WriteCanceled_NextWriteThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); CancellationTokenSource cts = new CancellationTokenSource(500); @@ -712,7 +712,7 @@ public async Task AbortAfterDispose_ProperlyOpenedStream_Success() await RunClientServer( clientFunction: async connection => { - QuicStream stream = connection.OpenBidirectionalStream(); + QuicStream stream = await connection.OpenBidirectionalStreamAsync(); // Force stream to open on the wire await stream.WriteAsync(buffer); await sem.WaitAsync(); @@ -739,9 +739,9 @@ await RunClientServer( public async Task AbortAfterDispose_StreamCreationFlushedByDispose_Success() { await RunClientServer( - clientFunction: connection => + clientFunction: async connection => { - QuicStream stream = connection.OpenBidirectionalStream(); + QuicStream stream = await connection.OpenBidirectionalStreamAsync(); // dispose will flush stream creation on the wire stream.Dispose(); @@ -749,8 +749,6 @@ await RunClientServer( // should not throw ODE on aborting stream.AbortRead(1234); stream.AbortWrite(5675); - - return Task.CompletedTask; }, serverFunction: async connection => { @@ -1005,7 +1003,7 @@ async ValueTask ReleaseOnWriteCompletionAsync() }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(new byte[1], endStream: true); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs index c0b8b4a9caf36c..8566d0cd40f52b 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs @@ -187,7 +187,7 @@ internal QuicListener CreateQuicListener(IPEndPoint endpoint) internal async Task PingPong(QuicConnection client, QuicConnection server) { - using QuicStream clientStream = client.OpenBidirectionalStream(); + using QuicStream clientStream = await client.OpenBidirectionalStreamAsync(); ValueTask t = clientStream.WriteAsync(s_ping); using QuicStream serverStream = await server.AcceptStreamAsync(); @@ -259,7 +259,7 @@ internal async Task RunStreamClientServer(Func clientFunction, await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = bidi ? connection.OpenBidirectionalStream() : connection.OpenUnidirectionalStream(); + await using QuicStream stream = bidi ? await connection.OpenBidirectionalStreamAsync() : await connection.OpenUnidirectionalStreamAsync(); // Open(Bi|Uni)directionalStream only allocates ID. We will force stream opening // by Writing there and receiving data on the other side. await stream.WriteAsync(buffer); From 08420ff588f128e431ac28db8c8f98532ac71ca0 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 19:16:01 +0200 Subject: [PATCH 04/24] fixup! Improve exception handling --- .../System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs | 1 + .../src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 6c4a35d114f6b7..0d69798bc945d7 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -590,6 +590,7 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA throw new InvalidOperationException(SR.net_quic_not_connected); } + cancellationToken.ThrowIfCancellationRequested(); var stream = new MsQuicStream(_state, flags); try diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 4d0c595641996a..58fa23dac6490c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -894,7 +894,7 @@ internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationT cancellationToken.ThrowIfCancellationRequested(); using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => { - ((State)s!).StartCompletionSource.TrySetCanceled(token); + ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); }, _state); uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags); From ac3bfbfb18cc956d6ba48f0544f2f31c03af7cef Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 19:16:14 +0200 Subject: [PATCH 05/24] Add tests --- .../tests/FunctionalTests/MsQuicTests.cs | 121 ++++++++++++++++-- 1 file changed, 107 insertions(+), 14 deletions(-) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index f182254ce460b3..cf96100ae5e297 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -388,19 +388,36 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate) serverConnection.Dispose(); } - [Fact] - public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task WaitForAvailableStreamsAsyncWorks(bool unidirectional) { + ValueTask WaitForAvailableStreamsAsync(QuicConnection connection) + { + return unidirectional + ? connection.WaitForAvailableUnidirectionalStreamsAsync() + : connection.WaitForAvailableBidirectionalStreamsAsync(); + } + + ValueTask OpenStreamAsync(QuicConnection connection) + { + return unidirectional + ? connection.OpenUnidirectionalStreamAsync() + : connection.OpenBidirectionalStreamAsync(); + } + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; + listenerOptions.MaxBidirectionalStreams = 1; (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); // No stream opened yet, should return immediately. - Assert.True(clientConnection.WaitForAvailableUnidirectionalStreamsAsync().IsCompletedSuccessfully); + Assert.True(WaitForAvailableStreamsAsync(clientConnection).IsCompletedSuccessfully); // Open one stream, should wait till it closes. - QuicStream stream = await clientConnection.OpenUnidirectionalStreamAsync(); - ValueTask waitTask = clientConnection.WaitForAvailableUnidirectionalStreamsAsync(); + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = WaitForAvailableStreamsAsync(clientConnection); Assert.False(waitTask.IsCompleted); // Close the streams, the waitTask should finish as a result. @@ -413,27 +430,103 @@ public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() serverConnection.Dispose(); } - [Fact] - public async Task WaitForAvailableBidirectionStreamsAsyncWorks() + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_BlocksUntilAvailable(bool unidirectional) { + ValueTask OpenStreamAsync(QuicConnection connection) + { + return unidirectional + ? connection.OpenUnidirectionalStreamAsync() + : connection.OpenBidirectionalStreamAsync(); + } + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); + listenerOptions.MaxUnidirectionalStreams = 1; listenerOptions.MaxBidirectionalStreams = 1; (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); - // No stream opened yet, should return immediately. - Assert.True(clientConnection.WaitForAvailableBidirectionalStreamsAsync().IsCompletedSuccessfully); + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection); + Assert.False(waitTask.IsCompleted); - // Open one stream, should wait till it closes. - QuicStream stream = await clientConnection.OpenBidirectionalStreamAsync(); - ValueTask waitTask = clientConnection.WaitForAvailableBidirectionalStreamsAsync(); + // Close the streams, the waitTask should finish as a result. + stream.Dispose(); + QuicStream newStream = await serverConnection.AcceptStreamAsync(); + newStream.Dispose(); + + newStream = await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + newStream.Dispose(); + + clientConnection.Dispose(); + serverConnection.Dispose(); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_Canceled_Throws(bool unidirectional) + { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) + { + return unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + } + + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); + listenerOptions.MaxUnidirectionalStreams = 1; + listenerOptions.MaxBidirectionalStreams = 1; + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); + + CancellationTokenSource cts = new CancellationTokenSource(); + + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection, cts.Token); Assert.False(waitTask.IsCompleted); - // Close the streams, the waitTask and nextStreamTask should finish as a result. + cts.Cancel(); + + // awaiting the task should throw + var ex = await Assert.ThrowsAsync(async () => await waitTask); + Assert.Equal(cts.Token, ex.CancellationToken); + + // Close the streams, the waitTask should finish as a result. stream.Dispose(); QuicStream newStream = await serverConnection.AcceptStreamAsync(); newStream.Dispose(); - await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + // next call should work as intended + newStream = await OpenStreamAsync(clientConnection).AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + newStream.Dispose(); + + clientConnection.Dispose(); + serverConnection.Dispose(); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_PreCanceled_Throws(bool unidirectional) + { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) + { + return unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + } + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, CreateQuicListenerOptions()); + + CancellationTokenSource cts = new CancellationTokenSource(); + cts.Cancel(); + + var ex = await Assert.ThrowsAsync(async () => await OpenStreamAsync(clientConnection, cts.Token)); + Assert.Equal(cts.Token, ex.CancellationToken); + clientConnection.Dispose(); serverConnection.Dispose(); } From 43acea1c2d83a462b4fd7aaeb85e64c1c0420b32 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 20:03:00 +0200 Subject: [PATCH 06/24] fixup! Remove non-Async versions --- .../Net/Http/Http3LoopbackConnection.cs | 18 ++++---- .../SocketsHttpHandler/Http3Connection.cs | 45 +++++++++++-------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs index 78c49a1aaee13d..347bb985df4169 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs @@ -59,9 +59,9 @@ public override void Dispose() stream.Dispose(); } -// We don't dispose the connection currently, because this causes races when the server connection is closed before -// the client has received and handled all response data. -// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832 + // We don't dispose the connection currently, because this causes races when the server connection is closed before + // the client has received and handled all response data. + // See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832 #if false // Dispose the connection // If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle. @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode) await _connection.CloseAsync(errorCode).ConfigureAwait(false); } - public Http3LoopbackStream OpenUnidirectionalStream() + public async ValueTask OpenUnidirectionalStreamAsync() { - return new Http3LoopbackStream(_connection.OpenUnidirectionalStream()); + return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync()); } - public Http3LoopbackStream OpenBidirectionalStream() + public async ValueTask OpenBidirectionalStreamAsync() { - return new Http3LoopbackStream(_connection.OpenBidirectionalStream()); + return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync()); } public static int GetRequestId(QuicStream stream) @@ -185,10 +185,10 @@ public async Task AcceptRequestStreamAsync() public async Task EstablishControlStreamAsync() { - _outboundControlStream = OpenUnidirectionalStream(); + _outboundControlStream = await OpenUnidirectionalStreamAsync(); await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream); await _outboundControlStream.SendSettingsFrameAsync(); - } + } public override async Task ReadRequestBodyAsync() { diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 39e0d80107b36e..d9421705cc70ef 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -174,40 +174,49 @@ public async Task SendAsync(HttpRequestMessage request, lon // Allocate an active request QuicStream? quicStream = null; Http3RequestStream? requestStream = null; - ValueTask waitTask = default; try { try { - while (true) + if (_connection != null) { + ValueTask openTask; + bool synchronous = false; + + // unfortunately, the compiler cannot infer that the task is consumed only once +#pragma warning disable CA2012 // ValueTasks instances should only be consumed once lock (SyncObj) { - if (_connection == null) - { - break; - } + openTask = _connection.OpenBidirectionalStreamAsync(cancellationToken); - if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0) + if (openTask.IsCompleted) { - quicStream = _connection.OpenBidirectionalStream(); + // hot path for synchronous completion: finish while still holding the lock + synchronous = true; + quicStream = openTask.Result; requestStream = new Http3RequestStream(request, this, quicStream); _activeRequests.Add(quicStream, requestStream); - break; } - - waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken); } - if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0) + if (!synchronous) { - // We avoid logging RequestLeftQueue if a stream was available immediately (synchronously) - queueStartingTimestamp = Stopwatch.GetTimestamp(); - } + // cold path: waiting until a stream is available + if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0) + { + queueStartingTimestamp = Stopwatch.GetTimestamp(); + } - // Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet. - await waitTask.ConfigureAwait(false); + quicStream = await openTask.ConfigureAwait(false); + requestStream = new Http3RequestStream(request, this, quicStream); + + lock (SyncObj) + { + _activeRequests.Add(quicStream, requestStream); + } + } +#pragma warning restore CA2021 } } finally @@ -377,7 +386,7 @@ private async Task SendSettingsAsync() { try { - _clientControl = _connection!.OpenUnidirectionalStream(); + _clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false); await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false); } catch (Exception ex) From fa9fdbfa05e0498fb77676f479571ddbb7a95bd9 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 11 Apr 2022 20:59:10 +0200 Subject: [PATCH 07/24] handle connection abort --- .../Implementations/MsQuic/MsQuicStream.cs | 29 ++++++++++-- .../tests/FunctionalTests/MsQuicTests.cs | 46 +++++++++++++++++-- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 58fa23dac6490c..cc61d258a1cf9e 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -40,7 +40,7 @@ private sealed class State public MsQuicConnection.State ConnectionState = null!; // set in ctor. public string TraceId = null!; // set in ctor. - public uint StartStatus = MsQuicStatusCodes.Success; + public uint StartStatus = unchecked((uint)-1); public ReadState ReadState; @@ -1126,16 +1126,28 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt) private static uint HandleEventStartComplete(State state, ref StreamEvent evt) { - // Store the start status code and check it when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL in StreamStart. - state.StartStatus = evt.Data.StartComplete.Status; - if (state.StartStatus != MsQuicStatusCodes.Success) + uint status = evt.Data.StartComplete.Status; + + // The way we expose Open(Uni|Bi)directionalStreamAsync operations is that the stream + // is also accepted by the peer (i.e. it is within advertised stream limits). However, + // We may receive START_COMPLETE notification before the stream is accepted, so we defer + // setting state.StartStatus until the stream is accepted or we meet failure along the way. + + if (status != MsQuicStatusCodes.Success) { - state.StartCompletionSource.TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + // Start failed, stream not accepted. Store the start status code and check it + // when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL + // in StreamStart. + // state.StartCompletionSource will be set when handling shutdown event as well. + state.StartStatus = status; } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { + // Start succeeded and we were within stream limits, stream already usable. + state.StartStatus = status; state.StartCompletionSource.TrySetResult(); } + return MsQuicStatusCodes.Success; } @@ -1250,6 +1262,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt private static uint HandleEventPeerAccepted(State state) { + state.StartStatus = MsQuicStatusCodes.Success; state.StartCompletionSource.TrySetResult(); return MsQuicStatusCodes.Success; } @@ -1618,6 +1631,12 @@ private static uint HandleEventConnectionClose(State state) ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } + if (state.StartStatus != MsQuicStatusCodes.Success) + { + state.StartCompletionSource.TrySetException( + ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); + } + // Dispose was called before complete event. bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, 2) == 1; if (releaseHandles) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index cf96100ae5e297..13069add301b8a 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -467,7 +467,7 @@ ValueTask OpenStreamAsync(QuicConnection connection) [Theory] [InlineData(false)] [InlineData(true)] - public async Task OpenStreamAsync_Canceled_Throws(bool unidirectional) + public async Task OpenStreamAsync_Canceled_Throws_OperationCanceledException(bool unidirectional) { ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) { @@ -491,7 +491,7 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok cts.Cancel(); // awaiting the task should throw - var ex = await Assert.ThrowsAsync(async () => await waitTask); + var ex = await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); Assert.Equal(cts.Token, ex.CancellationToken); // Close the streams, the waitTask should finish as a result. @@ -510,7 +510,7 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok [Theory] [InlineData(false)] [InlineData(true)] - public async Task OpenStreamAsync_PreCanceled_Throws(bool unidirectional) + public async Task OpenStreamAsync_PreCanceled_Throws_OperationCanceledException(bool unidirectional) { ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) { @@ -524,13 +524,51 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok CancellationTokenSource cts = new CancellationTokenSource(); cts.Cancel(); - var ex = await Assert.ThrowsAsync(async () => await OpenStreamAsync(clientConnection, cts.Token)); + var ex = await Assert.ThrowsAsync(() => OpenStreamAsync(clientConnection, cts.Token).AsTask().WaitAsync(TimeSpan.FromSeconds(3))); Assert.Equal(cts.Token, ex.CancellationToken); clientConnection.Dispose(); serverConnection.Dispose(); } + [Theory] + [InlineData(false, false)] + [InlineData(true, true)] + public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort) + { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) + { + return unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + } + + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); + listenerOptions.MaxUnidirectionalStreams = 1; + listenerOptions.MaxBidirectionalStreams = 1; + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); + + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection); + Assert.False(waitTask.IsCompleted); + + if (localAbort) + { + await clientConnection.CloseAsync(0); + await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + } + else + { + await serverConnection.CloseAsync(0); + await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + } + + clientConnection.Dispose(); + serverConnection.Dispose(); + } + + [Fact] [OuterLoop("May take several seconds")] public async Task SetListenerTimeoutWorksWithSmallTimeout() From 7744b97e4161445be2e26f159fd7024744cb1579 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 12 Apr 2022 11:21:07 +0200 Subject: [PATCH 08/24] Minor change --- .../MsQuic/MsQuicConnection.cs | 15 +----- .../Implementations/MsQuic/MsQuicStream.cs | 49 +++++++++++------ .../tests/FunctionalTests/MsQuicTests.cs | 54 +++++++------------ 3 files changed, 52 insertions(+), 66 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 0d69798bc945d7..ae97c75d48e43a 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -590,20 +590,7 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA throw new InvalidOperationException(SR.net_quic_not_connected); } - cancellationToken.ThrowIfCancellationRequested(); - var stream = new MsQuicStream(_state, flags); - - try - { - await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false); - } - catch - { - stream.Dispose(); - throw; - } - - return stream; + return await MsQuicStream.CreateOutbound(_state, cancellationToken).ConfigureAwait(false); } internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 6a8f59b2cb22df..40e27243a7da18 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -136,6 +136,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa catch { _state.StateGCHandle.Free(); + // don't free the streamHandle, it will be freed by the caller throw; } @@ -149,6 +150,38 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa } } + internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + var stream = new MsQuicStream(connectionState, flags); + State state = stream._state; + + try + { + Debug.Assert(!Monitor.IsEntered(state)); + + cancellationToken.ThrowIfCancellationRequested(); + using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => + { + ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); + }, state); + + // Fire of start of the stream + uint status = MsQuicApi.Api.StreamStartDelegate(state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); + QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); + + // wait unit start completes. + await state.StartCompletionSource.Task.ConfigureAwait(false); + } + catch + { + stream.Dispose(); + throw; + } + + return stream; + } + // outbound. internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags) { @@ -891,22 +924,6 @@ private void EnableReceive() QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed."); } - internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationToken cancellationToken) - { - Debug.Assert(!Monitor.IsEntered(_state)); - - cancellationToken.ThrowIfCancellationRequested(); - using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => - { - ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); - }, _state); - - uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); - - await _state.StartCompletionSource.Task.ConfigureAwait(false); - } - /// /// Callback calls for a single instance of a stream are serialized by msquic. /// They happen on a msquic thread and shouldn't take too long to not to block msquic. diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 13069add301b8a..7ead0d9eecbe49 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -393,19 +393,13 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate) [InlineData(true)] public async Task WaitForAvailableStreamsAsyncWorks(bool unidirectional) { - ValueTask WaitForAvailableStreamsAsync(QuicConnection connection) - { - return unidirectional - ? connection.WaitForAvailableUnidirectionalStreamsAsync() - : connection.WaitForAvailableBidirectionalStreamsAsync(); - } + ValueTask WaitForAvailableStreamsAsync(QuicConnection connection) => unidirectional + ? connection.WaitForAvailableUnidirectionalStreamsAsync() + : connection.WaitForAvailableBidirectionalStreamsAsync(); - ValueTask OpenStreamAsync(QuicConnection connection) - { - return unidirectional - ? connection.OpenUnidirectionalStreamAsync() - : connection.OpenBidirectionalStreamAsync(); - } + ValueTask OpenStreamAsync(QuicConnection connection) => unidirectional + ? connection.OpenUnidirectionalStreamAsync() + : connection.OpenBidirectionalStreamAsync(); QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; @@ -435,12 +429,9 @@ ValueTask OpenStreamAsync(QuicConnection connection) [InlineData(true)] public async Task OpenStreamAsync_BlocksUntilAvailable(bool unidirectional) { - ValueTask OpenStreamAsync(QuicConnection connection) - { - return unidirectional - ? connection.OpenUnidirectionalStreamAsync() - : connection.OpenBidirectionalStreamAsync(); - } + ValueTask OpenStreamAsync(QuicConnection connection) => unidirectional + ? connection.OpenUnidirectionalStreamAsync() + : connection.OpenBidirectionalStreamAsync(); QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; @@ -469,12 +460,9 @@ ValueTask OpenStreamAsync(QuicConnection connection) [InlineData(true)] public async Task OpenStreamAsync_Canceled_Throws_OperationCanceledException(bool unidirectional) { - ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) - { - return unidirectional - ? connection.OpenUnidirectionalStreamAsync(token) - : connection.OpenBidirectionalStreamAsync(token); - } + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; @@ -512,12 +500,9 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok [InlineData(true)] public async Task OpenStreamAsync_PreCanceled_Throws_OperationCanceledException(bool unidirectional) { - ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) - { - return unidirectional - ? connection.OpenUnidirectionalStreamAsync(token) - : connection.OpenBidirectionalStreamAsync(token); - } + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, CreateQuicListenerOptions()); @@ -536,12 +521,9 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok [InlineData(true, true)] public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort) { - ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) - { - return unidirectional - ? connection.OpenUnidirectionalStreamAsync(token) - : connection.OpenBidirectionalStreamAsync(token); - } + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; From de1744002957c41d4988d03e6c7b0d7a41a19015 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 12 Apr 2022 13:51:33 +0200 Subject: [PATCH 09/24] Fix build --- .../System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs | 2 +- .../src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index ae97c75d48e43a..aa5fbfd2205c6d 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -590,7 +590,7 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA throw new InvalidOperationException(SR.net_quic_not_connected); } - return await MsQuicStream.CreateOutbound(_state, cancellationToken).ConfigureAwait(false); + return await MsQuicStream.CreateOutbound(_state, flags, cancellationToken).ConfigureAwait(false); } internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 40e27243a7da18..9204b39d659d1b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -150,7 +150,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa } } - internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, CancellationToken cancellationToken) + internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); var stream = new MsQuicStream(connectionState, flags); From 5abe872f2c1bab2b2215758c78e2c8bfd4a9746a Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Wed, 13 Apr 2022 16:04:56 +0200 Subject: [PATCH 10/24] Code review feedback --- .../SocketsHttpHandler/Http3Connection.cs | 39 ++++--------------- .../MsQuic/MsQuicConnection.cs | 4 +- .../Implementations/MsQuic/MsQuicStream.cs | 3 +- 3 files changed, 12 insertions(+), 34 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index d9421705cc70ef..8abca7075822a9 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -181,42 +181,19 @@ public async Task SendAsync(HttpRequestMessage request, lon { if (_connection != null) { - ValueTask openTask; - bool synchronous = false; - - // unfortunately, the compiler cannot infer that the task is consumed only once -#pragma warning disable CA2012 // ValueTasks instances should only be consumed once - lock (SyncObj) + if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0 && _connection.GetRemoteAvailableBidirectionalStreamCount() == 0) { - openTask = _connection.OpenBidirectionalStreamAsync(cancellationToken); - - if (openTask.IsCompleted) - { - // hot path for synchronous completion: finish while still holding the lock - synchronous = true; - quicStream = openTask.Result; - requestStream = new Http3RequestStream(request, this, quicStream); - _activeRequests.Add(quicStream, requestStream); - } + // the call below will almost certainly block, measure waiting time for telemetry purposes + queueStartingTimestamp = Stopwatch.GetTimestamp(); } - if (!synchronous) - { - // cold path: waiting until a stream is available - if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0) - { - queueStartingTimestamp = Stopwatch.GetTimestamp(); - } - - quicStream = await openTask.ConfigureAwait(false); - requestStream = new Http3RequestStream(request, this, quicStream); + quicStream = await _connection.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); + requestStream = new Http3RequestStream(request, this, quicStream); - lock (SyncObj) - { - _activeRequests.Add(quicStream, requestStream); - } + lock (SyncObj) + { + _activeRequests.Add(quicStream, requestStream); } -#pragma warning restore CA2021 } } finally diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index aa5fbfd2205c6d..8c1ca18e645fe2 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -582,7 +582,7 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); } - private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) + private ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); if (!Connected) @@ -590,7 +590,7 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA throw new InvalidOperationException(SR.net_quic_not_connected); } - return await MsQuicStream.CreateOutbound(_state, flags, cancellationToken).ConfigureAwait(false); + return MsQuicStream.CreateOutbound(_state, flags, cancellationToken); } internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 9204b39d659d1b..ec3f04ffa4e8b4 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -150,7 +150,8 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa } } - internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) + // return QuicStreamProvider to avoid async/await state machine on the caller's side just to perform the cast + internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); var stream = new MsQuicStream(connectionState, flags); From 5143c42386f7c4254a390374e2af54f9d64a3ae9 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Wed, 13 Apr 2022 16:07:03 +0200 Subject: [PATCH 11/24] Remove WaitForAvailable* methods --- .../System.Net.Quic/ref/System.Net.Quic.cs | 2 - .../Implementations/Mock/MockConnection.cs | 26 +--- .../MsQuic/MsQuicConnection.cs | 117 ------------------ .../Implementations/QuicConnectionProvider.cs | 5 +- .../src/System/Net/Quic/QuicConnection.cs | 12 -- 5 files changed, 3 insertions(+), 159 deletions(-) diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index 7a33daf11c96ee..f5175bf7004919 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -32,8 +32,6 @@ public void Dispose() { } public System.Threading.Tasks.ValueTask OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } - public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } } public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException { diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs index 250d1c59f7f316..0a0b6e1edca607 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs @@ -162,28 +162,6 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d return ValueTask.CompletedTask; } - internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - return streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken); - } - - internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken); - } - internal async override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken) { PeerStreamLimit? streamLimit = RemoteStreamLimit; @@ -194,7 +172,7 @@ internal async override ValueTask OpenUnidirectionalStreamAs while (!streamLimit.Unidirectional.TryIncrement()) { - await WaitForAvailableUnidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false); + await streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false); } long streamId; @@ -217,7 +195,7 @@ internal async override ValueTask OpenBidirectionalStreamAsy while (!streamLimit.Bidirectional.TryIncrement()) { - await WaitForAvailableBidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false); + await streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false); } long streamId; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 8c1ca18e645fe2..e198c75d4052ca 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -52,12 +52,6 @@ internal sealed class State // TODO: only allocate these when there is an outstanding shutdown. public readonly TaskCompletionSource ShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - // Note that there's no such thing as resetable TCS, so we cannot reuse the same instance after we've set the result. - // We also cannot use solutions like ManualResetValueTaskSourceCore, since we can have multiple waiters on the same TCS. - // As a result, we allocate a new TCS when needed, which is when someone explicitely asks for them in WaitForAvailableStreamsAsync. - public TaskCompletionSource? NewUnidirectionalStreamsAvailable; - public TaskCompletionSource? NewBidirectionalStreamsAvailable; - public bool Connected; public long AbortErrorCode = -1; public int StreamCount; @@ -320,26 +314,6 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent // Stop accepting new streams. state.AcceptQueue.Writer.TryComplete(); - // Stop notifying about available streams. - TaskCompletionSource? unidirectionalTcs = null; - TaskCompletionSource? bidirectionalTcs = null; - lock (state) - { - unidirectionalTcs = state.NewUnidirectionalStreamsAvailable; - bidirectionalTcs = state.NewBidirectionalStreamsAvailable; - state.NewUnidirectionalStreamsAvailable = null; - state.NewBidirectionalStreamsAvailable = null; - } - - if (unidirectionalTcs is not null) - { - unidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException())); - } - if (bidirectionalTcs is not null) - { - bidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException())); - } - return MsQuicStatusCodes.Success; } @@ -358,32 +332,6 @@ private static uint HandleEventNewStream(State state, ref ConnectionEvent connec private static uint HandleEventStreamsAvailable(State state, ref ConnectionEvent connectionEvent) { - TaskCompletionSource? unidirectionalTcs = null; - TaskCompletionSource? bidirectionalTcs = null; - lock (state) - { - if (connectionEvent.Data.StreamsAvailable.UniDirectionalCount > 0) - { - unidirectionalTcs = state.NewUnidirectionalStreamsAvailable; - state.NewUnidirectionalStreamsAvailable = null; - } - - if (connectionEvent.Data.StreamsAvailable.BiDirectionalCount > 0) - { - bidirectionalTcs = state.NewBidirectionalStreamsAvailable; - state.NewBidirectionalStreamsAvailable = null; - } - } - - if (unidirectionalTcs is not null) - { - unidirectionalTcs.SetResult(); - } - if (bidirectionalTcs is not null) - { - bidirectionalTcs.SetResult(); - } - return MsQuicStatusCodes.Success; } @@ -517,71 +465,6 @@ internal override async ValueTask AcceptStreamAsync(Cancella return stream; } - internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - TaskCompletionSource? tcs = _state.NewUnidirectionalStreamsAvailable; - if (tcs is null) - { - // We need to avoid calling MsQuic under lock. - // This is not atomic but it won't be anyway as counts can change between when task is completed - // and before somebody may try to allocate new stream. - int count = GetRemoteAvailableUnidirectionalStreamCount(); - lock (_state) - { - if (_state.NewUnidirectionalStreamsAvailable is null) - { - if (_state.ShutdownTcs.Task.IsCompleted) - { - throw new QuicOperationAbortedException(); - } - - if (count > 0) - { - return ValueTask.CompletedTask; - } - - _state.NewUnidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - - tcs = _state.NewUnidirectionalStreamsAvailable; - } - } - - return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); - } - - internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - TaskCompletionSource? tcs = _state.NewBidirectionalStreamsAvailable; - if (tcs is null) - { - // We need to avoid calling MsQuic under lock. - // This is not atomic but it won't be anyway as counts can change between when task is completed - // and before somebody may try to allocate new stream. - int count = GetRemoteAvailableBidirectionalStreamCount(); - lock (_state) - { - if (_state.NewBidirectionalStreamsAvailable is null) - { - if (_state.ShutdownTcs.Task.IsCompleted) - { - throw new QuicOperationAbortedException(); - } - - if (count > 0) - { - return ValueTask.CompletedTask; - } - - _state.NewBidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - tcs = _state.NewBidirectionalStreamsAvailable; - } - } - - return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); - } - private ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs index 72ee5f0d38fe58..182c68abe2b8bf 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs @@ -16,11 +16,8 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract ValueTask ConnectAsync(CancellationToken cancellationToken = default); - internal abstract ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default); - - internal abstract ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default); - internal abstract ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default); + internal abstract ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default); internal abstract int GetRemoteAvailableUnidirectionalStreamCount(); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index bd72aec8d12570..74fa392c340b2f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -70,18 +70,6 @@ internal QuicConnection(QuicConnectionProvider provider) /// public ValueTask ConnectAsync(CancellationToken cancellationToken = default) => _provider.ConnectAsync(cancellationToken); - /// - /// Waits for available unidirectional stream capacity to be announced by the peer. If any capacity is available, returns immediately. - /// - /// - public ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) => _provider.WaitForAvailableUnidirectionalStreamsAsync(cancellationToken); - - /// - /// Waits for available bidirectional stream capacity to be announced by the peer. If any capacity is available, returns immediately. - /// - /// - public ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) => _provider.WaitForAvailableBidirectionalStreamsAsync(cancellationToken); - /// /// Create an outbound unidirectional stream. /// From 836a501433b9742d694e42bc400190223026f671 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Wed, 13 Apr 2022 16:08:16 +0200 Subject: [PATCH 12/24] fixup! Remove WaitForAvailable* methods --- .../tests/FunctionalTests/MsQuicTests.cs | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 7ead0d9eecbe49..4fa98b80f7a68e 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -388,42 +388,6 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate) serverConnection.Dispose(); } - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task WaitForAvailableStreamsAsyncWorks(bool unidirectional) - { - ValueTask WaitForAvailableStreamsAsync(QuicConnection connection) => unidirectional - ? connection.WaitForAvailableUnidirectionalStreamsAsync() - : connection.WaitForAvailableBidirectionalStreamsAsync(); - - ValueTask OpenStreamAsync(QuicConnection connection) => unidirectional - ? connection.OpenUnidirectionalStreamAsync() - : connection.OpenBidirectionalStreamAsync(); - - QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); - listenerOptions.MaxUnidirectionalStreams = 1; - listenerOptions.MaxBidirectionalStreams = 1; - (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); - - // No stream opened yet, should return immediately. - Assert.True(WaitForAvailableStreamsAsync(clientConnection).IsCompletedSuccessfully); - - // Open one stream, should wait till it closes. - QuicStream stream = await OpenStreamAsync(clientConnection); - ValueTask waitTask = WaitForAvailableStreamsAsync(clientConnection); - Assert.False(waitTask.IsCompleted); - - // Close the streams, the waitTask should finish as a result. - stream.Dispose(); - QuicStream newStream = await serverConnection.AcceptStreamAsync(); - newStream.Dispose(); - - await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); - clientConnection.Dispose(); - serverConnection.Dispose(); - } - [Theory] [InlineData(false)] [InlineData(true)] From 52f7ee2899596a72c717e0fac78a39e626329786 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 19 Apr 2022 14:56:57 +0200 Subject: [PATCH 13/24] Remove CreateOuboundAsync static method --- .../MsQuic/MsQuicConnection.cs | 17 +++++- .../Implementations/MsQuic/MsQuicStream.cs | 52 +++++++------------ 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index e198c75d4052ca..f089778c63e107 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -465,7 +465,7 @@ internal override async ValueTask AcceptStreamAsync(Cancella return stream; } - private ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) + private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); if (!Connected) @@ -473,7 +473,20 @@ private ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS fla throw new InvalidOperationException(SR.net_quic_not_connected); } - return MsQuicStream.CreateOutbound(_state, flags, cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); + var stream = new MsQuicStream(_state, flags); + + try + { + await stream.StartAsync(cancellationToken).ConfigureAwait(false); + } + catch + { + stream.Dispose(); + throw; + } + + return stream; } internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index ec3f04ffa4e8b4..c8e24202208917 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -150,39 +150,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa } } - // return QuicStreamProvider to avoid async/await state machine on the caller's side just to perform the cast - internal static async ValueTask CreateOutbound(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - var stream = new MsQuicStream(connectionState, flags); - State state = stream._state; - - try - { - Debug.Assert(!Monitor.IsEntered(state)); - - cancellationToken.ThrowIfCancellationRequested(); - using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => - { - ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); - }, state); - - // Fire of start of the stream - uint status = MsQuicApi.Api.StreamStartDelegate(state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); - - // wait unit start completes. - await state.StartCompletionSource.Task.ConfigureAwait(false); - } - catch - { - stream.Dispose(); - throw; - } - - return stream; - } - // outbound. internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags) { @@ -1158,12 +1125,14 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) // in StreamStart. // state.StartCompletionSource will be set when handling shutdown event as well. state.StartStatus = status; + // TODO: can we complete the StartCompletionSource here? } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { // Start succeeded and we were within stream limits, stream already usable. state.StartStatus = status; state.StartCompletionSource.TrySetResult(); + // TODO: comment when the source is completed when PeerAccepted is 0 } return MsQuicStatusCodes.Success; @@ -1688,6 +1657,23 @@ private static bool CleanupReadStateAndCheckPending(State state, ReadState final return shouldComplete; } + internal async Task StartAsync(CancellationToken cancellationToken) + { + Debug.Assert(!Monitor.IsEntered(_state)); + + using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => + { + ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); + }, _state); + + // Fire of start of the stream + uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); + QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); + + // wait unit start completes. + await _state.StartCompletionSource.Task.ConfigureAwait(false); + } + // Read state transitions: // // None --(data arrives in event RECV)-> IndividualReadComplete From 5fab97ea202e586c97810e4c9a2423c826f01deb Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 19 Apr 2022 15:47:45 +0200 Subject: [PATCH 14/24] Remove StartStatus --- .../Implementations/MsQuic/MsQuicStream.cs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index c8e24202208917..b2aeab011a4a65 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -40,8 +40,6 @@ private sealed class State public MsQuicConnection.State ConnectionState = null!; // set in ctor. public string TraceId = null!; // set in ctor. - public uint StartStatus = unchecked((uint)-1); - public ReadState ReadState; // set when ReadState.Aborted: @@ -1116,24 +1114,27 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) // The way we expose Open(Uni|Bi)directionalStreamAsync operations is that the stream // is also accepted by the peer (i.e. it is within advertised stream limits). However, // We may receive START_COMPLETE notification before the stream is accepted, so we defer - // setting state.StartStatus until the stream is accepted or we meet failure along the way. + // completing the StartcompletionSource until we get PeerAccepted notification. if (status != MsQuicStatusCodes.Success) { - // Start failed, stream not accepted. Store the start status code and check it - // when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL - // in StreamStart. - // state.StartCompletionSource will be set when handling shutdown event as well. - state.StartStatus = status; - // TODO: can we complete the StartCompletionSource here? + // Start irrecoverably failed. The possible status codes are: + // - Aborted - connection aborted by peer + // - InvalidState - stream already started before, or connection aborted locally + // - StreamLimitReached - only if QUIC_STREAM_START_FLAG_FAIL_BLOCKED was specified (not in our case). + // + // We disregard duplicate StreamStart calls + Debug.Assert(status == MsQuicStatusCodes.Aborted || status == MsQuicStatusCodes.InvalidState); + state.StartCompletionSource.SetException( + ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { // Start succeeded and we were within stream limits, stream already usable. - state.StartStatus = status; - state.StartCompletionSource.TrySetResult(); - // TODO: comment when the source is completed when PeerAccepted is 0 + state.StartCompletionSource.SetResult(); } + // if PeerAccepted == 0, we will later receive PEER_ACCEPTED event, which will + // complete the StartCompletionSource return MsQuicStatusCodes.Success; } @@ -1208,27 +1209,27 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt if (shouldReadComplete) { - if (state.StartStatus == MsQuicStatusCodes.Success) + if (state.StartCompletionSource.Task.IsCompletedSuccessfully) { state.ReceiveResettableCompletionSource.Complete(0); } else { state.ReceiveResettableCompletionSource.CompleteException( - ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed"))); } } if (shouldShutdownWriteComplete) { - if (state.StartStatus == MsQuicStatusCodes.Success) + if (state.StartCompletionSource.Task.IsCompletedSuccessfully) { state.ShutdownWriteCompletionSource.SetResult(); } else { state.ShutdownWriteCompletionSource.SetException( - ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed"))); } } @@ -1249,7 +1250,6 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt private static uint HandleEventPeerAccepted(State state) { - state.StartStatus = MsQuicStatusCodes.Success; state.StartCompletionSource.TrySetResult(); return MsQuicStatusCodes.Success; } @@ -1618,14 +1618,14 @@ private static uint HandleEventConnectionClose(State state) ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } - if (state.StartStatus != MsQuicStatusCodes.Success) + if (!state.StartCompletionSource.Task.IsCompleted) { - state.StartCompletionSource.TrySetException( + state.StartCompletionSource.SetException( ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } // Dispose was called before complete event. - bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, 2) == 1; + bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, State.ShutdownDone_NotificationReceived) == State.ShutdownDone_Disposed; if (releaseHandles) { state.Cleanup(); From 95af94f01949b9799329688add3b90aaca7dba96 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 21 Apr 2022 13:56:17 +0200 Subject: [PATCH 15/24] Code review changes --- .../Net/Quic/Implementations/MsQuic/MsQuicStream.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index b2aeab011a4a65..3ec69b93349cd0 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -1124,9 +1124,17 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) // - StreamLimitReached - only if QUIC_STREAM_START_FLAG_FAIL_BLOCKED was specified (not in our case). // // We disregard duplicate StreamStart calls + if (status == MsQuicStatusCodes.Aborted) + { + state.StartCompletionSource.SetException( + ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); + } + else + { + state.StartCompletionSource.SetException( + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status{MsQuicStatusCodes.GetError(status)}"))); + } Debug.Assert(status == MsQuicStatusCodes.Aborted || status == MsQuicStatusCodes.InvalidState); - state.StartCompletionSource.SetException( - ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { From b51bad8f5cc29d1bde89a3a91168e59f1005b950 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 21 Apr 2022 16:42:52 +0200 Subject: [PATCH 16/24] More code review changes --- .../Net/Http/SocketsHttpHandler/Http3Connection.cs | 13 ++++++++----- .../Quic/Implementations/MsQuic/MsQuicStream.cs | 14 +++----------- .../tests/FunctionalTests/MsQuicTests.cs | 6 +++--- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 8abca7075822a9..272c78fbcbed20 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -187,12 +187,15 @@ public async Task SendAsync(HttpRequestMessage request, lon queueStartingTimestamp = Stopwatch.GetTimestamp(); } - quicStream = await _connection.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); - requestStream = new Http3RequestStream(request, this, quicStream); - - lock (SyncObj) + quicStream = await _connection?.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); + if (quicStream != null) { - _activeRequests.Add(quicStream, requestStream); + requestStream = new Http3RequestStream(request, this, quicStream); + + lock (SyncObj) + { + _activeRequests.Add(quicStream, requestStream); + } } } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 3ec69b93349cd0..38341e0504c4fb 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -1132,9 +1132,8 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) else { state.StartCompletionSource.SetException( - ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status{MsQuicStatusCodes.GetError(status)}"))); + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status {MsQuicStatusCodes.GetError(status)}"))); } - Debug.Assert(status == MsQuicStatusCodes.Aborted || status == MsQuicStatusCodes.InvalidState); } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { @@ -1665,21 +1664,14 @@ private static bool CleanupReadStateAndCheckPending(State state, ReadState final return shouldComplete; } - internal async Task StartAsync(CancellationToken cancellationToken) + internal ValueTask StartAsync(CancellationToken cancellationToken) { Debug.Assert(!Monitor.IsEntered(_state)); - using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) => - { - ((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token)); - }, _state); - - // Fire of start of the stream uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); - // wait unit start completes. - await _state.StartCompletionSource.Task.ConfigureAwait(false); + return new ValueTask(_state.StartCompletionSource.Task.WaitAsync(cancellationToken)); } // Read state transitions: diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 4fa98b80f7a68e..532317377120f5 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -443,7 +443,7 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok cts.Cancel(); // awaiting the task should throw - var ex = await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + var ex = await Assert.ThrowsAnyAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); Assert.Equal(cts.Token, ex.CancellationToken); // Close the streams, the waitTask should finish as a result. @@ -473,7 +473,7 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok CancellationTokenSource cts = new CancellationTokenSource(); cts.Cancel(); - var ex = await Assert.ThrowsAsync(() => OpenStreamAsync(clientConnection, cts.Token).AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + var ex = await Assert.ThrowsAnyAsync(() => OpenStreamAsync(clientConnection, cts.Token).AsTask().WaitAsync(TimeSpan.FromSeconds(3))); Assert.Equal(cts.Token, ex.CancellationToken); clientConnection.Dispose(); @@ -482,7 +482,7 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok [Theory] [InlineData(false, false)] - [InlineData(true, true)] + [InlineData(true, true)] // the code path for uni/bidirectional streams differs only in a flag passed to MsQuic, so there is no need to test all possible combinations. public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort) { ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional From bbf1d65a3faf4c46565771db056d7c5b71cd70dc Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 21 Apr 2022 19:27:36 +0200 Subject: [PATCH 17/24] Fix build --- .../Http/SocketsHttpHandler/Http3Connection.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 272c78fbcbed20..517090dbdfb488 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -179,23 +179,21 @@ public async Task SendAsync(HttpRequestMessage request, lon { try { - if (_connection != null) + QuicConnection? conn = _connection; + if (conn != null) { - if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0 && _connection.GetRemoteAvailableBidirectionalStreamCount() == 0) + if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0 && conn.GetRemoteAvailableBidirectionalStreamCount() == 0) { // the call below will almost certainly block, measure waiting time for telemetry purposes queueStartingTimestamp = Stopwatch.GetTimestamp(); } - quicStream = await _connection?.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); - if (quicStream != null) - { - requestStream = new Http3RequestStream(request, this, quicStream); + quicStream = await conn.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); - lock (SyncObj) - { - _activeRequests.Add(quicStream, requestStream); - } + requestStream = new Http3RequestStream(request, this, quicStream); + lock (SyncObj) + { + _activeRequests.Add(quicStream, requestStream); } } } From 79ce72b09d3b2bd46f61fc69a90c043563e57a2d Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Fri, 22 Apr 2022 12:09:43 +0200 Subject: [PATCH 18/24] Fix data race on locally closing the connection --- .../src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 517090dbdfb488..75d86f29c3ba32 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -197,6 +197,10 @@ public async Task SendAsync(HttpRequestMessage request, lon } } } + // Swallow any exceptions caused by the connection being closed locally or even disposed due to a race. + // Since quicStream will stay `null`, the code below will throw appropriate exception to retry the request. + catch (ObjectDisposedException) { } + catch (QuicException e) when (!(e is QuicConnectionAbortedException)) { } finally { if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp != 0) From 346d025a1865d7a4febf3453013a0cf08a6c720e Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 25 Apr 2022 15:19:11 +0200 Subject: [PATCH 19/24] Code review feedback --- .../MsQuic/MsQuicConnection.cs | 1 - .../Implementations/MsQuic/MsQuicStream.cs | 24 ++++++++++++++----- .../tests/FunctionalTests/MsQuicTests.cs | 6 ++++- .../FunctionalTests/QuicConnectionTests.cs | 5 ++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index f089778c63e107..f5d3aba6ee878f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -473,7 +473,6 @@ private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLA throw new InvalidOperationException(SR.net_quic_not_connected); } - cancellationToken.ThrowIfCancellationRequested(); var stream = new MsQuicStream(_state, flags); try diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 38341e0504c4fb..14b1110d819be6 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -114,6 +114,9 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa // but after TryAddStream to prevent unnecessary RemoveStream in finalizer _state.ConnectionState = connectionState; + // Inbound streams are already started + _state.StartCompletionSource.SetResult(); + _state.Handle = streamHandle; _canRead = true; _canWrite = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); @@ -1123,22 +1126,23 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt) // - InvalidState - stream already started before, or connection aborted locally // - StreamLimitReached - only if QUIC_STREAM_START_FLAG_FAIL_BLOCKED was specified (not in our case). // - // We disregard duplicate StreamStart calls if (status == MsQuicStatusCodes.Aborted) { - state.StartCompletionSource.SetException( + state.StartCompletionSource.TrySetException( ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } else { - state.StartCompletionSource.SetException( + // TODO: Should we throw QuicOperationAbortedException when status is InvalidState? + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + state.StartCompletionSource.TrySetException( ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status {MsQuicStatusCodes.GetError(status)}"))); } } else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) { // Start succeeded and we were within stream limits, stream already usable. - state.StartCompletionSource.SetResult(); + state.StartCompletionSource.TrySetResult(); } // if PeerAccepted == 0, we will later receive PEER_ACCEPTED event, which will // complete the StartCompletionSource @@ -1245,6 +1249,10 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt state.ShutdownCompletionSource.SetResult(); } + // If we are receiving stream shutdown notification, the start comletion source must have been already completed + // eihter by StreamOpen or PeerAccepted event, Connection closing, or it was cancelled by user. + Debug.Assert(state.StartCompletionSource.Task.IsCompleted); + // Dispose was called before complete event. bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, State.ShutdownDone_NotificationReceived) == State.ShutdownDone_Disposed; if (releaseHandles) @@ -1664,14 +1672,18 @@ private static bool CleanupReadStateAndCheckPending(State state, ReadState final return shouldComplete; } - internal ValueTask StartAsync(CancellationToken cancellationToken) + internal async ValueTask StartAsync(CancellationToken cancellationToken) { Debug.Assert(!Monitor.IsEntered(_state)); + using var registration = cancellationToken.UnsafeRegister((state, token) => { + ((State)state!).StartCompletionSource.SetCanceled(token); + }, _state); + uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); - return new ValueTask(_state.StartCompletionSource.Task.WaitAsync(cancellationToken)); + await _state.StartCompletionSource.Task.ConfigureAwait(false); } // Read state transitions: diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 532317377120f5..0baa8dc3c9e708 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -502,7 +502,11 @@ ValueTask OpenStreamAsync(QuicConnection connection, CancellationTok if (localAbort) { await clientConnection.CloseAsync(0); - await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + await Assert.ThrowsAnyAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); } else { diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index ac0adc90cd16b0..de0feab7712dab 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -115,6 +115,11 @@ await RunClientServer( // Pending ops should fail await Assert.ThrowsAsync(() => acceptTask); + + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] await Assert.ThrowsAsync(() => connectTask); // Subsequent attempts should fail From 2c299dcdadfe982e0a55a168d5ad17b2ccc29982 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 25 Apr 2022 15:29:52 +0200 Subject: [PATCH 20/24] Remove PInvoke for reporting telemetry --- .../src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 75d86f29c3ba32..148cae3bf09070 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -182,7 +182,7 @@ public async Task SendAsync(HttpRequestMessage request, lon QuicConnection? conn = _connection; if (conn != null) { - if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0 && conn.GetRemoteAvailableBidirectionalStreamCount() == 0) + if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0) { // the call below will almost certainly block, measure waiting time for telemetry purposes queueStartingTimestamp = Stopwatch.GetTimestamp(); From 253d09ec9f70820ab1f481a768751472f633e92f Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 26 Apr 2022 17:54:25 +0200 Subject: [PATCH 21/24] Minor fixes --- .../Quic/Implementations/MsQuic/MsQuicStream.cs | 14 ++++++++++---- .../tests/FunctionalTests/QuicConnectionTests.cs | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 14b1110d819be6..5aa6313789109c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -1635,7 +1635,7 @@ private static uint HandleEventConnectionClose(State state) if (!state.StartCompletionSource.Task.IsCompleted) { - state.StartCompletionSource.SetException( + state.StartCompletionSource.TrySetException( ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } @@ -1676,12 +1676,18 @@ internal async ValueTask StartAsync(CancellationToken cancellationToken) { Debug.Assert(!Monitor.IsEntered(_state)); - using var registration = cancellationToken.UnsafeRegister((state, token) => { - ((State)state!).StartCompletionSource.SetCanceled(token); + using var registration = cancellationToken.UnsafeRegister((state, token) => + { + ((State)state!).StartCompletionSource.TrySetCanceled(token); }, _state); uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); + + if (!MsQuicStatusHelper.SuccessfulStatusCode(status)) + { + _state.StartCompletionSource.TrySetException(QuicExceptionHelpers.CreateExceptionForHResult(status, "Could not start stream.")); + throw QuicExceptionHelpers.CreateExceptionForHResult(status, "Could not start stream."); + } await _state.StartCompletionSource.Task.ConfigureAwait(false); } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index de0feab7712dab..4348e7aaa6ad7a 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -120,7 +120,7 @@ await RunClientServer( // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) // To be revisited once we standartize on exceptions. // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] - await Assert.ThrowsAsync(() => connectTask); + await Assert.ThrowsAnyAsync(() => connectTask); // Subsequent attempts should fail // TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa? From 4b0935731cfbfe5fe97ff23797e71c5ef1c16289 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 26 Apr 2022 17:55:16 +0200 Subject: [PATCH 22/24] fixup! Minor fixes --- .../src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 148cae3bf09070..f64fc6bc6200a2 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -184,7 +184,6 @@ public async Task SendAsync(HttpRequestMessage request, lon { if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0) { - // the call below will almost certainly block, measure waiting time for telemetry purposes queueStartingTimestamp = Stopwatch.GetTimestamp(); } From 91e1d33784fce4a9f05bdb4bac092aba0f106b05 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 26 Apr 2022 22:06:59 +0200 Subject: [PATCH 23/24] Regenerate ref source --- .../System.Net.Quic/ref/System.Net.Quic.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index f5175bf7004919..e42d4f40c9dd10 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -22,6 +22,7 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { } public bool Connected { get { throw null; } } public System.Net.IPEndPoint? LocalEndPoint { get { throw null; } } public System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get { throw null; } } + public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } public System.Net.EndPoint RemoteEndPoint { get { throw null; } } public System.Threading.Tasks.ValueTask AcceptStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask CloseAsync(long errorCode, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } @@ -31,11 +32,10 @@ public void Dispose() { } public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; } public System.Threading.Tasks.ValueTask OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } } public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException { - public QuicConnectionAbortedException(string message, long errorCode) : base(default(string)) { } + public QuicConnectionAbortedException(string message, long errorCode) : base (default(string)) { } public long ErrorCode { get { throw null; } } } public partial class QuicException : System.Exception @@ -69,7 +69,7 @@ public QuicListenerOptions() { } } public partial class QuicOperationAbortedException : System.Net.Quic.QuicException { - public QuicOperationAbortedException(string message) : base(default(string)) { } + public QuicOperationAbortedException(string message) : base (default(string)) { } } public partial class QuicOptions { @@ -83,12 +83,14 @@ public sealed partial class QuicStream : System.IO.Stream internal QuicStream() { } public override bool CanRead { get { throw null; } } public override bool CanSeek { get { throw null; } } - public override bool CanWrite { get { throw null; } } public override bool CanTimeout { get { throw null; } } + public override bool CanWrite { get { throw null; } } public override long Length { get { throw null; } } public override long Position { get { throw null; } set { } } public bool ReadsCompleted { get { throw null; } } + public override int ReadTimeout { get { throw null; } set { } } public long StreamId { get { throw null; } } + public override int WriteTimeout { get { throw null; } set { } } public void AbortRead(long errorCode) { } public void AbortWrite(long errorCode) { } public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; } @@ -100,10 +102,9 @@ public override void Flush() { } public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; } public override int Read(byte[] buffer, int offset, int count) { throw null; } public override int Read(System.Span buffer) { throw null; } - public override int ReadByte() { throw null; } public override System.Threading.Tasks.Task ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; } public override System.Threading.Tasks.ValueTask ReadAsync(System.Memory buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public override int ReadTimeout { get { throw null; } set { } } + public override int ReadByte() { throw null; } public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; } public override void SetLength(long value) { } public void Shutdown() { } @@ -111,7 +112,6 @@ public void Shutdown() { } public System.Threading.Tasks.ValueTask WaitForWriteCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public override void Write(byte[] buffer, int offset, int count) { } public override void Write(System.ReadOnlySpan buffer) { } - public override void WriteByte(byte value) { } public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; } @@ -119,11 +119,11 @@ public override void WriteByte(byte value) { } public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public override int WriteTimeout { get { throw null; } set { } } + public override void WriteByte(byte value) { } } public partial class QuicStreamAbortedException : System.Net.Quic.QuicException { - public QuicStreamAbortedException(string message, long errorCode) : base(default(string)) { } + public QuicStreamAbortedException(string message, long errorCode) : base (default(string)) { } public long ErrorCode { get { throw null; } } } } From 9ce1f0722e29eea684cfbec791467671515471e9 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Wed, 27 Apr 2022 09:08:40 +0200 Subject: [PATCH 24/24] Fix exception type in test --- .../tests/FunctionalTests/QuicConnectionTests.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index 4348e7aaa6ad7a..644245d340d97d 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -74,7 +74,11 @@ await RunClientServer( // Pending ops should fail await Assert.ThrowsAsync(() => acceptTask); - await Assert.ThrowsAsync(() => connectTask); + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + await Assert.ThrowsAnyAsync(() => connectTask); // Subsequent attempts should fail // TODO: Which exception is correct?