Skip to content

Commit

Permalink
use CancelationHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
campersau committed Sep 8, 2023
1 parent 330d1ba commit f2a7f9d
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,26 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
List<string> headerNames = new List<string>(headerCount);
List<string> headerValues = new List<string>(headerCount);
JSObject abortController = BrowserHttpInterop.CreateAbortController();
CancellationTokenRegistration? abortRegistration = cancellationToken.Register(() =>
CancellationTokenRegistration? abortRegistration = cancellationToken.Register(static (object? state) =>
{
JSObject _abortController = (JSObject)state!;
#if FEATURE_WASM_THREADS
if (!abortController.IsDisposed)
if (!_abortController.IsDisposed)
{
abortController.SynchronizationContext.Send(static (JSObject _abortController) =>
_abortController.SynchronizationContext.Send(static (JSObject __abortController) =>
{
BrowserHttpInterop.AbortRequest(_abortController);
_abortController.Dispose();
}, abortController);
BrowserHttpInterop.AbortRequest(__abortController);
__abortController.Dispose();
}, _abortController);
}
#else
if (!abortController.IsDisposed)
if (!_abortController.IsDisposed)
{
BrowserHttpInterop.AbortRequest(abortController);
abortController.Dispose();
BrowserHttpInterop.AbortRequest(_abortController);
_abortController.Dispose();
}
#endif
});
}, abortController);
try
{
if (request.RequestUri == null)
Expand Down Expand Up @@ -223,14 +224,15 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
using (JSObject transformStream = BrowserHttpInterop.CreateTransformStream())
{
Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, transformStream);
ValueTask<JSObject> fetchTask = BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken, abortController, null); // initialize fetch cancellation
Task<JSObject> fetchTask = BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).AsTask(); // initialize fetch cancellation

using (WasmHttpWriteStream stream = new WasmHttpWriteStream(transformStream))
{
try
{
await request.Content.CopyToAsync(stream, cancellationToken).ConfigureAwait(true);
await BrowserHttpInterop.TransformStreamClose(transformStream).ConfigureAwait(true);
Task closePromise = BrowserHttpInterop.TransformStreamClose(transformStream);
await BrowserHttpInterop.CancelationHelper(closePromise, cancellationToken).ConfigureAwait(true);
}
catch (Exception ex)
{
Expand All @@ -248,13 +250,13 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
cancellationToken.ThrowIfCancellationRequested();

Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken, abortController, null).ConfigureAwait(true);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true);
}
}
else
{
Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken, abortController, null).ConfigureAwait(true);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true);
}

return new WasmFetchResponse(fetchResponse, abortController, abortRegistration.Value);
Expand Down Expand Up @@ -352,7 +354,8 @@ private async Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken
cancellationToken.ThrowIfCancellationRequested();
using (Buffers.MemoryHandle handle = buffer.Pin())
{
await TransformStreamWriteUnsafe(_transformStream, buffer, handle).ConfigureAwait(true);
Task writePromise = TransformStreamWriteUnsafe(_transformStream, buffer, handle);
await BrowserHttpInterop.CancelationHelper(writePromise, cancellationToken).ConfigureAwait(true);
}

static unsafe Task TransformStreamWriteUnsafe(JSObject transformStream, ReadOnlyMemory<byte> buffer, Buffers.MemoryHandle handle)
Expand Down Expand Up @@ -515,7 +518,7 @@ private async ValueTask<byte[]> GetResponseData(CancellationToken cancellationTo
#if FEATURE_WASM_THREADS
} //lock
#endif
_length = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, null, _fetchResponse.FetchResponse).ConfigureAwait(true);
_length = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, _fetchResponse.FetchResponse).ConfigureAwait(true);
#if FEATURE_WASM_THREADS
lock (_fetchResponse.ThisLock)
{
Expand Down Expand Up @@ -619,7 +622,7 @@ static async Task<int> Impl(WasmHttpReadStream self, Memory<byte> buffer, Cancel
#if FEATURE_WASM_THREADS
} //lock
#endif
int response = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, null, self._fetchResponse.FetchResponse).ConfigureAwait(true);
int response = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, self._fetchResponse.FetchResponse).ConfigureAwait(true);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,30 +123,27 @@ public static partial int GetResponseBytes(
[JSMarshalAs<JSType.MemoryView>] Span<byte> buffer);


public static async ValueTask<T> CancelationHelper<T>(Task<T> promise, CancellationToken cancellationToken, JSObject? abortController, JSObject? fetchResponse)
public static async ValueTask CancelationHelper(Task promise, CancellationToken cancellationToken, JSObject? fetchResponse = null)
{
if (promise.IsCompletedSuccessfully)
{
return promise.Result;
return;
}
try
{
using (var operationRegistration = cancellationToken.Register(() =>
using (var operationRegistration = cancellationToken.Register(static (object? state) =>
{
CancelablePromise.CancelPromise(promise, static (JSObject? _fetchResponse, JSObject? _abortController) =>
(Task _promise, JSObject? _fetchResponse) = ((Task, JSObject?))state!;
CancelablePromise.CancelPromise(_promise, static (JSObject? __fetchResponse) =>
{
if (_abortController != null)
{
AbortRequest(_abortController);
}
if (_fetchResponse != null)
if (__fetchResponse != null)
{
AbortResponse(_fetchResponse);
AbortResponse(__fetchResponse);
}
}, fetchResponse, abortController);
}))
}, _fetchResponse);
}, (promise, fetchResponse)))
{
return await promise.ConfigureAwait(true);
await promise.ConfigureAwait(true);
}
}
catch (OperationCanceledException oce) when (cancellationToken.IsCancellationRequested)
Expand All @@ -166,6 +163,16 @@ public static async ValueTask<T> CancelationHelper<T>(Task<T> promise, Cancellat
throw new HttpRequestException(jse.Message, jse);
}
}

public static async ValueTask<T> CancelationHelper<T>(Task<T> promise, CancellationToken cancellationToken, JSObject? fetchResponse = null)
{
if (promise.IsCompletedSuccessfully)
{
return promise.Result;
}
await CancelationHelper((Task)promise, cancellationToken, fetchResponse).ConfigureAwait(true);
return await promise.ConfigureAwait(true);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,10 @@ private async Task CancelationHelper(Task jsTask, CancellationToken cancellation
}
try
{
using (var receiveRegistration = cancellationToken.Register(() =>
using (var receiveRegistration = cancellationToken.Register(static (object? state) =>
{
CancelablePromise.CancelPromise(jsTask);
}))
CancelablePromise.CancelPromise((Task)state!);
}, jsTask))
{
await jsTask.ConfigureAwait(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void CancelPromise(Task promise)
#endif
}

public static void CancelPromise<T1, T2>(Task promise, Action<T1, T2> callback, T1 state1, T2 state2)
public static void CancelPromise<T>(Task promise, Action<T> callback, T state)
{
// this check makes sure that promiseGCHandle is still valid handle
if (promise.IsCompleted)
Expand All @@ -48,7 +48,7 @@ public static void CancelPromise<T1, T2>(Task promise, Action<T1, T2> callback,
{
#endif
_CancelPromise(holder.GCHandle);
callback.Invoke(state1, state2);
callback.Invoke(state);
#if FEATURE_WASM_THREADS
}, holder);
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ public static async Task<JSObject> CancelationHelper(Task<JSObject> jsTask, Canc
{
return jsTask.Result;
}
using (var receiveRegistration = cancellationToken.Register(() =>
using (var receiveRegistration = cancellationToken.Register(static (object? state) =>
{
CancelablePromise.CancelPromise(jsTask);
}))
CancelablePromise.CancelPromise((Task<JSObject>)state!);
}, jsTask))
{
return await jsTask.ConfigureAwait(true);
}
Expand Down
4 changes: 3 additions & 1 deletion src/mono/wasm/runtime/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export function http_wasm_transform_stream_write(ts: TransformStreamExtension, b
const copy = view.slice() as Uint8Array;
return wrap_as_cancelable_promise(async () => {
mono_assert(ts.__fetch_promise_control, "expected fetch promise control");
// race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250
await Promise.race([ts.__writer.ready, ts.__fetch_promise_control.promise]);
await Promise.race([ts.__writer.write(copy), ts.__fetch_promise_control.promise]);
});
Expand All @@ -86,8 +87,9 @@ export function http_wasm_transform_stream_write(ts: TransformStreamExtension, b
export function http_wasm_transform_stream_close(ts: TransformStreamExtension): Promise<void> {
return wrap_as_cancelable_promise(async () => {
mono_assert(ts.__fetch_promise_control, "expected fetch promise control");
// race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250
await Promise.race([ts.__writer.ready, ts.__fetch_promise_control.promise]);
ts.__writer.close();
await Promise.race([ts.__writer.close(), ts.__fetch_promise_control.promise]);
});
}

Expand Down

0 comments on commit f2a7f9d

Please sign in to comment.