This commit is contained in:
Jarred Sumner
2025-06-23 20:49:06 -07:00
parent e78ef3c8a3
commit bc630b46d5
4 changed files with 80 additions and 90 deletions

View File

@@ -56,7 +56,8 @@ pub const Flags = packed struct(u8) {
has_stdin_destructor_called: bool = false,
finalized: bool = false,
deref_on_stdin_destroyed: bool = false,
_: u3 = 0,
is_stdin_a_readable_stream: bool = false,
_: u2 = 0,
};
pub const SignalCode = bun.SignalCode;
@@ -1431,7 +1432,7 @@ const Writable = union(enum) {
// https://github.com/oven-sh/bun/pull/14092
bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed);
const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0;
pipe.onAttachedProcessExit();
pipe.onAttachedProcessExit(&subprocess.process.status);
if (Environment.isDebug) {
bun.debugAssert(subprocess.ref_count.active_counts == debug_ref_count.active_counts);
}
@@ -1557,7 +1558,7 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta
jsc_vm.onSubprocessExit(process);
var stdin: ?*JSC.WebCore.FileSink = this.weak_file_sink_stdin_ptr;
var stdin: ?*JSC.WebCore.FileSink = if (this.stdin == .pipe) this.stdin.pipe else this.weak_file_sink_stdin_ptr;
var existing_stdin_value = JSC.JSValue.zero;
if (this_jsvalue != .zero) {
if (JSC.Codegen.JSSubprocess.stdinGetCached(this_jsvalue)) |existing_value| {
@@ -1567,7 +1568,9 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta
stdin = @alignCast(@ptrCast(JSC.WebCore.FileSink.JSSink.fromJS(existing_value)));
}
existing_stdin_value = existing_value;
if (!this.flags.is_stdin_a_readable_stream) {
existing_stdin_value = existing_value;
}
}
}
}
@@ -1605,8 +1608,9 @@ pub fn onProcessExit(this: *Subprocess, process: *Process, status: bun.spawn.Sta
if (stdin) |pipe| {
this.weak_file_sink_stdin_ptr = null;
this.flags.has_stdin_destructor_called = true;
// It is okay if it does call deref() here, as in that case it was truly ref'd.
pipe.onAttachedProcessExit();
pipe.onAttachedProcessExit(&status);
}
var did_update_has_pending_activity = false;
@@ -2437,6 +2441,7 @@ pub fn spawnMaybeSync(
subprocess.process.setExitHandler(subprocess);
promise_for_stream.ensureStillAlive();
subprocess.flags.is_stdin_a_readable_stream = promise_for_stream != .zero;
if (promise_for_stream != .zero and !globalThis.hasException()) {
if (promise_for_stream.toError()) |err| {

View File

@@ -75,20 +75,23 @@ comptime {
@export(&Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio, .{ .name = "Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio" });
}
pub fn onAttachedProcessExit(this: *FileSink) void {
pub fn onAttachedProcessExit(this: *FileSink, status: *const bun.spawn.Status) void {
log("onAttachedProcessExit()", .{});
this.done = true;
this.writer.close();
this.pending.result = .{ .err = .fromCode(.PIPE, .write) };
if (this.readable_stream.has()) {
if (this.event_loop_handle.globalObject()) |global| {
if (this.readable_stream.get(global)) |stream| {
stream.cancel(global);
if (this.readable_stream.get(global)) |*stream| {
if (!status.isOK()) {
stream.cancel(global);
} else {
stream.done(global);
}
}
}
}
this.writer.close();
this.pending.result = .{ .err = .fromCode(.PIPE, .write) };
this.runPending();
@@ -192,14 +195,15 @@ pub fn onReady(this: *FileSink) void {
pub fn onClose(this: *FileSink) void {
log("onClose()", .{});
this.signal.close(null);
if (this.readable_stream.has()) {
if (this.event_loop_handle.globalObject()) |global| {
if (this.readable_stream.get(global)) |stream| {
stream.cancel(global);
stream.done(global);
}
}
}
this.signal.close(null);
}
pub fn createWithPipe(

View File

@@ -251,7 +251,7 @@ pub const ShellSubprocess = struct {
.pipe => |pipe| {
this.* = .{ .ignore = {} };
if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) {
pipe.onAttachedProcessExit();
pipe.onAttachedProcessExit(&subprocess.process.status);
return pipe.toJS(globalThis);
} else {
subprocess.flags.has_stdin_destructor_called = false;

View File

@@ -1,5 +1,6 @@
import { spawn } from "bun";
import { describe, expect, test } from "bun:test";
import { heapStats } from "bun:jsc";
import { describe, expect, mock, test } from "bun:test";
import { bunEnv, bunExe, expectMaxObjectTypeCount, getMaxFD } from "harness";
describe("spawn stdin ReadableStream", () => {
@@ -11,7 +12,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -34,7 +35,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -57,7 +58,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -81,7 +82,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -106,7 +107,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -132,7 +133,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -153,7 +154,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -181,7 +182,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -210,7 +211,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [
bunExe(),
"-e",
@@ -254,7 +255,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -280,7 +281,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.exit(0)"], // exits immediately
stdin: stream,
env: bunEnv,
@@ -304,7 +305,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.exit(1)"],
stdin: stream,
env: bunEnv,
@@ -328,7 +329,7 @@ describe("spawn stdin ReadableStream", () => {
reader.releaseLock();
expect(() => {
spawn({
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
env: bunEnv,
@@ -336,19 +337,22 @@ describe("spawn stdin ReadableStream", () => {
}).toThrow("'stdin' ReadableStream has already been used");
});
test("ReadableStream with abort signal", async () => {
test("ReadableStream with abort signal calls cancel", async () => {
const controller = new AbortController();
const cancel = mock();
const stream = new ReadableStream({
start(controller) {
controller.enqueue("data before abort\n");
},
pull(controller) {
async pull(controller) {
// Keep the stream open
// but don't block the event loop.
await Bun.sleep(1);
controller.enqueue("more data\n");
},
cancel,
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -357,7 +361,7 @@ describe("spawn stdin ReadableStream", () => {
});
// Give it some time to start
await Bun.sleep(50);
await Bun.sleep(10);
// Abort the process
controller.abort();
@@ -370,6 +374,7 @@ describe("spawn stdin ReadableStream", () => {
// The process should have been killed
expect(proc.killed).toBe(true);
expect(cancel).toHaveBeenCalledTimes(1);
});
test("ReadableStream with backpressure", async () => {
@@ -389,7 +394,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -421,14 +426,14 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc1 = spawn({
await using proc1 = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream1,
stdout: "pipe",
env: bunEnv,
});
const proc2 = spawn({
await using proc2 = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream2,
stdout: "pipe",
@@ -443,38 +448,6 @@ describe("spawn stdin ReadableStream", () => {
expect(await proc2.exited).toBe(0);
});
test("ReadableStream file descriptor cleanup", async () => {
const maxFD = getMaxFD();
const iterations = 10;
for (let i = 0; i < iterations; i++) {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(`iteration ${i}\n`);
controller.close();
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
});
const text = await new Response(proc.stdout).text();
expect(text).toBe(`iteration ${i}\n`);
expect(await proc.exited).toBe(0);
}
// Force garbage collection
Bun.gc(true);
await Bun.sleep(50);
// Check that we didn't leak file descriptors
const newMaxFD = getMaxFD();
expect(newMaxFD).toBeLessThanOrEqual(maxFD + 10); // Allow some variance
});
test("ReadableStream with empty stream", async () => {
const stream = new ReadableStream({
start(controller) {
@@ -483,7 +456,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -503,7 +476,7 @@ describe("spawn stdin ReadableStream", () => {
},
});
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
@@ -534,7 +507,7 @@ describe("spawn stdin ReadableStream", () => {
const transformedStream = originalStream.pipeThrough(upperCaseTransform);
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: transformedStream,
stdout: "pipe",
@@ -557,7 +530,7 @@ describe("spawn stdin ReadableStream", () => {
const [stream1, stream2] = originalStream.tee();
// Use the first branch for the process
const proc = spawn({
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream1,
stdout: "pipe",
@@ -574,29 +547,37 @@ describe("spawn stdin ReadableStream", () => {
});
test("ReadableStream object type count", async () => {
const iterations = 5;
const iterations = 50;
for (let i = 0; i < iterations; i++) {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(`iteration ${i}`);
controller.close();
},
});
async function main() {
async function iterate(i: number) {
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue(`iteration ${i}`);
controller.close();
},
});
const proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
});
await using proc = spawn({
cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"],
stdin: stream,
stdout: "pipe",
env: bunEnv,
});
await new Response(proc.stdout).text();
await proc.exited;
await Promise.all([new Response(proc.stdout).text(), proc.exited]);
}
const promises = Array.from({ length: iterations }, (_, i) => iterate(i));
await Promise.all(promises);
}
// Force cleanup
await main();
await Bun.sleep(1);
Bun.gc(true);
await Bun.sleep(100);
await Bun.sleep(1);
// Check that we're not leaking objects
await expectMaxObjectTypeCount(expect, "ReadableStream", 10);