diff --git a/doc/api/errors.md b/doc/api/errors.md index 71cae6300b26ed..74b8eb35855917 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2890,6 +2890,17 @@ The WASI instance has already started. The WASI instance has not been started. + + +### `ERR_WEBASSEMBLY_RESPONSE` + + + +The `Response` that has been passed to `WebAssembly.compileStreaming` or to +`WebAssembly.instantiateStreaming` is not a valid WebAssembly response. + ### `ERR_WORKER_INIT_FAILED` diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index b64dfaf980fd92..e1b882b6dbe744 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -5,6 +5,7 @@ const { ObjectDefineProperties, ObjectDefineProperty, ObjectGetOwnPropertyDescriptor, + PromiseResolve, SafeMap, SafeWeakMap, StringPrototypeStartsWith, @@ -24,7 +25,11 @@ const { } = require('internal/util'); const { Buffer } = require('buffer'); -const { ERR_MANIFEST_ASSERT_INTEGRITY } = require('internal/errors').codes; +const { + ERR_INVALID_ARG_TYPE, + ERR_MANIFEST_ASSERT_INTEGRITY, + ERR_WEBASSEMBLY_RESPONSE, +} = require('internal/errors').codes; const assert = require('internal/assert'); function prepareMainThreadExecution(expandArgv1 = false, @@ -215,6 +220,44 @@ function setupFetch() { Request: lazyInterface('Request'), Response: lazyInterface('Response'), }); + + // The WebAssembly Web API: https://webassembly.github.io/spec/web-api + internalBinding('wasm_web_api').setImplementation((streamState, source) => { + (async () => { + const response = await PromiseResolve(source); + if (!(response instanceof lazyUndici().Response)) { + throw new ERR_INVALID_ARG_TYPE( + 'source', ['Response', 'Promise resolving to Response'], response); + } + + const contentType = response.headers.get('Content-Type'); + if (contentType !== 'application/wasm') { + throw new ERR_WEBASSEMBLY_RESPONSE( + `has unsupported MIME type '${contentType}'`); + } + + if (!response.ok) { + throw new ERR_WEBASSEMBLY_RESPONSE( + `has status code ${response.status}`); + } + + if (response.bodyUsed !== false) { + throw new ERR_WEBASSEMBLY_RESPONSE('body has already been used'); + } + + // Pass all data from the response body to the WebAssembly compiler. + for await (const chunk of response.body) { + streamState.push(chunk); + } + })().then(() => { + // No error occurred. Tell the implementation that the stream has ended. + streamState.finish(); + }, (err) => { + // An error occurred, either because the given object was not a valid + // and usable Response or because a network error occurred. + streamState.abort(err); + }); + }); } // TODO(aduh95): move this to internal/bootstrap/browser when the CLI flag is diff --git a/lib/internal/errors.js b/lib/internal/errors.js index a5c64080a59aae..80130d28a07805 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1645,6 +1645,7 @@ E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error); +E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError); E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error); E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') => `Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`, diff --git a/node.gyp b/node.gyp index 0a6eca012daad1..7f44b833e88380 100644 --- a/node.gyp +++ b/node.gyp @@ -543,6 +543,7 @@ 'src/node_util.cc', 'src/node_v8.cc', 'src/node_wasi.cc', + 'src/node_wasm_web_api.cc', 'src/node_watchdog.cc', 'src/node_worker.cc', 'src/node_zlib.cc', diff --git a/src/api/environment.cc b/src/api/environment.cc index 97261256858403..9164ec5a923224 100644 --- a/src/api/environment.cc +++ b/src/api/environment.cc @@ -5,6 +5,7 @@ #include "node_native_module_env.h" #include "node_platform.h" #include "node_v8_platform-inl.h" +#include "node_wasm_web_api.h" #include "uv.h" #if HAVE_INSPECTOR @@ -252,6 +253,12 @@ void SetIsolateMiscHandlers(v8::Isolate* isolate, const IsolateSettings& s) { s.allow_wasm_code_generation_callback : AllowWasmCodeGenerationCallback; isolate->SetAllowWasmCodeGenerationCallback(allow_wasm_codegen_cb); + Mutex::ScopedLock lock(node::per_process::cli_options_mutex); + if (per_process::cli_options->get_per_isolate_options()->get_per_env_options() + ->experimental_fetch) { + isolate->SetWasmStreamingCallback(wasm_web_api::StartStreamingCompilation); + } + if ((s.flags & SHOULD_NOT_SET_PROMISE_REJECTION_CALLBACK) == 0) { auto* promise_reject_cb = s.promise_reject_callback ? s.promise_reject_callback : PromiseRejectCallback; diff --git a/src/env.h b/src/env.h index efaeb53fbc7599..bb734cc3c8cbed 100644 --- a/src/env.h +++ b/src/env.h @@ -552,7 +552,9 @@ constexpr size_t kFsStatsBufferLength = V(tls_wrap_constructor_function, v8::Function) \ V(trace_category_state_function, v8::Function) \ V(udp_constructor_function, v8::Function) \ - V(url_constructor_function, v8::Function) + V(url_constructor_function, v8::Function) \ + V(wasm_streaming_compilation_impl, v8::Function) \ + V(wasm_streaming_object_constructor, v8::Function) \ class Environment; struct AllocatedBuffer; diff --git a/src/node_binding.cc b/src/node_binding.cc index 29b9ccdaed8b10..2991ee34746e0f 100644 --- a/src/node_binding.cc +++ b/src/node_binding.cc @@ -87,6 +87,7 @@ V(uv) \ V(v8) \ V(wasi) \ + V(wasm_web_api) \ V(watchdog) \ V(worker) \ V(zlib) diff --git a/src/node_wasm_web_api.cc b/src/node_wasm_web_api.cc new file mode 100644 index 00000000000000..d54f3530b78d08 --- /dev/null +++ b/src/node_wasm_web_api.cc @@ -0,0 +1,178 @@ +#include "node_wasm_web_api.h" + +#include "node_errors.h" + +namespace node { +namespace wasm_web_api { + +v8::Local WasmStreamingObject::Initialize(Environment* env) { + v8::Local templ = env->wasm_streaming_object_constructor(); + if (!templ.IsEmpty()) { + return templ; + } + + v8::Local t = env->NewFunctionTemplate(New); + t->Inherit(BaseObject::GetConstructorTemplate(env)); + t->InstanceTemplate()->SetInternalFieldCount( + WasmStreamingObject::kInternalFieldCount); + + env->SetProtoMethod(t, "push", Push); + env->SetProtoMethod(t, "finish", Finish); + env->SetProtoMethod(t, "abort", Abort); + + auto function = t->GetFunction(env->context()).ToLocalChecked(); + env->set_wasm_streaming_object_constructor(function); + return function; +} + +void WasmStreamingObject::RegisterExternalReferences( + ExternalReferenceRegistry* registry) { + registry->Register(Push); + registry->Register(Finish); + registry->Register(Abort); +} + +v8::MaybeLocal WasmStreamingObject::Create( + Environment* env, std::shared_ptr streaming) { + v8::Local ctor = Initialize(env); + v8::Local obj; + if (!ctor->NewInstance(env->context(), 0, nullptr).ToLocal(&obj)) { + return v8::MaybeLocal(); + } + + CHECK(streaming); + + WasmStreamingObject* ptr = Unwrap(obj); + CHECK_NOT_NULL(ptr); + ptr->streaming_ = streaming; + return obj; +} + +void WasmStreamingObject::New(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + Environment* env = Environment::GetCurrent(args); + new WasmStreamingObject(env, args.This()); +} + +void WasmStreamingObject::Push( + const v8::FunctionCallbackInfo& args) { + WasmStreamingObject* obj; + ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); + CHECK(obj->streaming_); + + CHECK_EQ(args.Length(), 1); + v8::Local chunk = args[0]; + + // The start of the memory section backing the ArrayBuffer(View), the offset + // of the ArrayBuffer(View) within the memory section, and its size in bytes. + const void* bytes; + size_t offset; + size_t size; + + if (LIKELY(chunk->IsArrayBufferView())) { + v8::Local view = chunk.As(); + bytes = view->Buffer()->GetBackingStore()->Data(); + offset = view->ByteOffset(); + size = view->ByteLength(); + } else if (LIKELY(chunk->IsArrayBuffer())) { + v8::Local buffer = chunk.As(); + bytes = buffer->GetBackingStore()->Data(); + offset = 0; + size = buffer->ByteLength(); + } else { + return node::THROW_ERR_INVALID_ARG_TYPE( + Environment::GetCurrent(args), + "chunk must be an ArrayBufferView or an ArrayBuffer"); + } + + // Forward the data to V8. Internally, V8 will make a copy. + obj->streaming_->OnBytesReceived( + static_cast(bytes) + offset, size); +} + +void WasmStreamingObject::Finish( + const v8::FunctionCallbackInfo& args) { + WasmStreamingObject* obj; + ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); + CHECK(obj->streaming_); + + CHECK_EQ(args.Length(), 0); + obj->streaming_->Finish(); +} + +void WasmStreamingObject::Abort( + const v8::FunctionCallbackInfo& args) { + WasmStreamingObject* obj; + ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); + CHECK(obj->streaming_); + + CHECK_EQ(args.Length(), 1); + obj->streaming_->Abort(args[0]); +} + +void StartStreamingCompilation( + const v8::FunctionCallbackInfo& info) { + // V8 passes an instance of v8::WasmStreaming to this callback, which we can + // use to pass the WebAssembly module bytes to V8 as we receive them. + // Unfortunately, our fetch() implementation is a JavaScript dependency, so it + // is difficult to implement the required logic here. Instead, we create a + // a WasmStreamingObject that encapsulates v8::WasmStreaming and that we can + // pass to the JavaScript implementation. The JavaScript implementation can + // then push() bytes from the Response and eventually either finish() or + // abort() the operation. + + // Create the wrapper object. + std::shared_ptr streaming = + v8::WasmStreaming::Unpack(info.GetIsolate(), info.Data()); + Environment* env = Environment::GetCurrent(info); + v8::Local obj; + if (!WasmStreamingObject::Create(env, streaming).ToLocal(&obj)) { + // A JavaScript exception is pending. Let V8 deal with it. + return; + } + + // V8 always passes one argument to this callback. + CHECK_EQ(info.Length(), 1); + + // Prepare the JavaScript implementation for invocation. We will pass the + // WasmStreamingObject as the first argument, followed by the argument that we + // received from V8, i.e., the first argument passed to compileStreaming (or + // instantiateStreaming). + v8::Local impl = env->wasm_streaming_compilation_impl(); + CHECK(!impl.IsEmpty()); + v8::Local args[] = { obj, info[0] }; + + // Hand control to the JavaScript implementation. It should never throw an + // error, but if it does, we leave it to the calling V8 code to handle that + // gracefully. Otherwise, we assert that the JavaScript function does not + // return anything. + v8::MaybeLocal maybe_ret = + impl->Call(env->context(), info.This(), 2, args); + v8::Local ret; + CHECK_IMPLIES(maybe_ret.ToLocal(&ret), ret->IsUndefined()); +} + +// Called once by JavaScript during initialization. +void SetImplementation(const v8::FunctionCallbackInfo& info) { + Environment* env = Environment::GetCurrent(info); + env->set_wasm_streaming_compilation_impl(info[0].As()); +} + +void Initialize(v8::Local target, + v8::Local, + v8::Local context, + void*) { + Environment* env = Environment::GetCurrent(context); + env->SetMethod(target, "setImplementation", SetImplementation); +} + +void RegisterExternalReferences(ExternalReferenceRegistry* registry) { + registry->Register(SetImplementation); +} + +} // namespace wasm_web_api +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(wasm_web_api, node::wasm_web_api::Initialize) +NODE_MODULE_EXTERNAL_REFERENCE(wasm_web_api, + node::wasm_web_api::RegisterExternalReferences) diff --git a/src/node_wasm_web_api.h b/src/node_wasm_web_api.h new file mode 100644 index 00000000000000..d2ffe817380bdd --- /dev/null +++ b/src/node_wasm_web_api.h @@ -0,0 +1,54 @@ +#ifndef SRC_NODE_WASM_WEB_API_H_ +#define SRC_NODE_WASM_WEB_API_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "base_object-inl.h" +#include "v8.h" + +namespace node { +namespace wasm_web_api { + +// Wrapper for interacting with a v8::WasmStreaming instance from JavaScript. +class WasmStreamingObject final : public BaseObject { + public: + static v8::Local Initialize(Environment* env); + + static void RegisterExternalReferences(ExternalReferenceRegistry* registry); + + void MemoryInfo(MemoryTracker* tracker) const override {} + SET_MEMORY_INFO_NAME(WasmStreamingObject) + SET_SELF_SIZE(WasmStreamingObject) + + static v8::MaybeLocal Create( + Environment* env, std::shared_ptr streaming); + + private: + WasmStreamingObject(Environment* env, v8::Local object) + : BaseObject(env, object) { + MakeWeak(); + } + + ~WasmStreamingObject() override {} + + private: + static void New(const v8::FunctionCallbackInfo& args); + static void Push(const v8::FunctionCallbackInfo& args); + static void Finish(const v8::FunctionCallbackInfo& args); + static void Abort(const v8::FunctionCallbackInfo& args); + + std::shared_ptr streaming_; +}; + +// This is a v8::WasmStreamingCallback implementation that must be passed to +// v8::Isolate::SetWasmStreamingCallback when setting up the isolate in order to +// enable the WebAssembly.(compile|instantiate)Streaming APIs. +void StartStreamingCompilation( + const v8::FunctionCallbackInfo& args); + +} // namespace wasm_web_api +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#endif // SRC_NODE_WASM_WEB_API_H_ diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index d5f69e8165f61c..d5bf6e133ebc99 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -42,6 +42,7 @@ const expectedModules = new Set([ 'Internal Binding util', 'Internal Binding uv', 'Internal Binding v8', + 'Internal Binding wasm_web_api', 'Internal Binding worker', 'NativeModule buffer', 'NativeModule events', diff --git a/test/parallel/test-fetch-disabled.mjs b/test/parallel/test-fetch-disabled.mjs index f06d484701c3ec..ea6b6807d8dbb5 100644 --- a/test/parallel/test-fetch-disabled.mjs +++ b/test/parallel/test-fetch-disabled.mjs @@ -8,3 +8,6 @@ assert.strictEqual(typeof globalThis.FormData, 'undefined'); assert.strictEqual(typeof globalThis.Headers, 'undefined'); assert.strictEqual(typeof globalThis.Request, 'undefined'); assert.strictEqual(typeof globalThis.Response, 'undefined'); + +assert.strictEqual(typeof WebAssembly.compileStreaming, 'undefined'); +assert.strictEqual(typeof WebAssembly.instantiateStreaming, 'undefined'); diff --git a/test/parallel/test-wasm-web-api.js b/test/parallel/test-wasm-web-api.js new file mode 100644 index 00000000000000..3d3f7e5eaa4dee --- /dev/null +++ b/test/parallel/test-wasm-web-api.js @@ -0,0 +1,225 @@ +'use strict'; + +const common = require('../common'); +const fixtures = require('../common/fixtures'); + +const assert = require('assert'); +const events = require('events'); +const fs = require('fs/promises'); +const { createServer } = require('http'); + +assert.strictEqual(typeof WebAssembly.compileStreaming, 'function'); +assert.strictEqual(typeof WebAssembly.instantiateStreaming, 'function'); + +const simpleWasmBytes = fixtures.readSync('simple.wasm'); + +// Sets up an HTTP server with the given response handler and calls fetch() to +// obtain a Response from the newly created server. +async function testRequest(handler) { + const server = createServer((_, res) => handler(res)).unref().listen(0); + await events.once(server, 'listening'); + const { port } = server.address(); + return fetch(`http://127.0.0.1:${port}/`); +} + +// Runs the given function both with the promise itself and as a continuation +// of the promise. We use this to test that the API accepts not just a Response +// but also a Promise that resolves to a Response. +function withPromiseAndResolved(makePromise, consume) { + return Promise.all([ + consume(makePromise()), + makePromise().then(consume), + ]); +} + +// The makeResponsePromise function must return a Promise that resolves to a +// Response. The checkResult function receives the Promise returned by +// WebAssembly.compileStreaming and must return a Promise itself. +function testCompileStreaming(makeResponsePromise, checkResult) { + return withPromiseAndResolved( + common.mustCall(makeResponsePromise, 2), + common.mustCall((response) => { + return checkResult(WebAssembly.compileStreaming(response)); + }, 2) + ); +} + +function testCompileStreamingSuccess(makeResponsePromise) { + return testCompileStreaming(makeResponsePromise, async (modPromise) => { + const mod = await modPromise; + assert.strictEqual(mod.constructor, WebAssembly.Module); + }); +} + +function testCompileStreamingRejection(makeResponsePromise, rejection) { + return testCompileStreaming(makeResponsePromise, (modPromise) => { + assert.strictEqual(modPromise.constructor, Promise); + return assert.rejects(modPromise, rejection); + }); +} + +function testCompileStreamingSuccessUsingFetch(responseCallback) { + return testCompileStreamingSuccess(() => testRequest(responseCallback)); +} + +function testCompileStreamingRejectionUsingFetch(responseCallback, rejection) { + return testCompileStreamingRejection(() => testRequest(responseCallback)); +} + +(async () => { + // A non-Response should cause a TypeError. + for (const invalid of [undefined, null, 0, true, 'foo', {}, [], Symbol()]) { + await withPromiseAndResolved(() => Promise.resolve(invalid), (arg) => { + return assert.rejects(() => WebAssembly.compileStreaming(arg), { + name: 'TypeError', + code: 'ERR_INVALID_ARG_TYPE', + message: /^The "source" argument .*$/ + }); + }); + } + + // When given a Promise, any rejection should be propagated as-is. + { + const err = new RangeError('foo'); + await assert.rejects(() => { + return WebAssembly.compileStreaming(Promise.reject(err)); + }, (actualError) => actualError === err); + } + + // A valid WebAssembly file with the correct MIME type. + await testCompileStreamingSuccessUsingFetch((res) => { + res.setHeader('Content-Type', 'application/wasm'); + res.end(simpleWasmBytes); + }); + + // The same valid WebAssembly file with the same MIME type, but using a + // Response whose body is a Buffer instead of calling fetch(). + await testCompileStreamingSuccess(() => { + return Promise.resolve(new Response(simpleWasmBytes, { + status: 200, + headers: { 'Content-Type': 'application/wasm' } + })); + }); + + // The same valid WebAssembly file with the same MIME type, but using a + // Response whose body is a ReadableStream instead of calling fetch(). + await testCompileStreamingSuccess(async () => { + const handle = await fs.open(fixtures.path('simple.wasm')); + const stream = handle.readableWebStream(); + return Promise.resolve(new Response(stream, { + status: 200, + headers: { 'Content-Type': 'application/wasm' } + })); + }); + + // A larger valid WebAssembly file with the correct MIME type that causes the + // client to pass it to the compiler in many separate chunks. For this, we use + // the same WebAssembly file as in the previous test but insert useless custom + // sections into the WebAssembly module to increase the file size without + // changing the relevant contents. + await testCompileStreamingSuccessUsingFetch((res) => { + res.setHeader('Content-Type', 'application/wasm'); + + // Send the WebAssembly magic and version first. + res.write(simpleWasmBytes.slice(0, 8), common.mustCall()); + + // Construct a 4KiB custom section. + const customSection = Buffer.concat([ + Buffer.from([ + 0, // Custom section. + 134, 32, // (134 & 0x7f) + 0x80 * 32 = 6 + 4096 bytes in this section. + 5, // The length of the following section name. + ]), + Buffer.from('?'.repeat(5)), // The section name + Buffer.from('\0'.repeat(4096)), // The actual section data + ]); + + // Now repeatedly send useless custom sections. These have no use for the + // WebAssembly compiler but they are syntactically valid. The client has to + // keep reading the stream until the very end to obtain the relevant + // sections within the module. This adds up to a few hundred kibibytes. + (function next(i) { + if (i < 100) { + while (res.write(customSection)); + res.once('drain', () => next(i + 1)); + } else { + // End the response body with the actual module contents. + res.end(simpleWasmBytes.slice(8)); + } + })(0); + }); + + // A valid WebAssembly file with an empty parameter in the (otherwise valid) + // MIME type. + await testCompileStreamingRejectionUsingFetch((res) => { + res.setHeader('Content-Type', 'application/wasm;'); + res.end(simpleWasmBytes); + }, { + name: 'TypeError', + code: 'ERR_WEBASSEMBLY_RESPONSE', + message: 'WebAssembly response has unsupported MIME type ' + + "'application/wasm;'" + }); + + // A valid WebAssembly file with an invalid MIME type. + await testCompileStreamingRejectionUsingFetch((res) => { + res.setHeader('Content-Type', 'application/octet-stream'); + res.end(simpleWasmBytes); + }, { + name: 'TypeError', + code: 'ERR_WEBASSEMBLY_RESPONSE', + message: 'WebAssembly response has unsupported MIME type ' + + "'application/octet-stream'" + }); + + // HTTP status code indiciating an error. + await testCompileStreamingRejectionUsingFetch((res) => { + res.statusCode = 418; + res.setHeader('Content-Type', 'application/wasm'); + res.end(simpleWasmBytes); + }, { + name: 'TypeError', + code: 'ERR_WEBASSEMBLY_RESPONSE', + message: /^WebAssembly response has status code 418$/ + }); + + // HTTP status code indiciating an error, but using a Response whose body is + // a Buffer instead of calling fetch(). + await testCompileStreamingSuccess(() => { + return Promise.resolve(new Response(simpleWasmBytes, { + status: 200, + headers: { 'Content-Type': 'application/wasm' } + })); + }); + + // Extra bytes after the WebAssembly file. + await testCompileStreamingRejectionUsingFetch((res) => { + res.setHeader('Content-Type', 'application/wasm'); + res.end(Buffer.concat([simpleWasmBytes, Buffer.from('foo')])); + }, { + name: 'CompileError', + message: /^WebAssembly\.compileStreaming\(\): .*$/ + }); + + // Missing bytes at the end of the WebAssembly file. + await testCompileStreamingRejectionUsingFetch((res) => { + res.setHeader('Content-Type', 'application/wasm'); + res.end(simpleWasmBytes.subarray(0, simpleWasmBytes.length - 3)); + }, { + name: 'CompileError', + message: /^WebAssembly\.compileStreaming\(\): .*$/ + }); + + // Incomplete HTTP response body. The TypeError might come as a surprise, but + // it originates from within fetch(). + await testCompileStreamingRejectionUsingFetch((res) => { + res.setHeader('Content-Length', simpleWasmBytes.length); + res.setHeader('Content-Type', 'application/wasm'); + res.write(simpleWasmBytes.slice(0, 5), common.mustSucceed(() => { + res.destroy(); + })); + }, { + name: 'TypeError', + message: /terminated/ + }); +})().then(common.mustCall());