Use process stream in worker thread

This commit is contained in:
Ben Grant
2025-05-28 14:39:10 -07:00
parent aaa308a75d
commit 5bcc2a22cf
6 changed files with 105 additions and 33 deletions

View File

@@ -152,12 +152,12 @@ fn messageWithTypeAndLevel_(
else
Output.enable_ansi_colors_stdout;
const worker = if (global.bunVM().worker) |w|
if (w.kind == .node) w else null
const use_process_stdio = if (global.bunVM().worker) |w|
w.kind == .node
else
null;
var worker_writer = if (worker) |w|
w.stdioWriter(if (level == .Warning or level == .Error) .stderr else .stdout)
false;
var worker_writer = if (use_process_stdio)
global.processStdioWriter(if (level == .Warning or level == .Error) .stderr else .stdout)
else
null;

View File

@@ -3593,6 +3593,30 @@ extern "C" void Process__emitErrorEvent(Zig::GlobalObject* global, EncodedJSValu
}
}
extern "C" void Process__writeToStdio(JSGlobalObject* jsGlobalObject, int fd, const uint8_t* bytes, size_t len)
{
ASSERT(fd == 1 || fd == 2);
auto* globalObject = defaultGlobalObject(jsGlobalObject);
auto& vm = JSC::getVM(globalObject);
auto scope = DECLARE_THROW_SCOPE(vm);
auto* process = globalObject->processObject();
auto stream = process->get(globalObject, Identifier::fromString(vm, fd == 1 ? "stdout"_s : "stderr"_s));
RETURN_IF_EXCEPTION(scope, );
auto writeFn = stream.get(globalObject, Identifier::fromString(vm, "write"_s));
RETURN_IF_EXCEPTION(scope, );
auto callData = JSC::getCallData(writeFn);
if (callData.type == CallData::Type::None) {
scope.throwException(globalObject, createNotAFunctionError(globalObject, writeFn));
return;
}
MarkedArgumentBuffer args;
auto* buffer = WebCore::createBuffer(globalObject, { bytes, len });
args.append(buffer);
JSC::call(globalObject, writeFn, callData, stream, args);
RETURN_IF_EXCEPTION(scope, );
}
/* Source for Process.lut.h
@begin processObjectTable
_debugEnd Process_stubEmptyFunction Function 0

View File

@@ -882,6 +882,36 @@ pub const JSGlobalObject = opaque {
return @enumFromInt(ScriptExecutionContextIdentifier__forGlobalObject(global));
}
pub const StdioWriterFd = enum(c_int) {
stdout = 1,
stderr = 2,
};
extern fn Process__writeToStdio(global: *JSGlobalObject, fd: StdioWriterFd, bytes: [*]const u8, len: usize) void;
const ProcessStdioWriterContext = struct {
global: *JSGlobalObject,
fd: StdioWriterFd,
fn write(this: ProcessStdioWriterContext, bytes: []const u8) bun.JSError!usize {
Process__writeToStdio(this.global, this.fd, bytes.ptr, bytes.len);
if (this.global.hasException()) {
return error.JSError;
}
return bytes.len;
}
};
pub const ProcessStdioWriter = std.io.Writer(
ProcessStdioWriterContext,
bun.JSError,
ProcessStdioWriterContext.write,
);
/// Get a writer which calls process.stdout.write or process.stderr.write
pub fn processStdioWriter(global: *JSGlobalObject, fd: StdioWriterFd) ProcessStdioWriter {
return .{ .context = .{ .global = global, .fd = fd } };
}
pub const Extern = [_][]const u8{ "create", "getModuleRegistryMap", "resetModuleRegistryMap" };
comptime {

View File

@@ -566,12 +566,6 @@ void Worker::pushStdioToParent(PushStdioFd fd, std::span<const uint8_t> bytes)
});
}
extern "C" void WebWorker__pushStdioToParent(Worker* worker, int fd, const uint8_t* bytes, size_t len)
{
ASSERT(fd == 1 || fd == 2);
worker->pushStdioToParent(static_cast<Worker::PushStdioFd>(fd), { bytes, len });
}
JSC_DEFINE_HOST_FUNCTION(jsPushStdioToParent, (JSGlobalObject * lexicalGlobalObject, CallFrame* callFrame))
{
auto* globalObject = defaultGlobalObject(lexicalGlobalObject);

View File

@@ -64,16 +64,10 @@ pub const Status = enum(u8) {
terminated,
};
const PushStdioFd = enum(c_int) {
stdout = 1,
stderr = 2,
};
extern fn WebWorker__dispatchExit(?*jsc.JSGlobalObject, *CppWorker, i32) void;
extern fn WebWorker__dispatchOnline(cpp_worker: *CppWorker, *jsc.JSGlobalObject) void;
extern fn WebWorker__fireEarlyMessages(cpp_worker: *CppWorker, *jsc.JSGlobalObject) void;
extern fn WebWorker__dispatchError(*jsc.JSGlobalObject, *CppWorker, bun.String, JSValue) void;
extern fn WebWorker__pushStdioToParent(cpp_worker: *CppWorker, fd: PushStdioFd, bytes: [*]const u8, len: usize) void;
pub fn hasRequestedTerminate(this: *const WebWorker) bool {
return this.requested_terminate.load(.monotonic);
@@ -627,22 +621,6 @@ pub fn exitAndDeinit(this: *WebWorker) noreturn {
bun.exitThread();
}
const WriterContext = struct {
cpp_worker: *CppWorker,
fd: PushStdioFd,
fn write(this: WriterContext, bytes: []const u8) error{}!usize {
WebWorker__pushStdioToParent(this.cpp_worker, this.fd, bytes.ptr, bytes.len);
return bytes.len;
}
};
pub const Writer = std.io.Writer(WriterContext, error{}, WriterContext.write);
pub fn stdioWriter(this: *WebWorker, fd: PushStdioFd) Writer {
return .{ .context = .{ .cpp_worker = this.cpp_worker, .fd = fd } };
}
comptime {
@export(&create, .{ .name = "WebWorker__create" });
@export(&notifyNeedTermination, .{ .name = "WebWorker__notifyNeedTermination" });

View File

@@ -657,6 +657,52 @@ error
warn
`);
});
it("handles exceptions", async () => {
const cases = [
{
code: /* js */ `process.stdout.write = () => { throw new Error("write()"); }; console.log("hello");`,
expectedException: {
name: "Error",
message: "write()",
},
},
{
code: /* js */ `process.stdout.write = 6; console.log("hello");`,
expectedException: {
name: "TypeError",
message: expect.stringMatching(/is not a function.*is 6/),
},
},
{
code: /* js */ `Object.defineProperty(process.stdout, "write", { get() { throw new Error("write getter"); } }); console.log("hello");`,
expectedException: {
name: "Error",
message: "write getter",
},
},
{
code: /* js */ `Object.defineProperty(process, "stdout", { get() { throw new Error("stdout getter"); } }); console.log("hello");`,
expectedException: {
name: "Error",
message: "stdout getter",
},
},
];
for (const { code, expectedException } of cases) {
const worker = new Worker(code, { eval: true, stdout: true });
const stdoutPromise = readToEnd(worker.stdout);
const [exception] = await once(worker, "error");
expect(exception).toMatchObject(expectedException);
expect(await stdoutPromise).toBe("");
}
});
it("works if the entire process object is overridden", async () => {
const worker = new Worker(/* js */ `process = 5; console.log("hello");`, { eval: true, stdout: true });
expect(await readToEnd(worker.stdout)).toBe("hello\n");
});
});
});