diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 1bef12dc1b..2b38cd9ee7 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -56,7 +56,8 @@ pub const Flags = packed struct(u8) { has_stdin_destructor_called: bool = false, finalized: bool = false, deref_on_stdin_destroyed: bool = false, - _: u3 = 0, + is_stdin_a_readable_stream: bool = false, + _: u2 = 0, }; pub const SignalCode = bun.SignalCode; @@ -1431,7 +1432,7 @@ const Writable = union(enum) { // https://github.com/oven-sh/bun/pull/14092 bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed); const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0; - pipe.onAttachedProcessExit(); + pipe.onAttachedProcessExit(&subprocess.process.status); if (Environment.isDebug) { bun.debugAssert(subprocess.ref_count.active_counts == debug_ref_count.active_counts); } @@ -1557,7 +1558,7 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta jsc_vm.onSubprocessExit(process); - var stdin: ?*JSC.WebCore.FileSink = this.weak_file_sink_stdin_ptr; + var stdin: ?*JSC.WebCore.FileSink = if (this.stdin == .pipe) this.stdin.pipe else this.weak_file_sink_stdin_ptr; var existing_stdin_value = JSC.JSValue.zero; if (this_jsvalue != .zero) { if (JSC.Codegen.JSSubprocess.stdinGetCached(this_jsvalue)) |existing_value| { @@ -1567,7 +1568,9 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta stdin = @alignCast(@ptrCast(JSC.WebCore.FileSink.JSSink.fromJS(existing_value))); } - existing_stdin_value = existing_value; + if (!this.flags.is_stdin_a_readable_stream) { + existing_stdin_value = existing_value; + } } } } @@ -1605,8 +1608,9 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta if (stdin) |pipe| { this.weak_file_sink_stdin_ptr = null; this.flags.has_stdin_destructor_called = true; + // It is okay if it does call deref() here, as in that case it was truly ref'd. - pipe.onAttachedProcessExit(); + pipe.onAttachedProcessExit(&status); } var did_update_has_pending_activity = false; @@ -2437,6 +2441,7 @@ pub fn spawnMaybeSync( subprocess.process.setExitHandler(subprocess); promise_for_stream.ensureStillAlive(); + subprocess.flags.is_stdin_a_readable_stream = promise_for_stream != .zero; if (promise_for_stream != .zero and !globalThis.hasException()) { if (promise_for_stream.toError()) |err| { diff --git a/src/bun.js/webcore/FileSink.zig b/src/bun.js/webcore/FileSink.zig index 5174af3257..4c96bc743a 100644 --- a/src/bun.js/webcore/FileSink.zig +++ b/src/bun.js/webcore/FileSink.zig @@ -75,20 +75,23 @@ comptime { @export(&Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio, .{ .name = "Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio" }); } -pub fn onAttachedProcessExit(this: *FileSink) void { +pub fn onAttachedProcessExit(this: *FileSink, status: *const bun.spawn.Status) void { log("onAttachedProcessExit()", .{}); this.done = true; - this.writer.close(); - - this.pending.result = .{ .err = .fromCode(.PIPE, .write) }; - if (this.readable_stream.has()) { if (this.event_loop_handle.globalObject()) |global| { - if (this.readable_stream.get(global)) |stream| { - stream.cancel(global); + if (this.readable_stream.get(global)) |*stream| { + if (!status.isOK()) { + stream.cancel(global); + } else { + stream.done(global); + } } } } + this.writer.close(); + + this.pending.result = .{ .err = .fromCode(.PIPE, .write) }; this.runPending(); @@ -192,14 +195,15 @@ pub fn onReady(this: *FileSink) void { pub fn onClose(this: *FileSink) void { log("onClose()", .{}); - this.signal.close(null); if (this.readable_stream.has()) { if (this.event_loop_handle.globalObject()) |global| { if (this.readable_stream.get(global)) |stream| { - stream.cancel(global); + stream.done(global); } } } + + this.signal.close(null); } pub fn createWithPipe( diff --git a/src/shell/subproc.zig b/src/shell/subproc.zig index 5f796bcb9a..31d0bd575d 100644 --- a/src/shell/subproc.zig +++ b/src/shell/subproc.zig @@ -251,7 +251,7 @@ pub const ShellSubprocess = struct { .pipe => |pipe| { this.* = .{ .ignore = {} }; if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) { - pipe.onAttachedProcessExit(); + pipe.onAttachedProcessExit(&subprocess.process.status); return pipe.toJS(globalThis); } else { subprocess.flags.has_stdin_destructor_called = false; diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts index 9fef746dde..2d277352b1 100644 --- a/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts +++ b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts @@ -1,5 +1,6 @@ import { spawn } from "bun"; -import { describe, expect, test } from "bun:test"; +import { heapStats } from "bun:jsc"; +import { describe, expect, mock, test } from "bun:test"; import { bunEnv, bunExe, expectMaxObjectTypeCount, getMaxFD } from "harness"; describe("spawn stdin ReadableStream", () => { @@ -11,7 +12,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -34,7 +35,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -57,7 +58,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -81,7 +82,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -106,7 +107,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -132,7 +133,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -153,7 +154,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -181,7 +182,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -210,7 +211,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [ bunExe(), "-e", @@ -254,7 +255,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -280,7 +281,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.exit(0)"], // exits immediately stdin: stream, env: bunEnv, @@ -304,7 +305,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.exit(1)"], stdin: stream, env: bunEnv, @@ -328,7 +329,7 @@ describe("spawn stdin ReadableStream", () => { reader.releaseLock(); expect(() => { - spawn({ + const proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, env: bunEnv, @@ -336,19 +337,22 @@ describe("spawn stdin ReadableStream", () => { }).toThrow("'stdin' ReadableStream has already been used"); }); - test("ReadableStream with abort signal", async () => { + test("ReadableStream with abort signal calls cancel", async () => { const controller = new AbortController(); + const cancel = mock(); const stream = new ReadableStream({ start(controller) { controller.enqueue("data before abort\n"); }, - pull(controller) { + async pull(controller) { // Keep the stream open + // but don't block the event loop. + await Bun.sleep(1); controller.enqueue("more data\n"); }, + cancel, }); - - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -357,7 +361,7 @@ describe("spawn stdin ReadableStream", () => { }); // Give it some time to start - await Bun.sleep(50); + await Bun.sleep(10); // Abort the process controller.abort(); @@ -370,6 +374,7 @@ describe("spawn stdin ReadableStream", () => { // The process should have been killed expect(proc.killed).toBe(true); + expect(cancel).toHaveBeenCalledTimes(1); }); test("ReadableStream with backpressure", async () => { @@ -389,7 +394,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -421,14 +426,14 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc1 = spawn({ + await using proc1 = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream1, stdout: "pipe", env: bunEnv, }); - const proc2 = spawn({ + await using proc2 = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream2, stdout: "pipe", @@ -443,38 +448,6 @@ describe("spawn stdin ReadableStream", () => { expect(await proc2.exited).toBe(0); }); - test("ReadableStream file descriptor cleanup", async () => { - const maxFD = getMaxFD(); - const iterations = 10; - - for (let i = 0; i < iterations; i++) { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(`iteration ${i}\n`); - controller.close(); - }, - }); - - const proc = spawn({ - cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], - stdin: stream, - stdout: "pipe", - }); - - const text = await new Response(proc.stdout).text(); - expect(text).toBe(`iteration ${i}\n`); - expect(await proc.exited).toBe(0); - } - - // Force garbage collection - Bun.gc(true); - await Bun.sleep(50); - - // Check that we didn't leak file descriptors - const newMaxFD = getMaxFD(); - expect(newMaxFD).toBeLessThanOrEqual(maxFD + 10); // Allow some variance - }); - test("ReadableStream with empty stream", async () => { const stream = new ReadableStream({ start(controller) { @@ -483,7 +456,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -503,7 +476,7 @@ describe("spawn stdin ReadableStream", () => { }, }); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream, stdout: "pipe", @@ -534,7 +507,7 @@ describe("spawn stdin ReadableStream", () => { const transformedStream = originalStream.pipeThrough(upperCaseTransform); - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: transformedStream, stdout: "pipe", @@ -557,7 +530,7 @@ describe("spawn stdin ReadableStream", () => { const [stream1, stream2] = originalStream.tee(); // Use the first branch for the process - const proc = spawn({ + await using proc = spawn({ cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], stdin: stream1, stdout: "pipe", @@ -574,29 +547,37 @@ describe("spawn stdin ReadableStream", () => { }); test("ReadableStream object type count", async () => { - const iterations = 5; + const iterations = 50; - for (let i = 0; i < iterations; i++) { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(`iteration ${i}`); - controller.close(); - }, - }); + async function main() { + async function iterate(i: number) { + const stream = new ReadableStream({ + async pull(controller) { + await Bun.sleep(0); + controller.enqueue(`iteration ${i}`); + controller.close(); + }, + }); - const proc = spawn({ - cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], - stdin: stream, - stdout: "pipe", - }); + await using proc = spawn({ + cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], + stdin: stream, + stdout: "pipe", + env: bunEnv, + }); - await new Response(proc.stdout).text(); - await proc.exited; + await Promise.all([new Response(proc.stdout).text(), proc.exited]); + } + + const promises = Array.from({ length: iterations }, (_, i) => iterate(i)); + await Promise.all(promises); } - // Force cleanup + await main(); + + await Bun.sleep(1); Bun.gc(true); - await Bun.sleep(100); + await Bun.sleep(1); // Check that we're not leaking objects await expectMaxObjectTypeCount(expect, "ReadableStream", 10);