Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous opening of QuicStreams #67859

Merged
merged 25 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public override void Dispose()
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
#if false
// Dispose the connection
// If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle.
Expand All @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode)
await _connection.CloseAsync(errorCode).ConfigureAwait(false);
}

public Http3LoopbackStream OpenUnidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenUnidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync());
}

public Http3LoopbackStream OpenBidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenBidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync());
}

public static int GetRequestId(QuicStream stream)
Expand Down Expand Up @@ -185,10 +185,10 @@ public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()

public async Task EstablishControlStreamAsync()
{
_outboundControlStream = OpenUnidirectionalStream();
_outboundControlStream = await OpenUnidirectionalStreamAsync();
await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream);
await _outboundControlStream.SendSettingsFrameAsync();
}
}

public override async Task<byte[]> ReadRequestBodyAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,42 +174,32 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
// Allocate an active request
QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;
ValueTask waitTask = default;

try
{
try
{
while (true)
QuicConnection? conn = _connection;
if (conn != null)
{
lock (SyncObj)
if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0)
{
if (_connection == null)
{
break;
}

if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
{
quicStream = _connection.OpenBidirectionalStream();
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
break;
}

waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken);
queueStartingTimestamp = Stopwatch.GetTimestamp();
}

if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0)
quicStream = await conn.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false);

requestStream = new Http3RequestStream(request, this, quicStream);
lock (SyncObj)
{
// We avoid logging RequestLeftQueue if a stream was available immediately (synchronously)
queueStartingTimestamp = Stopwatch.GetTimestamp();
_activeRequests.Add(quicStream, requestStream);
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await waitTask.ConfigureAwait(false);
}
}
// Swallow any exceptions caused by the connection being closed locally or even disposed due to a race.
// Since quicStream will stay `null`, the code below will throw appropriate exception to retry the request.
catch (ObjectDisposedException) { }
catch (QuicException e) when (!(e is QuicConnectionAbortedException)) { }
finally
{
if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp != 0)
Expand Down Expand Up @@ -377,7 +367,7 @@ private async Task SendSettingsAsync()
{
try
{
_clientControl = _connection!.OpenUnidirectionalStream();
_clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false);
await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
18 changes: 8 additions & 10 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { }
public bool Connected { get { throw null; } }
public System.Net.IPEndPoint? LocalEndPoint { get { throw null; } }
public System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get { throw null; } }
public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } }
public System.Net.EndPoint RemoteEndPoint { get { throw null; } }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> AcceptStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask CloseAsync(long errorCode, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask ConnectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public void Dispose() { }
public int GetRemoteAvailableBidirectionalStreamCount() { throw null; }
public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; }
public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; }
public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; }
public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } }
public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException
{
Expand Down Expand Up @@ -85,12 +83,14 @@ public sealed partial class QuicStream : System.IO.Stream
internal QuicStream() { }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public override bool CanTimeout { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public bool ReadsCompleted { get { throw null; } }
public override int ReadTimeout { get { throw null; } set { } }
public long StreamId { get { throw null; } }
public override int WriteTimeout { get { throw null; } set { } }
public void AbortRead(long errorCode) { }
public void AbortWrite(long errorCode) { }
public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
Expand All @@ -102,26 +102,24 @@ public override void Flush() { }
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override int Read(byte[] buffer, int offset, int count) { throw null; }
public override int Read(System.Span<byte> buffer) { throw null; }
public override int ReadByte() { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int ReadTimeout { get { throw null; } set { } }
public override int ReadByte() { throw null; }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public void Shutdown() { }
public System.Threading.Tasks.ValueTask ShutdownCompleted(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WaitForWriteCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override void Write(byte[] buffer, int offset, int count) { }
public override void Write(System.ReadOnlySpan<byte> buffer) { }
public override void WriteByte(byte value) { }
public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence<byte> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence<byte> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int WriteTimeout { get { throw null; } set { } }
public override void WriteByte(byte value) { }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,39 +162,17 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
return ValueTask.CompletedTask;
}

internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default)
internal async override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

return streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken);
}

internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken);
}

internal override QuicStreamProvider OpenUnidirectionalStream()
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Unidirectional.TryIncrement())
while (!streamLimit.Unidirectional.TryIncrement())
{
throw new QuicException("No available unidirectional stream");
await streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand All @@ -207,17 +185,17 @@ internal override QuicStreamProvider OpenUnidirectionalStream()
return OpenStream(streamId, false);
}

internal override QuicStreamProvider OpenBidirectionalStream()
internal async override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

if (!streamLimit.Bidirectional.TryIncrement())
while (!streamLimit.Bidirectional.TryIncrement())
{
throw new QuicException("No available bidirectional stream");
await streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false);
}

long streamId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint
SEND_SHUTDOWN_COMPLETE = 6,
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9
}

#if SOCKADDR_HAS_LENGTH
Expand Down
Loading