From 5bcc2a22cf1ac07ca5d1c1abc53a51b8dc6100df Mon Sep 17 00:00:00 2001 From: Ben Grant Date: Wed, 28 May 2025 14:39:10 -0700 Subject: [PATCH] Use process stream in worker thread --- src/bun.js/ConsoleObject.zig | 10 ++-- src/bun.js/bindings/BunProcess.cpp | 24 ++++++++++ src/bun.js/bindings/JSGlobalObject.zig | 30 ++++++++++++ src/bun.js/bindings/webcore/Worker.cpp | 6 --- src/bun.js/web_worker.zig | 22 --------- .../worker_threads/worker_threads.test.ts | 46 +++++++++++++++++++ 6 files changed, 105 insertions(+), 33 deletions(-) diff --git a/src/bun.js/ConsoleObject.zig b/src/bun.js/ConsoleObject.zig index 20d6d7d429..0cbc015cb3 100644 --- a/src/bun.js/ConsoleObject.zig +++ b/src/bun.js/ConsoleObject.zig @@ -152,12 +152,12 @@ fn messageWithTypeAndLevel_( else Output.enable_ansi_colors_stdout; - const worker = if (global.bunVM().worker) |w| - if (w.kind == .node) w else null + const use_process_stdio = if (global.bunVM().worker) |w| + w.kind == .node else - null; - var worker_writer = if (worker) |w| - w.stdioWriter(if (level == .Warning or level == .Error) .stderr else .stdout) + false; + var worker_writer = if (use_process_stdio) + global.processStdioWriter(if (level == .Warning or level == .Error) .stderr else .stdout) else null; diff --git a/src/bun.js/bindings/BunProcess.cpp b/src/bun.js/bindings/BunProcess.cpp index 4bcffacd40..0cc0bec583 100644 --- a/src/bun.js/bindings/BunProcess.cpp +++ b/src/bun.js/bindings/BunProcess.cpp @@ -3593,6 +3593,30 @@ extern "C" void Process__emitErrorEvent(Zig::GlobalObject* global, EncodedJSValu } } +extern "C" void Process__writeToStdio(JSGlobalObject* jsGlobalObject, int fd, const uint8_t* bytes, size_t len) +{ + ASSERT(fd == 1 || fd == 2); + auto* globalObject = defaultGlobalObject(jsGlobalObject); + auto& vm = JSC::getVM(globalObject); + auto scope = DECLARE_THROW_SCOPE(vm); + + auto* process = globalObject->processObject(); + auto stream = process->get(globalObject, Identifier::fromString(vm, fd == 1 ? "stdout"_s : "stderr"_s)); + RETURN_IF_EXCEPTION(scope, ); + auto writeFn = stream.get(globalObject, Identifier::fromString(vm, "write"_s)); + RETURN_IF_EXCEPTION(scope, ); + auto callData = JSC::getCallData(writeFn); + if (callData.type == CallData::Type::None) { + scope.throwException(globalObject, createNotAFunctionError(globalObject, writeFn)); + return; + } + MarkedArgumentBuffer args; + auto* buffer = WebCore::createBuffer(globalObject, { bytes, len }); + args.append(buffer); + JSC::call(globalObject, writeFn, callData, stream, args); + RETURN_IF_EXCEPTION(scope, ); +} + /* Source for Process.lut.h @begin processObjectTable _debugEnd Process_stubEmptyFunction Function 0 diff --git a/src/bun.js/bindings/JSGlobalObject.zig b/src/bun.js/bindings/JSGlobalObject.zig index 5555d979d6..f0bb3013b3 100644 --- a/src/bun.js/bindings/JSGlobalObject.zig +++ b/src/bun.js/bindings/JSGlobalObject.zig @@ -882,6 +882,36 @@ pub const JSGlobalObject = opaque { return @enumFromInt(ScriptExecutionContextIdentifier__forGlobalObject(global)); } + pub const StdioWriterFd = enum(c_int) { + stdout = 1, + stderr = 2, + }; + extern fn Process__writeToStdio(global: *JSGlobalObject, fd: StdioWriterFd, bytes: [*]const u8, len: usize) void; + + const ProcessStdioWriterContext = struct { + global: *JSGlobalObject, + fd: StdioWriterFd, + + fn write(this: ProcessStdioWriterContext, bytes: []const u8) bun.JSError!usize { + Process__writeToStdio(this.global, this.fd, bytes.ptr, bytes.len); + if (this.global.hasException()) { + return error.JSError; + } + return bytes.len; + } + }; + + pub const ProcessStdioWriter = std.io.Writer( + ProcessStdioWriterContext, + bun.JSError, + ProcessStdioWriterContext.write, + ); + + /// Get a writer which calls process.stdout.write or process.stderr.write + pub fn processStdioWriter(global: *JSGlobalObject, fd: StdioWriterFd) ProcessStdioWriter { + return .{ .context = .{ .global = global, .fd = fd } }; + } + pub const Extern = [_][]const u8{ "create", "getModuleRegistryMap", "resetModuleRegistryMap" }; comptime { diff --git a/src/bun.js/bindings/webcore/Worker.cpp b/src/bun.js/bindings/webcore/Worker.cpp index 5262553aa9..870f157ff4 100644 --- a/src/bun.js/bindings/webcore/Worker.cpp +++ b/src/bun.js/bindings/webcore/Worker.cpp @@ -566,12 +566,6 @@ void Worker::pushStdioToParent(PushStdioFd fd, std::span bytes) }); } -extern "C" void WebWorker__pushStdioToParent(Worker* worker, int fd, const uint8_t* bytes, size_t len) -{ - ASSERT(fd == 1 || fd == 2); - worker->pushStdioToParent(static_cast(fd), { bytes, len }); -} - JSC_DEFINE_HOST_FUNCTION(jsPushStdioToParent, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame)) { auto* globalObject = defaultGlobalObject(lexicalGlobalObject); diff --git a/src/bun.js/web_worker.zig b/src/bun.js/web_worker.zig index 29559cd9e8..ba2e5f9632 100644 --- a/src/bun.js/web_worker.zig +++ b/src/bun.js/web_worker.zig @@ -64,16 +64,10 @@ pub const Status = enum(u8) { terminated, }; -const PushStdioFd = enum(c_int) { - stdout = 1, - stderr = 2, -}; - extern fn WebWorker__dispatchExit(?*jsc.JSGlobalObject, *CppWorker, i32) void; extern fn WebWorker__dispatchOnline(cpp_worker: *CppWorker, *jsc.JSGlobalObject) void; extern fn WebWorker__fireEarlyMessages(cpp_worker: *CppWorker, *jsc.JSGlobalObject) void; extern fn WebWorker__dispatchError(*jsc.JSGlobalObject, *CppWorker, bun.String, JSValue) void; -extern fn WebWorker__pushStdioToParent(cpp_worker: *CppWorker, fd: PushStdioFd, bytes: [*]const u8, len: usize) void; pub fn hasRequestedTerminate(this: *const WebWorker) bool { return this.requested_terminate.load(.monotonic); @@ -627,22 +621,6 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn { bun.exitThread(); } -const WriterContext = struct { - cpp_worker: *CppWorker, - fd: PushStdioFd, - - fn write(this: WriterContext, bytes: []const u8) error{}!usize { - WebWorker__pushStdioToParent(this.cpp_worker, this.fd, bytes.ptr, bytes.len); - return bytes.len; - } -}; - -pub const Writer = std.io.Writer(WriterContext, error{}, WriterContext.write); - -pub fn stdioWriter(this: *WebWorker, fd: PushStdioFd) Writer { - return .{ .context = .{ .cpp_worker = this.cpp_worker, .fd = fd } }; -} - comptime { @export(&create, .{ .name = "WebWorker__create" }); @export(¬ifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" }); diff --git a/test/js/node/worker_threads/worker_threads.test.ts b/test/js/node/worker_threads/worker_threads.test.ts index a575b00a0b..428a5b36c6 100644 --- a/test/js/node/worker_threads/worker_threads.test.ts +++ b/test/js/node/worker_threads/worker_threads.test.ts @@ -657,6 +657,52 @@ error warn `); }); + + it("handles exceptions", async () => { + const cases = [ + { + code: /* js */ `process.stdout.write = () => { throw new Error("write()"); }; console.log("hello");`, + expectedException: { + name: "Error", + message: "write()", + }, + }, + { + code: /* js */ `process.stdout.write = 6; console.log("hello");`, + expectedException: { + name: "TypeError", + message: expect.stringMatching(/is not a function.*is 6/), + }, + }, + { + code: /* js */ `Object.defineProperty(process.stdout, "write", { get() { throw new Error("write getter"); } }); console.log("hello");`, + expectedException: { + name: "Error", + message: "write getter", + }, + }, + { + code: /* js */ `Object.defineProperty(process, "stdout", { get() { throw new Error("stdout getter"); } }); console.log("hello");`, + expectedException: { + name: "Error", + message: "stdout getter", + }, + }, + ]; + + for (const { code, expectedException } of cases) { + const worker = new Worker(code, { eval: true, stdout: true }); + const stdoutPromise = readToEnd(worker.stdout); + const [exception] = await once(worker, "error"); + expect(exception).toMatchObject(expectedException); + expect(await stdoutPromise).toBe(""); + } + }); + + it("works if the entire process object is overridden", async () => { + const worker = new Worker(/* js */ `process = 5; console.log("hello");`, { eval: true, stdout: true }); + expect(await readToEnd(worker.stdout)).toBe("hello\n"); + }); }); });