Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser] [wasm] Refactor Request Streaming to use HttpContent.CopyToAsync #91699

Merged
merged 15 commits into from
Sep 21, 2023
Merged
10 changes: 7 additions & 3 deletions src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public async Task BrowserHttpHandler_Streaming()

int readOffset = 0;
req.Content = new StreamContent(new DelegateStream(
canReadFunc: () => true,
readAsyncFunc: async (buffer, offset, count, cancellationToken) =>
{
await Task.Delay(1);
Expand Down Expand Up @@ -296,7 +297,9 @@ public async Task BrowserHttpHandler_StreamingRequest()

int size = 1500 * 1024 * 1024;
int remaining = size;
req.Content = new StreamContent(new DelegateStream(
var content = new MultipartFormDataContent();
content.Add(new StreamContent(new DelegateStream(
canReadFunc: () => true,
readAsyncFunc: (buffer, offset, count, cancellationToken) =>
{
if (remaining > 0)
Expand All @@ -307,15 +310,16 @@ public async Task BrowserHttpHandler_StreamingRequest()
return Task.FromResult(send);
}
return Task.FromResult(0);
}));
})), "test");
req.Content = content;

req.Content.Headers.Add("Content-MD5-Skip", "browser");

using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
using (HttpResponseMessage response = await client.SendAsync(req))
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length")));
Assert.Equal((size + 129).ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length")));
campersau marked this conversation as resolved.
Show resolved Hide resolved
// Streaming requests can't set Content-Length
Assert.False(response.Headers.Contains("X-HttpRequest-Headers-ContentLength"));
}
Expand Down
3 changes: 3 additions & 0 deletions src/libraries/System.Net.Http/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@
<data name="net_http_synchronous_reads_not_supported" xml:space="preserve">
<value>Synchronous reads are not supported, use ReadAsync instead.</value>
</data>
<data name="net_http_synchronous_writes_not_supported" xml:space="preserve">
<value>Synchronous writes are not supported, use WriteAsync instead.</value>
</data>
<data name="net_socks_auth_failed" xml:space="preserve">
<value>Failed to authenticate with the SOCKS server.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,9 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques

if (streamingEnabled)
{
Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();

ReadableStreamPullState pullState = new ReadableStreamPullState(stream, cancellationToken);
JSObject requestStream = BrowserHttpInterop.FetchStream(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController);

promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, ReadableStreamPull, pullState);
promise = WasmHttpWriteStream.CopyToAsync(requestStream, request.Content, cancellationToken);
}
else
{
Expand Down Expand Up @@ -257,14 +254,6 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
}
}

private static void ReadableStreamPull(object state)
{
ReadableStreamPullState pullState = (ReadableStreamPullState)state;
#pragma warning disable CS4014 // intentionally not awaited
pullState.PullAsync();
#pragma warning restore CS4014
}

private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse)
{
#if FEATURE_WASM_THREADS
Expand Down Expand Up @@ -329,41 +318,98 @@ static async Task<HttpResponseMessage> Impl(HttpRequestMessage request, Cancella
}
}

internal sealed class ReadableStreamPullState
internal sealed class WasmHttpWriteStream : Stream
{
private readonly Stream _stream;
private readonly CancellationToken _cancellationToken;
private readonly byte[] _buffer;
private readonly JSObject _requestStream;

public ReadableStreamPullState(Stream stream, CancellationToken cancellationToken)
public WasmHttpWriteStream(JSObject requestStream)
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentNullException.ThrowIfNull(requestStream);

_stream = stream;
_cancellationToken = cancellationToken;
_buffer = new byte[65536];
_requestStream = requestStream;
}

public async Task PullAsync()
public static async Task<JSObject> CopyToAsync(JSObject requestStream, HttpContent content, CancellationToken cancellationToken)
{
try
{
int length = await _stream.ReadAsync(_buffer, _cancellationToken).ConfigureAwait(true);
ReadableStreamControllerEnqueueUnsafe(this, _buffer, length);
}
catch (Exception ex)
using (WasmHttpWriteStream stream = new WasmHttpWriteStream(requestStream))
{
BrowserHttpInterop.ReadableStreamControllerError(this, ex);
try
{
await content.CopyToAsync(stream, cancellationToken).ConfigureAwait(true);
return await BrowserHttpInterop.RequestStreamClose(requestStream).ConfigureAwait(true);
}
catch (Exception ex)
{
await BrowserHttpInterop.RequestStreamClose(requestStream, ex).ConfigureAwait(true);
throw;
}
}
}

private static unsafe void ReadableStreamControllerEnqueueUnsafe(object pullState, byte[] buffer, int length)
private async Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
fixed (byte* ptr = buffer)
cancellationToken.ThrowIfCancellationRequested();
campersau marked this conversation as resolved.
Show resolved Hide resolved
using (Buffers.MemoryHandle handle = buffer.Pin())
{
BrowserHttpInterop.ReadableStreamControllerEnqueue(pullState, (nint)ptr, length);
await RequestStreamWriteUnsafe(_requestStream, buffer, handle).ConfigureAwait(true);
}

static unsafe Task RequestStreamWriteUnsafe(JSObject requestStream, ReadOnlyMemory<byte> buffer, Buffers.MemoryHandle handle)
campersau marked this conversation as resolved.
Show resolved Hide resolved
=> BrowserHttpInterop.RequestStreamWrite(requestStream, (nint)handle.Pointer, buffer.Length);
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateBufferArguments(buffer, offset, count);
return WriteAsyncCore(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}

public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;

protected override void Dispose(bool disposing)
{
_requestStream.Dispose();
}

public override void Flush()
{
}

#region PlatformNotSupported

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override long Length => throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}

public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException(SR.net_http_synchronous_writes_not_supported);
}
#endregion
}

internal sealed class WasmFetchResponse : IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ public static partial void AbortRequest(
public static partial void AbortResponse(
JSObject fetchResponse);

[JSImport("INTERNAL.http_wasm_readable_stream_controller_enqueue")]
public static partial void ReadableStreamControllerEnqueue(
[JSMarshalAs<JSType.Any>] object pullState,
[JSImport("INTERNAL.http_wasm_request_stream_write")]
public static partial Task RequestStreamWrite(
JSObject requestStream,
IntPtr bufferPtr,
int bufferLength);

[JSImport("INTERNAL.http_wasm_readable_stream_controller_error")]
public static partial void ReadableStreamControllerError(
[JSMarshalAs<JSType.Any>] object pullState,
Exception error);
[JSImport("INTERNAL.http_wasm_request_stream_close")]
public static partial Task<JSObject> RequestStreamClose(
campersau marked this conversation as resolved.
Show resolved Hide resolved
JSObject requestStream,
Exception? error = null);

[JSImport("INTERNAL.http_wasm_get_response_header_names")]
private static partial string[] _GetResponseHeaderNames(
Expand Down Expand Up @@ -72,15 +72,13 @@ public static partial Task<JSObject> Fetch(
JSObject abortControler);

[JSImport("INTERNAL.http_wasm_fetch_stream")]
public static partial Task<JSObject> Fetch(
public static partial JSObject FetchStream(
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
[JSMarshalAs<JSType.Function<JSType.Any>>] Action<object> pull,
[JSMarshalAs<JSType.Any>] object pullState);
JSObject abortControler);

[JSImport("INTERNAL.http_wasm_fetch_bytes")]
private static partial Task<JSObject> FetchBytes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public async Task HttpRequest_StringContent_WithoutMediaType()
await LoopbackServer.CreateServerAsync(async (server, uri) =>
{
var request = new HttpRequestMessage(HttpMethod.Post, uri);
request.Content = new StringContent("Hello World", null, ((MediaTypeHeaderValue)null)!);
request.Content = new StringContent("", null, ((MediaTypeHeaderValue)null)!);

Task<HttpResponseMessage> requestTask = client.SendAsync(request);
await server.AcceptConnectionAsync(async connection =>
Expand Down
6 changes: 3 additions & 3 deletions src/mono/wasm/runtime/exports-internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { mono_wasm_cancel_promise } from "./cancelable-promise";
import cwraps, { profiler_c_functions } from "./cwraps";
import { mono_wasm_send_dbg_command_with_parms, mono_wasm_send_dbg_command, mono_wasm_get_dbg_command_info, mono_wasm_get_details, mono_wasm_release_object, mono_wasm_call_function_on, mono_wasm_debugger_resume, mono_wasm_detach_debugger, mono_wasm_raise_debug_event, mono_wasm_change_debugger_log_level, mono_wasm_debugger_attached } from "./debug";
import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_readable_stream_controller_enqueue, http_wasm_readable_stream_controller_error, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http";
import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_request_stream_write, http_wasm_request_stream_close, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http";
import { exportedRuntimeAPI, Module, runtimeHelpers } from "./globals";
import { get_property, set_property, has_property, get_typeof_property, get_global_this, dynamic_import } from "./invoke-js";
import { mono_wasm_stringify_as_error_with_stack } from "./logging";
Expand Down Expand Up @@ -69,8 +69,8 @@ export function export_internal(): any {
http_wasm_create_abort_controler,
http_wasm_abort_request,
http_wasm_abort_response,
http_wasm_readable_stream_controller_enqueue,
http_wasm_readable_stream_controller_error,
http_wasm_request_stream_write,
http_wasm_request_stream_close,
http_wasm_fetch,
http_wasm_fetch_stream,
http_wasm_fetch_bytes,
Expand Down
Loading