1
0
Fork 0
mirror of https://github.com/VSadov/Satori.git synced 2025-06-09 09:34:49 +09:00

[browser][MT] Use auto thread dispatch in HTTP (#95370)

Co-authored-by: campersau <buchholz.bastian@googlemail.com>
Co-authored-by: Marek Fišera <mara@neptuo.com>
This commit is contained in:
Pavel Savara 2024-01-21 11:39:46 +01:00 committed by GitHub
parent ff1eeff500
commit 756a138766
Signed by: github
GPG key ID: B5690EEEBB952194
20 changed files with 592 additions and 738 deletions

2
.gitattributes vendored
View file

@ -77,4 +77,4 @@ src/tests/JIT/Performance/CodeQuality/BenchmarksGame/reverse-complement/revcomp-
src/tests/JIT/Performance/CodeQuality/BenchmarksGame/reverse-complement/revcomp-input25000.txt text eol=lf
src/tests/JIT/Performance/CodeQuality/BenchmarksGame/k-nucleotide/knucleotide-input.txt text eol=lf
src/tests/JIT/Performance/CodeQuality/BenchmarksGame/k-nucleotide/knucleotide-input-big.txt text eol=lf
src/mono/wasm/runtime/dotnet.d.ts text eol=lf
src/mono/browser/runtime/dotnet.d.ts text eol=lf

View file

@ -4,6 +4,7 @@
using System;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
@ -27,18 +28,33 @@ namespace NetCoreServer
RequestInformation info = await RequestInformation.CreateAsync(context.Request);
string echoJson = info.SerializeToJson();
byte[] bytes = Encoding.UTF8.GetBytes(echoJson);
// Compute MD5 hash so that clients can verify the received data.
using (MD5 md5 = MD5.Create())
{
byte[] bytes = Encoding.UTF8.GetBytes(echoJson);
byte[] hash = md5.ComputeHash(bytes);
string encodedHash = Convert.ToBase64String(hash);
context.Response.Headers["Content-MD5"] = encodedHash;
context.Response.ContentType = "application/json";
context.Response.ContentLength = bytes.Length;
await context.Response.Body.WriteAsync(bytes, 0, bytes.Length);
}
if (context.Request.QueryString.HasValue && context.Request.QueryString.Value.Contains("delay10sec"))
{
await context.Response.StartAsync(CancellationToken.None);
await context.Response.Body.FlushAsync();
await Task.Delay(10000);
}
else if (context.Request.QueryString.HasValue && context.Request.QueryString.Value.Contains("delay1sec"))
{
await context.Response.StartAsync(CancellationToken.None);
await Task.Delay(1000);
}
await context.Response.Body.WriteAsync(bytes, 0, bytes.Length);
}
}
}

View file

@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
@ -15,9 +16,6 @@ namespace System.Net.Http
// the JavaScript objects have thread affinity, it is necessary that the continuations run the same thread as the start of the async method.
internal sealed class BrowserHttpHandler : HttpMessageHandler
{
private static readonly HttpRequestOptionsKey<bool> EnableStreamingRequest = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
private static readonly HttpRequestOptionsKey<bool> EnableStreamingResponse = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
private static readonly HttpRequestOptionsKey<IDictionary<string, object>> FetchOptions = new HttpRequestOptionsKey<IDictionary<string, object>>("WebAssemblyFetchOptions");
private bool _allowAutoRedirect = HttpHandlerDefaults.DefaultAutomaticRedirection;
// flag to determine if the _allowAutoRedirect was explicitly set or not.
private bool _isAllowAutoRedirectTouched;
@ -114,71 +112,112 @@ namespace System.Net.Http
public const bool SupportsProxy = false;
public const bool SupportsRedirectConfiguration = true;
#if FEATURE_WASM_THREADS
private ConcurrentDictionary<string, object?>? _properties;
public IDictionary<string, object?> Properties => _properties ??= new ConcurrentDictionary<string, object?>();
#else
private Dictionary<string, object?>? _properties;
public IDictionary<string, object?> Properties => _properties ??= new Dictionary<string, object?>();
#endif
protected internal override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken)
{
throw new PlatformNotSupportedException();
}
private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage request, CancellationToken cancellationToken, bool? allowAutoRedirect)
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
int headerCount = request.Headers.Count + request.Content?.Headers.Count ?? 0;
List<string> headerNames = new List<string>(headerCount);
List<string> headerValues = new List<string>(headerCount);
JSObject abortController = BrowserHttpInterop.CreateAbortController();
bool? allowAutoRedirect = _isAllowAutoRedirectTouched ? AllowAutoRedirect : null;
var controller = new BrowserHttpController(request, allowAutoRedirect, cancellationToken);
return controller.CallFetch();
}
}
internal sealed class BrowserHttpController : IDisposable
{
private static readonly HttpRequestOptionsKey<bool> EnableStreamingRequest = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
private static readonly HttpRequestOptionsKey<bool> EnableStreamingResponse = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
private static readonly HttpRequestOptionsKey<IDictionary<string, object>> FetchOptions = new HttpRequestOptionsKey<IDictionary<string, object>>("WebAssemblyFetchOptions");
internal readonly JSObject _jsController;
private readonly CancellationTokenRegistration _abortRegistration;
private readonly string[] _optionNames;
private readonly object?[] _optionValues;
private readonly string[] _headerNames;
private readonly string[] _headerValues;
private readonly string uri;
private readonly CancellationToken _cancellationToken;
private readonly HttpRequestMessage _request;
private bool _isDisposed;
public BrowserHttpController(HttpRequestMessage request, bool? allowAutoRedirect, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
if (request.RequestUri == null)
{
throw new ArgumentNullException(nameof(request.RequestUri));
}
_cancellationToken = cancellationToken;
_request = request;
JSObject httpController = BrowserHttpInterop.CreateController();
CancellationTokenRegistration abortRegistration = cancellationToken.Register(static s =>
{
JSObject _abortController = (JSObject)s!;
#if FEATURE_WASM_THREADS
if (!_abortController.IsDisposed)
JSObject _httpController = (JSObject)s!;
if (!_httpController.IsDisposed)
{
_abortController.SynchronizationContext.Send(static (JSObject __abortController) =>
{
BrowserHttpInterop.AbortRequest(__abortController);
__abortController.Dispose();
}, _abortController);
BrowserHttpInterop.AbortRequest(_httpController);
}
#else
if (!_abortController.IsDisposed)
{
BrowserHttpInterop.AbortRequest(_abortController);
_abortController.Dispose();
}
#endif
}, abortController);
try
}, httpController);
_jsController = httpController;
_abortRegistration = abortRegistration;
uri = request.RequestUri.IsAbsoluteUri ? request.RequestUri.AbsoluteUri : request.RequestUri.ToString();
bool hasFetchOptions = request.Options.TryGetValue(FetchOptions, out IDictionary<string, object>? fetchOptions);
int optionCount = 1 + (allowAutoRedirect.HasValue ? 1 : 0) + (hasFetchOptions && fetchOptions != null ? fetchOptions.Count : 0);
int optionIndex = 0;
// note there could be more values for each header name and so this is just name count
int headerCount = request.Headers.Count + (request.Content?.Headers.Count ?? 0);
_optionNames = new string[optionCount];
_optionValues = new object?[optionCount];
_optionNames[optionIndex] = "method";
_optionValues[optionIndex] = request.Method.Method;
optionIndex++;
if (allowAutoRedirect.HasValue)
{
if (request.RequestUri == null)
{
throw new ArgumentNullException(nameof(request.RequestUri));
}
string uri = request.RequestUri.IsAbsoluteUri ? request.RequestUri.AbsoluteUri : request.RequestUri.ToString();
bool hasFetchOptions = request.Options.TryGetValue(FetchOptions, out IDictionary<string, object>? fetchOptions);
int optionCount = 1 + (allowAutoRedirect.HasValue ? 1 : 0) + (hasFetchOptions && fetchOptions != null ? fetchOptions.Count : 0);
int optionIndex = 0;
string[] optionNames = new string[optionCount];
object?[] optionValues = new object?[optionCount];
optionNames[optionIndex] = "method";
optionValues[optionIndex] = request.Method.Method;
_optionNames[optionIndex] = "redirect";
_optionValues[optionIndex] = allowAutoRedirect.Value ? "follow" : "manual";
optionIndex++;
if (allowAutoRedirect.HasValue)
}
if (hasFetchOptions && fetchOptions != null)
{
foreach (KeyValuePair<string, object> item in fetchOptions)
{
optionNames[optionIndex] = "redirect";
optionValues[optionIndex] = allowAutoRedirect.Value ? "follow" : "manual";
_optionNames[optionIndex] = item.Key;
_optionValues[optionIndex] = item.Value;
optionIndex++;
}
}
foreach (KeyValuePair<string, IEnumerable<string>> header in request.Headers)
var headerNames = new List<string>(headerCount);
var headerValues = new List<string>(headerCount);
foreach (KeyValuePair<string, IEnumerable<string>> header in request.Headers)
{
foreach (string value in header.Value)
{
headerNames.Add(header.Key);
headerValues.Add(value);
}
}
if (request.Content != null)
{
foreach (KeyValuePair<string, IEnumerable<string>> header in request.Content.Headers)
{
foreach (string value in header.Value)
{
@ -186,117 +225,79 @@ namespace System.Net.Http
headerValues.Add(value);
}
}
}
_headerNames = headerNames.ToArray();
_headerValues = headerValues.ToArray();
}
if (request.Content != null)
{
foreach (KeyValuePair<string, IEnumerable<string>> header in request.Content.Headers)
{
foreach (string value in header.Value)
{
headerNames.Add(header.Key);
headerValues.Add(value);
}
}
}
public async Task<HttpResponseMessage> CallFetch()
{
CancellationHelper.ThrowIfCancellationRequested(_cancellationToken);
if (hasFetchOptions && fetchOptions != null)
{
foreach (KeyValuePair<string, object> item in fetchOptions)
{
optionNames[optionIndex] = item.Key;
optionValues[optionIndex] = item.Value;
optionIndex++;
}
}
BrowserHttpWriteStream? writeStream = null;
Task fetchPromise;
bool streamingRequestEnabled = false;
JSObject? fetchResponse;
cancellationToken.ThrowIfCancellationRequested();
if (request.Content != null)
try
{
if (_request.Content != null)
{
bool streamingEnabled = false;
if (BrowserHttpInterop.SupportsStreamingRequest())
{
request.Options.TryGetValue(EnableStreamingRequest, out streamingEnabled);
_request.Options.TryGetValue(EnableStreamingRequest, out streamingRequestEnabled);
}
if (streamingEnabled)
if (streamingRequestEnabled)
{
using (JSObject transformStream = BrowserHttpInterop.CreateTransformStream())
{
Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, transformStream);
Task<JSObject> fetchTask = BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).AsTask(); // initialize fetch cancellation
using (WasmHttpWriteStream stream = new WasmHttpWriteStream(transformStream))
{
try
{
await request.Content.CopyToAsync(stream, cancellationToken).ConfigureAwait(true);
Task closePromise = BrowserHttpInterop.TransformStreamClose(transformStream);
await BrowserHttpInterop.CancelationHelper(closePromise, cancellationToken).ConfigureAwait(true);
}
catch (Exception)
{
BrowserHttpInterop.TransformStreamAbort(transformStream);
if (!abortController.IsDisposed)
{
BrowserHttpInterop.AbortRequest(abortController);
}
try
{
using (fetchResponse = await fetchTask.ConfigureAwait(true)) // observe exception
{
BrowserHttpInterop.AbortResponse(fetchResponse);
}
}
catch { /* ignore */ }
cancellationToken.ThrowIfCancellationRequested();
throw;
}
}
fetchResponse = await fetchTask.ConfigureAwait(true);
}
fetchPromise = BrowserHttpInterop.FetchStream(_jsController, uri, _headerNames, _headerValues, _optionNames, _optionValues);
writeStream = new BrowserHttpWriteStream(this);
await _request.Content.CopyToAsync(writeStream, _cancellationToken).ConfigureAwait(false);
var closePromise = BrowserHttpInterop.TransformStreamClose(_jsController);
await BrowserHttpInterop.CancellationHelper(closePromise, _cancellationToken, _jsController).ConfigureAwait(false);
}
else
{
byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();
byte[] buffer = await _request.Content.ReadAsByteArrayAsync(_cancellationToken).ConfigureAwait(false);
CancellationHelper.ThrowIfCancellationRequested(_cancellationToken);
Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true);
Memory<byte> bufferMemory = buffer.AsMemory();
// http_wasm_fetch_byte makes a copy of the bytes synchronously, so we can un-pin it synchronously
using MemoryHandle pinBuffer = bufferMemory.Pin();
fetchPromise = BrowserHttpInterop.FetchBytes(_jsController, uri, _headerNames, _headerValues, _optionNames, _optionValues, pinBuffer, buffer.Length);
}
}
else
{
Task<JSObject> fetchPromise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController);
fetchResponse = await BrowserHttpInterop.CancelationHelper(fetchPromise, cancellationToken).ConfigureAwait(true);
fetchPromise = BrowserHttpInterop.Fetch(_jsController, uri, _headerNames, _headerValues, _optionNames, _optionValues);
}
await BrowserHttpInterop.CancellationHelper(fetchPromise, _cancellationToken, _jsController).ConfigureAwait(false);
return new WasmFetchResponse(fetchResponse, abortController, abortRegistration);
return ConvertResponse();
}
catch (Exception ex)
{
abortRegistration.Dispose();
abortController.Dispose();
Dispose(); // will also abort request
if (ex is JSException jse)
{
throw new HttpRequestException(jse.Message, jse);
}
throw;
}
finally
{
writeStream?.Dispose();
}
}
private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse)
private HttpResponseMessage ConvertResponse()
{
#if FEATURE_WASM_THREADS
lock (fetchResponse.ThisLock)
lock (this)
{
#endif
fetchResponse.ThrowIfDisposed();
string? responseType = fetchResponse.FetchResponse!.GetPropertyAsString("type")!;
int status = fetchResponse.FetchResponse.GetPropertyAsInt32("status");
ThrowIfDisposed();
string? responseType = BrowserHttpInterop.GetResponseType(_jsController);
int status = BrowserHttpInterop.GetResponseStatus(_jsController);
HttpResponseMessage responseMessage = new HttpResponseMessage((HttpStatusCode)status);
responseMessage.RequestMessage = request;
responseMessage.RequestMessage = _request;
if (responseType == "opaqueredirect")
{
// Here we will set the ReasonPhrase so that it can be evaluated later.
@ -309,77 +310,69 @@ namespace System.Net.Http
responseMessage.SetReasonPhraseWithoutValidation(responseType);
}
bool streamingEnabled = false;
bool streamingResponseEnabled = false;
if (BrowserHttpInterop.SupportsStreamingResponse())
{
request.Options.TryGetValue(EnableStreamingResponse, out streamingEnabled);
_request.Options.TryGetValue(EnableStreamingResponse, out streamingResponseEnabled);
}
responseMessage.Content = streamingEnabled
? new StreamContent(new WasmHttpReadStream(fetchResponse))
: new BrowserHttpContent(fetchResponse);
responseMessage.Content = streamingResponseEnabled
? new StreamContent(new BrowserHttpReadStream(this))
: new BrowserHttpContent(this);
// Some of the headers may not even be valid header types in .NET thus we use TryAddWithoutValidation
// CORS will only allow access to certain headers on browser.
BrowserHttpInterop.GetResponseHeaders(fetchResponse.FetchResponse, responseMessage.Headers, responseMessage.Content.Headers);
BrowserHttpInterop.GetResponseHeaders(_jsController!, responseMessage.Headers, responseMessage.Content.Headers);
return responseMessage;
#if FEATURE_WASM_THREADS
} //lock
#endif
}
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
public void ThrowIfDisposed()
{
bool? allowAutoRedirect = _isAllowAutoRedirectTouched ? AllowAutoRedirect : null;
#if FEATURE_WASM_THREADS
return JSHost.CurrentOrMainJSSynchronizationContext.Post(() =>
lock (this)
{
#endif
return Impl(request, cancellationToken, allowAutoRedirect);
#if FEATURE_WASM_THREADS
});
#endif
ObjectDisposedException.ThrowIf(_isDisposed, this);
} //lock
}
static async Task<HttpResponseMessage> Impl(HttpRequestMessage request, CancellationToken cancellationToken, bool? allowAutoRedirect)
public void Dispose()
{
lock (this)
{
WasmFetchResponse fetchRespose = await CallFetch(request, cancellationToken, allowAutoRedirect).ConfigureAwait(true);
return ConvertResponse(request, fetchRespose);
if (_isDisposed)
return;
_isDisposed = true;
}
_abortRegistration.Dispose();
if (_jsController != null)
{
if (!_jsController.IsDisposed)
{
BrowserHttpInterop.AbortRequest(_jsController);// aborts also response
}
_jsController.Dispose();
}
}
}
internal sealed class WasmHttpWriteStream : Stream
internal sealed class BrowserHttpWriteStream : Stream
{
private readonly JSObject _transformStream;
public WasmHttpWriteStream(JSObject transformStream)
private readonly BrowserHttpController _controller; // we don't own it, we don't dispose it from here
public BrowserHttpWriteStream(BrowserHttpController controller)
{
ArgumentNullException.ThrowIfNull(transformStream);
ArgumentNullException.ThrowIfNull(controller);
_transformStream = transformStream;
_controller = controller;
}
private Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
#if FEATURE_WASM_THREADS
return _transformStream.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken));
#else
return Impl(this, buffer, cancellationToken);
#endif
static async Task Impl(WasmHttpWriteStream self, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
using (Buffers.MemoryHandle handle = buffer.Pin())
{
Task writePromise = TransformStreamWriteUnsafe(self._transformStream, buffer, handle);
await BrowserHttpInterop.CancelationHelper(writePromise, cancellationToken).ConfigureAwait(true);
}
}
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
_controller.ThrowIfDisposed();
static unsafe Task TransformStreamWriteUnsafe(JSObject transformStream, ReadOnlyMemory<byte> buffer, Buffers.MemoryHandle handle)
=> BrowserHttpInterop.TransformStreamWrite(transformStream, (nint)handle.Pointer, buffer.Length);
// http_wasm_transform_stream_write makes a copy of the bytes synchronously, so we can dispose the handle synchronously
using MemoryHandle pinBuffer = buffer.Pin();
Task writePromise = BrowserHttpInterop.TransformStreamWriteUnsafe(_controller._jsController, buffer, pinBuffer);
return BrowserHttpInterop.CancellationHelper(writePromise, cancellationToken, _controller._jsController);
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
@ -399,7 +392,6 @@ namespace System.Net.Http
protected override void Dispose(bool disposing)
{
_transformStream.Dispose();
}
public override void Flush()
@ -436,159 +428,57 @@ namespace System.Net.Http
#endregion
}
internal sealed class WasmFetchResponse : IDisposable
{
#if FEATURE_WASM_THREADS
public readonly object ThisLock = new object();
#endif
public JSObject? FetchResponse;
private readonly JSObject _abortController;
private readonly CancellationTokenRegistration _abortRegistration;
private bool _isDisposed;
public WasmFetchResponse(JSObject fetchResponse, JSObject abortController, CancellationTokenRegistration abortRegistration)
{
ArgumentNullException.ThrowIfNull(fetchResponse);
ArgumentNullException.ThrowIfNull(abortController);
FetchResponse = fetchResponse;
_abortRegistration = abortRegistration;
_abortController = abortController;
}
public void ThrowIfDisposed()
{
#if FEATURE_WASM_THREADS
lock (ThisLock)
{
#endif
ObjectDisposedException.ThrowIf(_isDisposed, this);
#if FEATURE_WASM_THREADS
} //lock
#endif
}
public void Dispose()
{
if (_isDisposed)
return;
#if FEATURE_WASM_THREADS
FetchResponse?.SynchronizationContext.Post(static (WasmFetchResponse self) =>
{
lock (self.ThisLock)
{
if (!self._isDisposed)
{
self._isDisposed = true;
self._abortRegistration.Dispose();
self._abortController.Dispose();
if (!self.FetchResponse!.IsDisposed)
{
BrowserHttpInterop.AbortResponse(self.FetchResponse);
}
self.FetchResponse.Dispose();
self.FetchResponse = null;
}
return Task.CompletedTask;
}
}, this);
#else
_isDisposed = true;
_abortRegistration.Dispose();
_abortController.Dispose();
if (FetchResponse != null)
{
if (!FetchResponse.IsDisposed)
{
BrowserHttpInterop.AbortResponse(FetchResponse);
}
FetchResponse.Dispose();
FetchResponse = null;
}
#endif
}
}
internal sealed class BrowserHttpContent : HttpContent
{
private byte[]? _data;
private int _length = -1;
private readonly WasmFetchResponse _fetchResponse;
private readonly BrowserHttpController _controller;
public BrowserHttpContent(WasmFetchResponse fetchResponse)
public BrowserHttpContent(BrowserHttpController controller)
{
ArgumentNullException.ThrowIfNull(fetchResponse);
_fetchResponse = fetchResponse;
ArgumentNullException.ThrowIfNull(controller);
_controller = controller;
}
// TODO allocate smaller buffer and call multiple times
private async ValueTask<byte[]> GetResponseData(CancellationToken cancellationToken)
{
Task<int> promise;
#if FEATURE_WASM_THREADS
lock (_fetchResponse.ThisLock)
lock (_controller)
{
#endif
if (_data != null)
{
return _data;
}
_fetchResponse.ThrowIfDisposed();
promise = BrowserHttpInterop.GetResponseLength(_fetchResponse.FetchResponse!);
#if FEATURE_WASM_THREADS
_controller.ThrowIfDisposed();
promise = BrowserHttpInterop.GetResponseLength(_controller._jsController!);
} //lock
#endif
_length = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, _fetchResponse.FetchResponse).ConfigureAwait(true);
#if FEATURE_WASM_THREADS
lock (_fetchResponse.ThisLock)
_length = await BrowserHttpInterop.CancellationHelper(promise, cancellationToken, _controller._jsController).ConfigureAwait(false);
lock (_controller)
{
#endif
_data = new byte[_length];
BrowserHttpInterop.GetResponseBytes(_fetchResponse.FetchResponse!, new Span<byte>(_data));
BrowserHttpInterop.GetResponseBytes(_controller._jsController!, new Span<byte>(_data));
return _data;
#if FEATURE_WASM_THREADS
} //lock
#endif
}
protected override Task<Stream> CreateContentReadStreamAsync()
protected override async Task<Stream> CreateContentReadStreamAsync()
{
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this));
#else
return Impl(this);
#endif
static async Task<Stream> Impl(BrowserHttpContent self)
{
self._fetchResponse.ThrowIfDisposed();
byte[] data = await self.GetResponseData(CancellationToken.None).ConfigureAwait(true);
return new MemoryStream(data, writable: false);
}
byte[] data = await GetResponseData(CancellationToken.None).ConfigureAwait(false);
return new MemoryStream(data, writable: false);
}
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) =>
SerializeToStreamAsync(stream, context, CancellationToken.None);
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(stream, nameof(stream));
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, stream, cancellationToken));
#else
return Impl(this, stream, cancellationToken);
#endif
static async Task Impl(BrowserHttpContent self, Stream stream, CancellationToken cancellationToken)
{
self._fetchResponse.ThrowIfDisposed();
byte[] data = await self.GetResponseData(cancellationToken).ConfigureAwait(true);
await stream.WriteAsync(data, cancellationToken).ConfigureAwait(true);
}
byte[] data = await GetResponseData(cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(data, cancellationToken).ConfigureAwait(false);
}
protected internal override bool TryComputeLength(out long length)
@ -605,52 +495,40 @@ namespace System.Net.Http
protected override void Dispose(bool disposing)
{
_fetchResponse.Dispose();
_controller.Dispose();
base.Dispose(disposing);
}
}
internal sealed class WasmHttpReadStream : Stream
internal sealed class BrowserHttpReadStream : Stream
{
private WasmFetchResponse _fetchResponse;
private BrowserHttpController _controller; // we own the object and have to dispose it
public WasmHttpReadStream(WasmFetchResponse fetchResponse)
public BrowserHttpReadStream(BrowserHttpController controller)
{
_fetchResponse = fetchResponse;
_controller = controller;
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(buffer, nameof(buffer));
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return await _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true);
#else
return await Impl(this, buffer, cancellationToken).ConfigureAwait(true);
#endif
_controller.ThrowIfDisposed();
static async Task<int> Impl(WasmHttpReadStream self, Memory<byte> buffer, CancellationToken cancellationToken)
MemoryHandle pinBuffer = buffer.Pin();
int bytesCount;
try
{
self._fetchResponse.ThrowIfDisposed();
Task<int> promise;
using (Buffers.MemoryHandle handle = buffer.Pin())
{
#if FEATURE_WASM_THREADS
lock (self._fetchResponse.ThisLock)
{
#endif
self._fetchResponse.ThrowIfDisposed();
promise = GetStreamedResponseBytesUnsafe(self._fetchResponse, buffer, handle);
#if FEATURE_WASM_THREADS
} //lock
#endif
int response = await BrowserHttpInterop.CancelationHelper(promise, cancellationToken, self._fetchResponse.FetchResponse).ConfigureAwait(true);
return response;
}
_controller.ThrowIfDisposed();
unsafe static Task<int> GetStreamedResponseBytesUnsafe(WasmFetchResponse _fetchResponse, Memory<byte> buffer, Buffers.MemoryHandle handle)
=> BrowserHttpInterop.GetStreamedResponseBytes(_fetchResponse.FetchResponse!, (IntPtr)handle.Pointer, buffer.Length);
var promise = BrowserHttpInterop.GetStreamedResponseBytesUnsafe(_controller._jsController, buffer, pinBuffer);
bytesCount = await BrowserHttpInterop.CancellationHelper(promise, cancellationToken, _controller._jsController).ConfigureAwait(false);
}
finally
{
// this must be after await, because http_wasm_get_streamed_response_bytes is using the buffer in a continuation
pinBuffer.Dispose();
}
return bytesCount;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@ -665,7 +543,7 @@ namespace System.Net.Http
protected override void Dispose(bool disposing)
{
_fetchResponse.Dispose();
_controller.Dispose();
}
public override void Flush()

View file

@ -1,7 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.IO;
using System.Buffers;
using System.Net.Http.Headers;
using System.Runtime.InteropServices.JavaScript;
using System.Threading;
@ -17,47 +17,53 @@ namespace System.Net.Http
[JSImport("INTERNAL.http_wasm_supports_streaming_response")]
public static partial bool SupportsStreamingResponse();
[JSImport("INTERNAL.http_wasm_create_abort_controler")]
public static partial JSObject CreateAbortController();
[JSImport("INTERNAL.http_wasm_create_controller")]
public static partial JSObject CreateController();
[JSImport("INTERNAL.http_wasm_abort_request")]
public static partial void AbortRequest(
JSObject abortController);
JSObject httpController);
[JSImport("INTERNAL.http_wasm_abort_response")]
public static partial void AbortResponse(
JSObject fetchResponse);
[JSImport("INTERNAL.http_wasm_create_transform_stream")]
public static partial JSObject CreateTransformStream();
JSObject httpController);
[JSImport("INTERNAL.http_wasm_transform_stream_write")]
public static partial Task TransformStreamWrite(
JSObject transformStream,
JSObject httpController,
IntPtr bufferPtr,
int bufferLength);
public static unsafe Task TransformStreamWriteUnsafe(JSObject httpController, ReadOnlyMemory<byte> buffer, Buffers.MemoryHandle handle)
=> TransformStreamWrite(httpController, (nint)handle.Pointer, buffer.Length);
[JSImport("INTERNAL.http_wasm_transform_stream_close")]
public static partial Task TransformStreamClose(
JSObject transformStream);
[JSImport("INTERNAL.http_wasm_transform_stream_abort")]
public static partial void TransformStreamAbort(
JSObject transformStream);
JSObject httpController);
[JSImport("INTERNAL.http_wasm_get_response_header_names")]
private static partial string[] _GetResponseHeaderNames(
JSObject fetchResponse);
JSObject httpController);
[JSImport("INTERNAL.http_wasm_get_response_header_values")]
private static partial string[] _GetResponseHeaderValues(
JSObject fetchResponse);
JSObject httpController);
public static void GetResponseHeaders(JSObject fetchResponse, HttpHeaders resposeHeaders, HttpHeaders contentHeaders)
[JSImport("INTERNAL.http_wasm_get_response_status")]
public static partial int GetResponseStatus(
JSObject httpController);
[JSImport("INTERNAL.http_wasm_get_response_type")]
public static partial string GetResponseType(
JSObject httpController);
public static void GetResponseHeaders(JSObject httpController, HttpHeaders resposeHeaders, HttpHeaders contentHeaders)
{
string[] headerNames = _GetResponseHeaderNames(fetchResponse);
string[] headerValues = _GetResponseHeaderValues(fetchResponse);
string[] headerNames = _GetResponseHeaderNames(httpController);
string[] headerValues = _GetResponseHeaderValues(httpController);
// Some of the headers may not even be valid header types in .NET thus we use TryAddWithoutValidation
// CORS will only allow access to certain headers on browser.
for (int i = 0; i < headerNames.Length; i++)
{
if (!resposeHeaders.TryAddWithoutValidation(headerNames[i], headerValues[i]))
@ -67,43 +73,38 @@ namespace System.Net.Http
}
}
[JSImport("INTERNAL.http_wasm_fetch")]
public static partial Task<JSObject> Fetch(
public static partial Task Fetch(
JSObject httpController,
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler);
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues);
[JSImport("INTERNAL.http_wasm_fetch_stream")]
public static partial Task<JSObject> Fetch(
public static partial Task FetchStream(
JSObject httpController,
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
JSObject transformStream);
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues);
[JSImport("INTERNAL.http_wasm_fetch_bytes")]
private static partial Task<JSObject> FetchBytes(
private static partial Task FetchBytes(
JSObject httpController,
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
IntPtr bodyPtr,
int bodyLength);
public static unsafe Task<JSObject> Fetch(string uri, string[] headerNames, string[] headerValues, string[] optionNames, object?[] optionValues, JSObject abortControler, byte[] body)
public static unsafe Task FetchBytes(JSObject httpController, string uri, string[] headerNames, string[] headerValues, string[] optionNames, object?[] optionValues, MemoryHandle pinBuffer, int bodyLength)
{
fixed (byte* ptr = body)
{
return FetchBytes(uri, headerNames, headerValues, optionNames, optionValues, abortControler, (IntPtr)ptr, body.Length);
}
return FetchBytes(httpController, uri, headerNames, headerValues, optionNames, optionValues, (IntPtr)pinBuffer.Pointer, bodyLength);
}
[JSImport("INTERNAL.http_wasm_get_streamed_response_bytes")]
@ -112,6 +113,10 @@ namespace System.Net.Http
IntPtr bufferPtr,
int bufferLength);
public static unsafe Task<int> GetStreamedResponseBytesUnsafe(JSObject jsController, Memory<byte> buffer, MemoryHandle handle)
=> GetStreamedResponseBytes(jsController, (IntPtr)handle.Pointer, buffer.Length);
[JSImport("INTERNAL.http_wasm_get_response_length")]
public static partial Task<int> GetResponseLength(
JSObject fetchResponse);
@ -122,8 +127,10 @@ namespace System.Net.Http
[JSMarshalAs<JSType.MemoryView>] Span<byte> buffer);
public static async ValueTask CancelationHelper(Task promise, CancellationToken cancellationToken, JSObject? fetchResponse = null)
public static async Task CancellationHelper(Task promise, CancellationToken cancellationToken, JSObject jsController)
{
Http.CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
if (promise.IsCompletedSuccessfully)
{
return;
@ -132,46 +139,43 @@ namespace System.Net.Http
{
using (var operationRegistration = cancellationToken.Register(static s =>
{
(Task _promise, JSObject? _fetchResponse) = ((Task, JSObject?))s!;
CancelablePromise.CancelPromise(_promise, static (JSObject? __fetchResponse) =>
(Task _promise, JSObject _jsController) = ((Task, JSObject))s!;
CancelablePromise.CancelPromise(_promise, static (JSObject __jsController) =>
{
if (__fetchResponse != null)
if (!__jsController.IsDisposed)
{
AbortResponse(__fetchResponse);
AbortResponse(__jsController);
}
}, _fetchResponse);
}, (promise, fetchResponse)))
}, _jsController);
}, (promise, jsController)))
{
await promise.ConfigureAwait(true);
}
}
catch (OperationCanceledException oce) when (cancellationToken.IsCancellationRequested)
{
throw CancellationHelper.CreateOperationCanceledException(oce, cancellationToken);
Http.CancellationHelper.ThrowIfCancellationRequested(oce, cancellationToken);
}
catch (JSException jse)
{
if (jse.Message.StartsWith("AbortError", StringComparison.Ordinal))
{
throw CancellationHelper.CreateOperationCanceledException(jse, CancellationToken.None);
}
if (cancellationToken.IsCancellationRequested)
{
throw CancellationHelper.CreateOperationCanceledException(jse, cancellationToken);
throw Http.CancellationHelper.CreateOperationCanceledException(jse, CancellationToken.None);
}
Http.CancellationHelper.ThrowIfCancellationRequested(jse, cancellationToken);
throw new HttpRequestException(jse.Message, jse);
}
}
public static async ValueTask<T> CancelationHelper<T>(Task<T> promise, CancellationToken cancellationToken, JSObject? fetchResponse = null)
public static async Task<T> CancellationHelper<T>(Task<T> promise, CancellationToken cancellationToken, JSObject jsController)
{
Http.CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
if (promise.IsCompletedSuccessfully)
{
return promise.Result;
}
await CancelationHelper((Task)promise, cancellationToken, fetchResponse).ConfigureAwait(true);
return await promise.ConfigureAwait(true);
await CancellationHelper((Task)promise, cancellationToken, jsController).ConfigureAwait(false);
return promise.Result;
}
}
}

View file

@ -35,10 +35,18 @@ namespace System.Net.Http
/// <summary>Throws a cancellation exception if cancellation has been requested via <paramref name="cancellationToken"/>.</summary>
/// <param name="cancellationToken">The token to check for a cancellation request.</param>
internal static void ThrowIfCancellationRequested(CancellationToken cancellationToken)
{
ThrowIfCancellationRequested(innerException: null, cancellationToken);
}
/// <summary>Throws a cancellation exception if cancellation has been requested via <paramref name="cancellationToken"/>.</summary>
/// <param name="innerException">The inner exception to wrap. May be null.</param>
/// <param name="cancellationToken">The token to check for a cancellation request.</param>
internal static void ThrowIfCancellationRequested(Exception? innerException, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
ThrowOperationCanceledException(innerException: null, cancellationToken);
ThrowOperationCanceledException(innerException, cancellationToken);
}
}
}

View file

@ -16,8 +16,6 @@
<TargetPlatformIdentifier>$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)'))</TargetPlatformIdentifier>
<DefineConstants Condition="'$(TargetPlatformIdentifier)' == 'windows'">$(DefineConstants);TargetsWindows</DefineConstants>
<DefineConstants Condition="'$(TargetPlatformIdentifier)' == 'browser'">$(DefineConstants);TARGETS_BROWSER</DefineConstants>
<!-- Active issue https://github.com/dotnet/runtime/issues/96173 -->
<_XUnitBackgroundExec>false</_XUnitBackgroundExec>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetOS)' == 'browser'">

View file

@ -64,7 +64,6 @@ namespace System.Xml.XmlSchemaTests
//-----------------------------------------------------------------------------------
[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/75123", typeof(PlatformDetection), nameof(PlatformDetection.IsWasmThreadingSupported))]
//[Variation(Desc = "v4 - ns = valid, URL = invalid")]
public void v4()
{

View file

@ -1,11 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<Suppressions xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<Suppression>
<DiagnosticId>CP0001</DiagnosticId>
<Target>T:System.Runtime.InteropServices.JavaScript.SynchronizationContextExtension</Target>
<Left>ref/net9.0/System.Runtime.InteropServices.JavaScript.dll</Left>
<Right>runtimes/browser/lib/net9.0/System.Runtime.InteropServices.JavaScript.dll</Right>
</Suppression>
<Suppression>
<DiagnosticId>CP0001</DiagnosticId>
<Target>T:System.Runtime.InteropServices.JavaScript.CancelablePromise</Target>
@ -18,10 +12,4 @@
<Left>ref/net9.0/System.Runtime.InteropServices.JavaScript.dll</Left>
<Right>runtimes/browser/lib/net9.0/System.Runtime.InteropServices.JavaScript.dll</Right>
</Suppression>
<Suppression>
<DiagnosticId>CP0002</DiagnosticId>
<Target>M:System.Runtime.InteropServices.JavaScript.JSHost.get_CurrentOrMainJSSynchronizationContext</Target>
<Left>ref/net9.0/System.Runtime.InteropServices.JavaScript.dll</Left>
<Right>runtimes/browser/lib/net9.0/System.Runtime.InteropServices.JavaScript.dll</Right>
</Suppression>
</Suppressions>

View file

@ -42,7 +42,6 @@
<Compile Include="System\Runtime\InteropServices\JavaScript\JSExportAttribute.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\JSImportAttribute.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\CancelablePromise.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\SynchronizationContextExtensions.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\JSProxyContext.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\MarshalerType.cs" />

View file

@ -50,17 +50,5 @@ namespace System.Runtime.InteropServices.JavaScript
return JSHostImplementation.ImportAsync(moduleName, moduleUrl, cancellationToken);
}
public static SynchronizationContext CurrentOrMainJSSynchronizationContext
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get
{
#if FEATURE_WASM_THREADS
return (JSProxyContext.ExecutionContext ?? JSProxyContext.MainThreadContext).SynchronizationContext;
#else
return null!;
#endif
}
}
}
}

View file

@ -32,37 +32,44 @@ namespace System.Runtime.InteropServices.JavaScript
public static Task<T> RunAsync<T>(Func<Task<T>> body, CancellationToken cancellationToken)
{
var instance = new JSWebWorkerInstance<T>(body, null, cancellationToken);
var instance = new JSWebWorkerInstance<T>(body, cancellationToken);
return instance.Start();
}
public static Task RunAsync(Func<Task> body, CancellationToken cancellationToken)
{
var instance = new JSWebWorkerInstance<int>(null, body, cancellationToken);
var instance = new JSWebWorkerInstance<int>(async () =>
{
await body().ConfigureAwait(false);
return 0;
}, cancellationToken);
return instance.Start();
}
internal sealed class JSWebWorkerInstance<T> : IDisposable
{
private JSSynchronizationContext? _jsSynchronizationContext;
private TaskCompletionSource<T> _taskCompletionSource;
private Thread _thread;
private CancellationToken _cancellationToken;
private readonly TaskCompletionSource<T> _taskCompletionSource;
private readonly Thread _thread;
private readonly CancellationToken _cancellationToken;
private readonly Func<Task<T>> _body;
private CancellationTokenRegistration? _cancellationRegistration;
private Func<Task<T>>? _bodyRes;
private Func<Task>? _bodyVoid;
private Task? _resultTask;
private JSSynchronizationContext? _jsSynchronizationContext;
private Task<T>? _resultTask;
private bool _isDisposed;
public JSWebWorkerInstance(Func<Task<T>>? bodyRes, Func<Task>? bodyVoid, CancellationToken cancellationToken)
public JSWebWorkerInstance(Func<Task<T>> body, CancellationToken cancellationToken)
{
// Task created from this TCS is consumed by external caller, on outer thread.
// We don't want the continuations of that task to run on JSWebWorker
// only the tasks created inside of the callback should run in JSWebWorker
// TODO TaskCreationOptions.HideScheduler ?
_taskCompletionSource = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
_thread = new Thread(ThreadMain);
_resultTask = null;
_cancellationToken = cancellationToken;
_cancellationRegistration = null;
_bodyRes = bodyRes;
_bodyVoid = bodyVoid;
_body = body;
JSHostImplementation.SetHasExternalEventLoop(_thread);
}
@ -73,14 +80,20 @@ namespace System.Runtime.InteropServices.JavaScript
// give browser chance to load more threads
// until there at least one thread loaded, it doesn't make sense to `Start`
// because that would also hang, but in a way blocking the UI thread, much worse.
JavaScriptImports.ThreadAvailable().ContinueWith(t =>
JavaScriptImports.ThreadAvailable().ContinueWith(static (t, o) =>
{
var self = (JSWebWorkerInstance<T>)o!;
if (t.IsCompletedSuccessfully)
{
_thread.Start();
self._thread.Start();
}
return t;
}, _cancellationToken, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Current);
if (t.IsCanceled)
{
throw new OperationCanceledException("Cancelled while waiting for underlying WebWorker to become available.", self._cancellationToken);
}
throw t.Exception!;
// ideally this will execute on UI thread quickly: ExecuteSynchronously
}, this, _cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.FromCurrentSynchronizationContext());
}
else
{
@ -95,32 +108,26 @@ namespace System.Runtime.InteropServices.JavaScript
{
if (_cancellationToken.IsCancellationRequested)
{
PropagateCompletionAndDispose(Task.FromException<T>(new OperationCanceledException(_cancellationToken)));
PropagateCompletionAndDispose(Task.FromCanceled<T>(_cancellationToken));
return;
}
// receive callback when the cancellation is requested
_cancellationRegistration = _cancellationToken.Register(() =>
_cancellationRegistration = _cancellationToken.Register(static (o) =>
{
var self = (JSWebWorkerInstance<T>)o!;
// this could be executing on any thread
PropagateCompletionAndDispose(Task.FromException<T>(new OperationCanceledException(_cancellationToken)));
});
self.PropagateCompletionAndDispose(Task.FromCanceled<T>(self._cancellationToken));
}, this);
// JSSynchronizationContext also registers to _cancellationToken
_jsSynchronizationContext = JSSynchronizationContext.InstallWebWorkerInterop(false, _cancellationToken);
var childScheduler = TaskScheduler.FromCurrentSynchronizationContext();
if (_bodyRes != null)
{
_resultTask = _bodyRes();
}
else
{
_resultTask = _bodyVoid!();
}
// This code is exiting thread ThreadMain() before all promises are resolved.
// the continuation is executed by setTimeout() callback of the WebWorker thread.
_resultTask.ContinueWith(PropagateCompletionAndDispose, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, childScheduler);
_body().ContinueWith(PropagateCompletionAndDispose, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, childScheduler);
}
catch (Exception ex)
{
@ -129,7 +136,7 @@ namespace System.Runtime.InteropServices.JavaScript
}
// run actions on correct thread
private void PropagateCompletionAndDispose(Task result)
private void PropagateCompletionAndDispose(Task<T> result)
{
_resultTask = result;
@ -170,35 +177,7 @@ namespace System.Runtime.InteropServices.JavaScript
Dispose();
}
private void PropagateCompletion()
{
if (_resultTask!.IsFaulted)
{
if (_resultTask.Exception is AggregateException ag && ag.InnerException != null)
{
_taskCompletionSource.TrySetException(ag.InnerException);
}
else
{
_taskCompletionSource.TrySetException(_resultTask.Exception);
}
}
else if (_resultTask.IsCanceled)
{
_taskCompletionSource.TrySetCanceled();
}
else
{
if (_bodyRes != null)
{
_taskCompletionSource.TrySetResult(((Task<T>)_resultTask).Result);
}
else
{
_taskCompletionSource.TrySetResult(default!);
}
}
}
private void PropagateCompletion() => _taskCompletionSource.TrySetFromTask(_resultTask!);
private void Dispose(bool disposing)
{

View file

@ -49,6 +49,8 @@ namespace System.Runtime.InteropServices.JavaScript
lock (ctx)
{
PromiseHolder holder = ctx.GetPromiseHolder(slot.GCHandle);
// we want to run the continuations on the original thread which called the JSImport, so RunContinuationsAsynchronously, rather than ExecuteSynchronously
// TODO TaskCreationOptions.RunContinuationsAsynchronously
TaskCompletionSource tcs = new TaskCompletionSource(holder);
ToManagedCallback callback = (JSMarshalerArgument* arguments_buffer) =>
{
@ -98,6 +100,8 @@ namespace System.Runtime.InteropServices.JavaScript
lock (ctx)
{
var holder = ctx.GetPromiseHolder(slot.GCHandle);
// we want to run the continuations on the original thread which called the JSImport, so RunContinuationsAsynchronously, rather than ExecuteSynchronously
// TODO TaskCreationOptions.RunContinuationsAsynchronously
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(holder);
ToManagedCallback callback = (JSMarshalerArgument* arguments_buffer) =>
{

View file

@ -1,152 +0,0 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Threading;
using System.Threading.Tasks;
namespace System.Runtime.InteropServices.JavaScript
{
/// <summary>
/// Extensions of SynchronizationContext which propagate errors and return values
/// </summary>
public static class SynchronizationContextExtension
{
public static void Send<T>(this SynchronizationContext self, Action<T> body, T value)
{
Exception? exc = default;
self.Send((_value) =>
{
try
{
body((T)_value!);
}
catch (Exception ex)
{
exc = ex;
}
}, value);
if (exc != null)
{
throw exc;
}
}
public static TRes Send<TRes>(this SynchronizationContext self, Func<TRes> body)
{
TRes? value = default;
Exception? exc = default;
self.Send((_) =>
{
try
{
value = body();
}
catch (Exception ex)
{
exc = ex;
}
}, null);
if (exc != null)
{
throw exc;
}
return value!;
}
public static Task<TRes> Post<TRes>(this SynchronizationContext self, Func<Task<TRes>> body)
{
TaskCompletionSource<TRes> tcs = new TaskCompletionSource<TRes>();
self.Post(async (_) =>
{
try
{
var value = await body().ConfigureAwait(false);
tcs.TrySetResult(value);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
public static Task<TRes> Post<T1, TRes>(this SynchronizationContext? self, Func<T1, Task<TRes>> body, T1 p1)
{
if (self == null) return body(p1);
TaskCompletionSource<TRes> tcs = new TaskCompletionSource<TRes>();
self.Post(async (_) =>
{
try
{
var value = await body(p1).ConfigureAwait(false);
tcs.TrySetResult(value);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
public static Task Post<T1>(this SynchronizationContext self, Func<T1, Task> body, T1 p1)
{
TaskCompletionSource tcs = new TaskCompletionSource();
self.Post(async (_) =>
{
try
{
await body(p1).ConfigureAwait(false);
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
public static Task Post(this SynchronizationContext self, Func<Task> body)
{
TaskCompletionSource tcs = new TaskCompletionSource();
self.Post(async (_) =>
{
try
{
await body().ConfigureAwait(false);
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
public static TRes Send<T1, TRes>(this SynchronizationContext self, Func<T1, TRes> body, T1 p1)
{
TRes? value = default;
Exception? exc = default;
self.Send((_) =>
{
try
{
value = body(p1);
}
catch (Exception ex)
{
exc = ex;
}
}, null);
if (exc != null)
{
throw exc;
}
return value!;
}
}
}

View file

@ -12,6 +12,8 @@
<DefineConstants Condition="'$(FeatureWasmThreads)' == 'true'">$(DefineConstants);FEATURE_WASM_THREADS</DefineConstants>
<!-- Use following lines to write the generated files to disk. -->
<EmitCompilerGeneratedFiles>true</EmitCompilerGeneratedFiles>
<!-- to see timing and which test aborted the runtime -->
<WasmXHarnessMonoArgs>$(WasmXHarnessMonoArgs) --setenv=XHARNESS_LOG_TEST_START=true</WasmXHarnessMonoArgs>
</PropertyGroup>
<!-- Make debugging easier -->
<PropertyGroup Condition="'$(Configuration)' == 'Debug'">

View file

@ -28,12 +28,36 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
// JS setTimeout till after JSWebWorker close
// synchronous .Wait for JS setTimeout on the same thread -> deadlock problem **7)**
public class WebWorkerTest
public class WebWorkerTest : IAsyncLifetime
{
const int TimeoutMilliseconds = 300;
const int TimeoutMilliseconds = 5000;
public static bool _isWarmupDone;
public async Task InitializeAsync()
{
if (_isWarmupDone)
{
return;
}
await Task.Delay(500);
_isWarmupDone = true;
}
public Task DisposeAsync() => Task.CompletedTask;
#region Executors
private CancellationTokenSource CreateTestCaseTimeoutSource()
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
cts.Token.Register(() =>
{
Console.WriteLine($"Unexpected test case timeout at {DateTime.Now.ToString("u")} ManagedThreadId:{Environment.CurrentManagedThreadId}");
});
return cts;
}
public static IEnumerable<object[]> GetTargetThreads()
{
return Enum.GetValues<ExecutorType>().Select(type => new object[] { new Executor(type) });
@ -55,7 +79,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task Executor_Cancellation(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
TaskCompletionSource ready = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var canceledTask = executor.Execute(() =>
@ -69,13 +93,13 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(() => canceledTask);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => canceledTask);
}
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task JSDelay_Cancellation(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
TaskCompletionSource ready = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var canceledTask = executor.Execute(async () =>
{
@ -90,15 +114,15 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(() => canceledTask);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => canceledTask);
}
[Fact]
public async Task JSSynchronizationContext_Send_Post_Items_Cancellation()
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
ManualResetEventSlim blocker=new ManualResetEventSlim(false);
ManualResetEventSlim blocker = new ManualResetEventSlim(false);
TaskCompletionSource never = new TaskCompletionSource();
SynchronizationContext capturedSynchronizationContext = null;
TaskCompletionSource jswReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@ -157,7 +181,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
// this will unblock the current pending work item
blocker.Set();
await Assert.ThrowsAsync<OperationCanceledException>(() => canceledSend);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => canceledSend);
await canceledPost; // this shouldn't throw
Assert.False(shouldNotHitSend);
@ -168,12 +192,12 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Fact]
public async Task JSSynchronizationContext_Send_Post_To_Canceled()
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
TaskCompletionSource never = new TaskCompletionSource();
SynchronizationContext capturedSynchronizationContext = null;
TaskCompletionSource jswReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
JSObject capturedGlobalThis=null;
JSObject capturedGlobalThis = null;
var canceledTask = JSWebWorker.RunAsync(() =>
{
@ -226,7 +250,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Fact]
public async Task JSWebWorker_Abandon_Running()
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
TaskCompletionSource never = new TaskCompletionSource();
TaskCompletionSource ready = new TaskCompletionSource();
@ -251,7 +275,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Fact]
public async Task JSWebWorker_Abandon_Running_JS()
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
TaskCompletionSource ready = new TaskCompletionSource();
@ -277,7 +301,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task Executor_Propagates(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
bool hit = false;
var failedTask = executor.Execute(() =>
{
@ -297,7 +321,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task ManagedConsole(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(() =>
{
Console.WriteLine("C# Hello from ManagedThreadId: " + Environment.CurrentManagedThreadId);
@ -308,7 +332,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task JSConsole(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(() =>
{
WebWorkerTestHelper.Log("JS Hello from ManagedThreadId: " + Environment.CurrentManagedThreadId + " NativeThreadId: " + WebWorkerTestHelper.NativeThreadId);
@ -319,7 +343,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task NativeThreadId(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await executor.StickyAwait(WebWorkerTestHelper.InitializeAsync(), cts.Token);
@ -343,7 +367,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
public async Task ThreadingTimer(Executor executor)
{
var hit = false;
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
TaskCompletionSource tcs = new TaskCompletionSource();
@ -365,7 +389,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task JSDelay_ContinueWith(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token);
@ -381,7 +405,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task JSDelay_ConfigureAwait_True(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token);
@ -396,7 +420,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
public async Task ManagedDelay_ContinueWith(Executor executor)
{
var hit = false;
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await Task.Delay(10, cts.Token).ContinueWith(_ =>
@ -410,7 +434,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task ManagedDelay_ConfigureAwait_True(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await Task.Delay(10, cts.Token).ConfigureAwait(true);
@ -422,7 +446,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task ManagedYield(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
await executor.Execute(async () =>
{
await Task.Yield();
@ -474,7 +498,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads2x))]
public async Task JSObject_CapturesAffinity(Executor executor1, Executor executor2)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
var e1Job = async (Task e2done, TaskCompletionSource<JSObject> e1State) =>
{
@ -509,7 +533,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task WebSocketClient_ContentInSameThread(Executor executor)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
var uri = new Uri(WebWorkerTestHelper.LocalWsEcho + "?guid=" + Guid.NewGuid());
var message = "hello";
@ -534,7 +558,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads2x))]
public Task WebSocketClient_ResponseCloseInDifferentThread(Executor executor1, Executor executor2)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = CreateTestCaseTimeoutSource();
var uri = new Uri(WebWorkerTestHelper.LocalWsEcho + "?guid=" + Guid.NewGuid());
var message = "hello";
@ -569,7 +593,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
[Theory, MemberData(nameof(GetTargetThreads2x))]
public Task WebSocketClient_CancelInDifferentThread(Executor executor1, Executor executor2)
{
var cts = new CancellationTokenSource(TimeoutMilliseconds);
var cts = new CancellationTokenSource();
var uri = new Uri(WebWorkerTestHelper.LocalWsEcho + "?guid=" + Guid.NewGuid());
var message = ".delay5sec"; // this will make the loopback server slower
@ -592,7 +616,7 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
CancellationTokenSource cts2 = new CancellationTokenSource();
var resTask = client.ReceiveAsync(receive, cts2.Token);
cts2.Cancel();
var ex = await Assert.ThrowsAsync<OperationCanceledException>(() => resTask);
var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(() => resTask);
Assert.Equal(cts2.Token, ex.CancellationToken);
};
@ -600,5 +624,82 @@ namespace System.Runtime.InteropServices.JavaScript.Tests
}
#endregion
#region HTTP
[Theory, MemberData(nameof(GetTargetThreads))]
public async Task HttpClient_ContentInSameThread(Executor executor)
{
var cts = CreateTestCaseTimeoutSource();
var uri = WebWorkerTestHelper.GetOriginUrl() + "/_framework/blazor.boot.json";
await executor.Execute(async () =>
{
using var client = new HttpClient();
using var response = await client.GetAsync(uri);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync();
Assert.StartsWith("{", body);
}, cts.Token);
}
private static HttpRequestOptionsKey<bool> WebAssemblyEnableStreamingRequestKey = new("WebAssemblyEnableStreamingRequest");
private static HttpRequestOptionsKey<bool> WebAssemblyEnableStreamingResponseKey = new("WebAssemblyEnableStreamingResponse");
private static string HelloJson = "{'hello':'world'}".Replace('\'', '"');
private static string EchoStart = "{\"Method\":\"POST\",\"Url\":\"/Echo.ashx";
private Task HttpClient_ActionInDifferentThread(string url, Executor executor1, Executor executor2, Func<HttpResponseMessage, Task> e2Job)
{
var cts = CreateTestCaseTimeoutSource();
var e1Job = async (Task e2done, TaskCompletionSource<HttpResponseMessage> e1State) =>
{
using var ms = new MemoryStream();
await ms.WriteAsync(Encoding.UTF8.GetBytes(HelloJson));
using var req = new HttpRequestMessage(HttpMethod.Post, url);
req.Options.Set(WebAssemblyEnableStreamingResponseKey, true);
req.Content = new StreamContent(ms);
using var client = new HttpClient();
var pr = client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead);
using var response = await pr;
// share the state with the E2 continuation
e1State.SetResult(response);
await e2done;
};
return ActionsInDifferentThreads<HttpResponseMessage>(executor1, executor2, e1Job, e2Job, cts);
}
[Theory, MemberData(nameof(GetTargetThreads2x))]
public async Task HttpClient_ContentInDifferentThread(Executor executor1, Executor executor2)
{
var url = WebWorkerTestHelper.LocalHttpEcho + "?guid=" + Guid.NewGuid();
await HttpClient_ActionInDifferentThread(url, executor1, executor2, async (HttpResponseMessage response) =>
{
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync();
Assert.StartsWith(EchoStart, body);
});
}
[Theory, MemberData(nameof(GetTargetThreads2x))]
public async Task HttpClient_CancelInDifferentThread(Executor executor1, Executor executor2)
{
var url = WebWorkerTestHelper.LocalHttpEcho + "?delay10sec=true&guid=" + Guid.NewGuid();
await HttpClient_ActionInDifferentThread(url, executor1, executor2, async (HttpResponseMessage response) =>
{
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
CancellationTokenSource cts = new CancellationTokenSource();
var promise = response.Content.ReadAsStringAsync(cts.Token);
cts.Cancel();
await promise;
});
});
}
#endregion
}
}

View file

@ -604,9 +604,9 @@
<!-- this sample is messy sandbox right now
<SmokeTestProject Include="$(MonoProjectRoot)sample\wasm\browser-threads-minimal\Wasm.Browser.Threads.Minimal.Sample.csproj" />
-->
<!-- ActiveIssue https://github.com/dotnet/runtime/issues/96628
<SmokeTestProject Include="$(MSBuildThisFileDirectory)System.Net.WebSockets.Client\tests\System.Net.WebSockets.Client.Tests.csproj" />
<SmokeTestProject Include="$(MSBuildThisFileDirectory)System.Runtime.InteropServices.JavaScript\tests\System.Runtime.InteropServices.JavaScript.UnitTests\System.Runtime.InteropServices.JavaScript.Tests.csproj" />
<!-- ActiveIssue https://github.com/dotnet/runtime/issues/88084
<SmokeTestProject Include="$(MSBuildThisFileDirectory)System.Net.Http\tests\FunctionalTests\System.Net.Http.Functional.Tests.csproj" />
-->
</ItemGroup>

View file

@ -336,7 +336,11 @@ type AssetBehaviors = SingleAssetBehaviors |
/**
* The javascript module for threads.
*/
| "symbols";
| "symbols"
/**
* Load segmentation rules file for Hybrid Globalization.
*/
| "segmentation-rules";
declare const enum GlobalizationMode {
/**
* Load sharded ICU data.

View file

@ -4,7 +4,7 @@
import { mono_wasm_cancel_promise } from "./cancelable-promise";
import cwraps, { profiler_c_functions } from "./cwraps";
import { mono_wasm_send_dbg_command_with_parms, mono_wasm_send_dbg_command, mono_wasm_get_dbg_command_info, mono_wasm_get_details, mono_wasm_release_object, mono_wasm_call_function_on, mono_wasm_debugger_resume, mono_wasm_detach_debugger, mono_wasm_raise_debug_event, mono_wasm_change_debugger_log_level, mono_wasm_debugger_attached } from "./debug";
import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_create_transform_stream, http_wasm_transform_stream_write, http_wasm_transform_stream_close, http_wasm_transform_stream_abort, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http";
import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_controller, http_wasm_abort_request, http_wasm_abort_response, http_wasm_transform_stream_write, http_wasm_transform_stream_close, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes, http_wasm_get_response_type, http_wasm_get_response_status } from "./http";
import { exportedRuntimeAPI, Module, runtimeHelpers } from "./globals";
import { get_property, set_property, has_property, get_typeof_property, get_global_this, dynamic_import } from "./invoke-js";
import { mono_wasm_stringify_as_error_with_stack } from "./logging";
@ -71,13 +71,13 @@ export function export_internal(): any {
// BrowserHttpHandler
http_wasm_supports_streaming_request,
http_wasm_supports_streaming_response,
http_wasm_create_abort_controler,
http_wasm_create_controller,
http_wasm_get_response_type,
http_wasm_get_response_status,
http_wasm_abort_request,
http_wasm_abort_response,
http_wasm_create_transform_stream,
http_wasm_transform_stream_write,
http_wasm_transform_stream_close,
http_wasm_transform_stream_abort,
http_wasm_fetch,
http_wasm_fetch_stream,
http_wasm_fetch_bytes,

View file

@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
import BuildConfiguration from "consts:configuration";
import { wrap_as_cancelable_promise } from "./cancelable-promise";
import { ENVIRONMENT_IS_NODE, Module, loaderHelpers, mono_assert } from "./globals";
import { assert_js_interop } from "./invoke-js";
@ -18,6 +20,11 @@ function verifyEnvironment() {
}
}
function commonAsserts(controller: HttpController) {
assert_js_interop();
mono_assert(controller, "expected controller");
}
export function http_wasm_supports_streaming_request(): boolean {
// Detecting streaming request support works like this:
// If the browser doesn't support a particular body type, it calls toString() on the object and uses the result as the body.
@ -45,19 +52,27 @@ export function http_wasm_supports_streaming_response(): boolean {
return typeof Response !== "undefined" && "body" in Response.prototype && typeof ReadableStream === "function";
}
export function http_wasm_create_abort_controler(): AbortController {
export function http_wasm_create_controller(): HttpController {
verifyEnvironment();
return new AbortController();
assert_js_interop();
const controller: HttpController = {
abortController: new AbortController()
};
return controller;
}
export function http_wasm_abort_request(abort_controller: AbortController): void {
abort_controller.abort();
export function http_wasm_abort_request(controller: HttpController): void {
if (controller.streamWriter) {
controller.streamWriter.abort();
}
http_wasm_abort_response(controller);
}
export function http_wasm_abort_response(res: ResponseExtension): void {
res.__abort_controller.abort();
if (res.__reader) {
res.__reader.cancel().catch((err) => {
export function http_wasm_abort_response(controller: HttpController): void {
if (BuildConfiguration === "Debug") commonAsserts(controller);
controller.abortController.abort();
if (controller.streamReader) {
controller.streamReader.cancel().catch((err) => {
if (err && err.name !== "AbortError") {
Module.err("Error in http_wasm_abort_response: " + err);
}
@ -66,57 +81,56 @@ export function http_wasm_abort_response(res: ResponseExtension): void {
}
}
export function http_wasm_create_transform_stream(): TransformStreamExtension {
const transform_stream = new TransformStream<Uint8Array, Uint8Array>() as TransformStreamExtension;
transform_stream.__writer = transform_stream.writable.getWriter();
return transform_stream;
}
export function http_wasm_transform_stream_write(ts: TransformStreamExtension, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise<void> {
export function http_wasm_transform_stream_write(controller: HttpController, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise<void> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
mono_assert(bufferLength > 0, "expected bufferLength > 0");
// the bufferPtr is pinned by the caller
const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte);
const copy = view.slice() as Uint8Array;
return wrap_as_cancelable_promise(async () => {
mono_assert(ts.__fetch_promise, "expected fetch promise");
mono_assert(controller.streamWriter, "expected streamWriter");
mono_assert(controller.responsePromise, "expected fetch promise");
// race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250
await Promise.race([ts.__writer.ready, ts.__fetch_promise]);
await Promise.race([ts.__writer.write(copy), ts.__fetch_promise]);
await Promise.race([controller.streamWriter.ready, controller.responsePromise]);
await Promise.race([controller.streamWriter.write(copy), controller.responsePromise]);
});
}
export function http_wasm_transform_stream_close(ts: TransformStreamExtension): ControllablePromise<void> {
export function http_wasm_transform_stream_close(controller: HttpController): ControllablePromise<void> {
mono_assert(controller, "expected controller");
return wrap_as_cancelable_promise(async () => {
mono_assert(ts.__fetch_promise, "expected fetch promise");
mono_assert(controller.streamWriter, "expected streamWriter");
mono_assert(controller.responsePromise, "expected fetch promise");
// race with fetch because fetch does not cancel the ReadableStream see https://bugs.chromium.org/p/chromium/issues/detail?id=1480250
await Promise.race([ts.__writer.ready, ts.__fetch_promise]);
await Promise.race([ts.__writer.close(), ts.__fetch_promise]);
await Promise.race([controller.streamWriter.ready, controller.responsePromise]);
await Promise.race([controller.streamWriter.close(), controller.responsePromise]);
});
}
export function http_wasm_transform_stream_abort(ts: TransformStreamExtension): void {
ts.__writer.abort();
}
export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: TransformStreamExtension): ControllablePromise<ResponseExtension> {
const fetch_promise = http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, body.readable);
body.__fetch_promise = fetch_promise;
export function http_wasm_fetch_stream(controller: HttpController, url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[]): ControllablePromise<void> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
const transformStream = new TransformStream<Uint8Array, Uint8Array>();
controller.streamWriter = transformStream.writable.getWriter();
const fetch_promise = http_wasm_fetch(controller, url, header_names, header_values, option_names, option_values, transformStream.readable);
return fetch_promise;
}
export function http_wasm_fetch_bytes(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, bodyPtr: VoidPtr, bodyLength: number): ControllablePromise<ResponseExtension> {
export function http_wasm_fetch_bytes(controller: HttpController, url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], bodyPtr: VoidPtr, bodyLength: number): ControllablePromise<void> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
// the bodyPtr is pinned by the caller
const view = new Span(bodyPtr, bodyLength, MemoryViewType.Byte);
const copy = view.slice() as Uint8Array;
return http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, copy);
return http_wasm_fetch(controller, url, header_names, header_values, option_names, option_values, copy);
}
export function http_wasm_fetch(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: Uint8Array | ReadableStream | null): ControllablePromise<ResponseExtension> {
export function http_wasm_fetch(controller: HttpController, url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], body: Uint8Array | ReadableStream | null): ControllablePromise<void> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
verifyEnvironment();
assert_js_interop();
mono_assert(url && typeof url === "string", "expected url string");
mono_assert(header_names && header_values && Array.isArray(header_names) && Array.isArray(header_values) && header_names.length === header_values.length, "expected headerNames and headerValues arrays");
mono_assert(option_names && option_values && Array.isArray(option_names) && Array.isArray(option_values) && option_names.length === option_values.length, "expected headerNames and headerValues arrays");
const headers = new Headers();
for (let i = 0; i < header_names.length; i++) {
headers.append(header_names[i], header_values[i]);
@ -124,7 +138,7 @@ export function http_wasm_fetch(url: string, header_names: string[], header_valu
const options: any = {
body,
headers,
signal: abort_controller.signal
signal: controller.abortController.signal
};
if (typeof ReadableStream !== "undefined" && body instanceof ReadableStream) {
options.duplex = "half";
@ -132,101 +146,125 @@ export function http_wasm_fetch(url: string, header_names: string[], header_valu
for (let i = 0; i < option_names.length; i++) {
options[option_names[i]] = option_values[i];
}
return wrap_as_cancelable_promise(async () => {
const res = await loaderHelpers.fetch_like(url, options) as ResponseExtension;
res.__abort_controller = abort_controller;
return res;
// make the fetch cancellable
controller.responsePromise = wrap_as_cancelable_promise(() => {
return loaderHelpers.fetch_like(url, options);
});
}
function get_response_headers(res: ResponseExtension): void {
if (!res.__headerNames) {
res.__headerNames = [];
res.__headerValues = [];
// avoid processing headers if the fetch is cancelled
controller.responsePromise.then((res: Response) => {
controller.response = res;
controller.responseHeaderNames = [];
controller.responseHeaderValues = [];
if (res.headers && (<any>res.headers).entries) {
const entries: Iterable<string[]> = (<any>res.headers).entries();
for (const pair of entries) {
res.__headerNames.push(pair[0]);
res.__headerValues.push(pair[1]);
controller.responseHeaderNames.push(pair[0]);
controller.responseHeaderValues.push(pair[1]);
}
}
}
}).catch(() => {
// ignore
});
return controller.responsePromise;
}
export function http_wasm_get_response_header_names(res: ResponseExtension): string[] {
get_response_headers(res);
return res.__headerNames;
export function http_wasm_get_response_type(controller: HttpController): string | undefined {
if (BuildConfiguration === "Debug") commonAsserts(controller);
return controller.response?.type;
}
export function http_wasm_get_response_header_values(res: ResponseExtension): string[] {
get_response_headers(res);
return res.__headerValues;
export function http_wasm_get_response_status(controller: HttpController): number {
if (BuildConfiguration === "Debug") commonAsserts(controller);
return controller.response?.status ?? 0;
}
export function http_wasm_get_response_length(res: ResponseExtension): ControllablePromise<number> {
export function http_wasm_get_response_header_names(controller: HttpController): string[] {
if (BuildConfiguration === "Debug") commonAsserts(controller);
mono_assert(controller.responseHeaderNames, "expected responseHeaderNames");
return controller.responseHeaderNames;
}
export function http_wasm_get_response_header_values(controller: HttpController): string[] {
if (BuildConfiguration === "Debug") commonAsserts(controller);
mono_assert(controller.responseHeaderValues, "expected responseHeaderValues");
return controller.responseHeaderValues;
}
export function http_wasm_get_response_length(controller: HttpController): ControllablePromise<number> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
return wrap_as_cancelable_promise(async () => {
const buffer = await res.arrayBuffer();
res.__buffer = buffer;
res.__source_offset = 0;
const buffer = await controller.response!.arrayBuffer();
controller.responseBuffer = buffer;
controller.currentBufferOffset = 0;
return buffer.byteLength;
});
}
export function http_wasm_get_response_bytes(res: ResponseExtension, view: Span): number {
mono_assert(res.__buffer, "expected resoved arrayBuffer");
if (res.__source_offset == res.__buffer!.byteLength) {
export function http_wasm_get_response_bytes(controller: HttpController, view: Span): number {
mono_assert(controller, "expected controller");
mono_assert(controller.responseBuffer, "expected resoved arrayBuffer");
mono_assert(controller.currentBufferOffset != undefined, "expected currentBufferOffset");
if (controller.currentBufferOffset == controller.responseBuffer!.byteLength) {
return 0;
}
const source_view = new Uint8Array(res.__buffer!, res.__source_offset);
const source_view = new Uint8Array(controller.responseBuffer!, controller.currentBufferOffset);
view.set(source_view, 0);
const bytes_read = Math.min(view.byteLength, source_view.byteLength);
res.__source_offset += bytes_read;
controller.currentBufferOffset += bytes_read;
return bytes_read;
}
export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise<number> {
export function http_wasm_get_streamed_response_bytes(controller: HttpController, bufferPtr: VoidPtr, bufferLength: number): ControllablePromise<number> {
if (BuildConfiguration === "Debug") commonAsserts(controller);
// the bufferPtr is pinned by the caller
const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte);
return wrap_as_cancelable_promise(async () => {
if (!res.__reader) {
res.__reader = res.body!.getReader();
mono_assert(controller.response, "expected response");
if (!controller.streamReader) {
controller.streamReader = controller.response.body!.getReader();
}
if (!res.__chunk) {
res.__chunk = await res.__reader.read();
res.__source_offset = 0;
if (!controller.currentStreamReaderChunk || controller.currentBufferOffset === undefined) {
controller.currentStreamReaderChunk = await controller.streamReader.read();
controller.currentBufferOffset = 0;
}
if (res.__chunk.done) {
if (controller.currentStreamReaderChunk.done) {
return 0;
}
const remaining_source = res.__chunk.value.byteLength - res.__source_offset;
const remaining_source = controller.currentStreamReaderChunk.value.byteLength - controller.currentBufferOffset;
mono_assert(remaining_source > 0, "expected remaining_source to be greater than 0");
const bytes_copied = Math.min(remaining_source, view.byteLength);
const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied);
const source_view = controller.currentStreamReaderChunk.value.subarray(controller.currentBufferOffset, controller.currentBufferOffset + bytes_copied);
view.set(source_view, 0);
res.__source_offset += bytes_copied;
controller.currentBufferOffset += bytes_copied;
if (remaining_source == bytes_copied) {
res.__chunk = undefined;
controller.currentStreamReaderChunk = undefined;
}
return bytes_copied;
});
}
interface TransformStreamExtension extends TransformStream<Uint8Array, Uint8Array> {
__writer: WritableStreamDefaultWriter<Uint8Array>
__fetch_promise?: Promise<ResponseExtension>
}
interface HttpController {
abortController: AbortController
interface ResponseExtension extends Response {
__buffer?: ArrayBuffer
__reader?: ReadableStreamDefaultReader<Uint8Array>
__chunk?: ReadableStreamReadResult<Uint8Array>
__source_offset: number
__abort_controller: AbortController
__headerNames: string[];
__headerValues: string[];
// streaming request
streamReader?: ReadableStreamDefaultReader<Uint8Array>
// response
responsePromise?: ControllablePromise<any>
response?: Response
responseHeaderNames?: string[];
responseHeaderValues?: string[];
currentBufferOffset?: number
// non-streaming response
responseBuffer?: ArrayBuffer
// streaming response
streamWriter?: WritableStreamDefaultWriter<Uint8Array>
currentStreamReaderChunk?: ReadableStreamReadResult<Uint8Array>
}

View file

@ -98,13 +98,13 @@ export function setup_proxy_console(id: string, console: Console, origin: string
...console
};
setupWS();
const consoleUrl = `${origin}/console`.replace("https://", "wss://").replace("http://", "ws://");
consoleWebSocket = new WebSocket(consoleUrl);
consoleWebSocket.addEventListener("error", logWSError);
consoleWebSocket.addEventListener("close", logWSClose);
setupWS();
}
export function teardown_proxy_console(message?: string) {
@ -135,7 +135,7 @@ export function teardown_proxy_console(message?: string) {
}
function send(msg: string) {
if (consoleWebSocket.readyState === WebSocket.OPEN) {
if (consoleWebSocket && consoleWebSocket.readyState === WebSocket.OPEN) {
consoleWebSocket.send(msg);
}
else {