Close the streams more

This commit is contained in:
Jarred Sumner
2022-11-23 21:31:38 -08:00
parent 21531f1e80
commit bddf484c2c
4 changed files with 94 additions and 29 deletions

View File

@@ -110,7 +110,7 @@ pub const Subprocess = struct {
pub fn done(this: *@This()) void {
if (this.* == .stream) {
if (this.stream.ptr == .File) this.stream.ptr.File.finish();
if (this.stream.ptr == .File) this.stream.ptr.File.setSignal(JSC.WebCore.Signal{});
this.stream.done();
return;
}
@@ -132,12 +132,11 @@ pub const Subprocess = struct {
}
};
pub fn init(stdio: Stdio, fd: i32, other_fd: i32, _: *JSC.JSGlobalObject) Readable {
pub fn init(stdio: Stdio, fd: i32, _: *JSC.JSGlobalObject) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
.pipe => brk: {
_ = JSC.Node.Syscall.close(other_fd);
break :brk .{
.pipe = .{
.buffer = undefined,
@@ -164,8 +163,6 @@ pub const Subprocess = struct {
_ = JSC.Node.Syscall.close(fd);
},
.pipe => {
if (this.pipe == .stream and this.pipe.stream.ptr == .File)
this.pipe.stream.ptr.File.readable().FIFO.signal.clear();
this.pipe.done();
},
else => {},
@@ -692,7 +689,7 @@ pub const Subprocess = struct {
pub fn onReady(_: *Writable, _: ?JSC.WebCore.Blob.SizeType, _: ?JSC.WebCore.Blob.SizeType) void {}
pub fn onStart(_: *Writable) void {}
pub fn init(stdio: Stdio, fd: i32, other_fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
.pipe => {
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
@@ -702,7 +699,6 @@ pub const Subprocess = struct {
.allocator = globalThis.bunVM().allocator,
.auto_close = true,
};
if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
sink.mode = std.os.S.IFIFO;
if (stdio == .pipe) {
if (stdio.pipe) |readable| {
@@ -718,7 +714,6 @@ pub const Subprocess = struct {
return Writable{ .pipe = sink };
},
.array_buffer, .blob => {
if (other_fd != bun.invalid_fd) _ = JSC.Node.Syscall.close(other_fd);
var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined };
switch (stdio) {
.array_buffer => |array_buffer| {
@@ -757,13 +752,14 @@ pub const Subprocess = struct {
pub fn close(this: *Writable) void {
return switch (this.*) {
.pipe => |pipe| {
_ = pipe.end(null);
pipe.close();
},
.pipe_to_readable_stream => |*pipe_to_readable_stream| {
_ = pipe_to_readable_stream.pipe.end(null);
},
.fd => |fd| {
_ = JSC.Node.Syscall.close(fd);
this.* = .{ .ignore = {} };
},
.buffered_input => {
this.buffered_input.deinit();
@@ -778,7 +774,7 @@ pub const Subprocess = struct {
this.closeProcess();
this.stdin.close();
this.stderr.close();
this.stdin.close();
this.stdout.close();
this.exit_promise.deinit();
this.on_exit_callback.deinit();
@@ -1049,19 +1045,16 @@ pub const Subprocess = struct {
globalThis.throw("failed to create stdin pipe: {s}", .{err});
return .zero;
} else undefined;
errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe);
const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stdout pipe: {s}", .{err});
return .zero;
} else undefined;
errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe);
const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stderr pipe: {s}", .{err});
return .zero;
} else undefined;
errdefer if (stdio[2].isPiped()) destroyPipe(stderr_pipe);
stdio[0].setUpChildIoPosixSpawn(
&actions,
@@ -1096,9 +1089,25 @@ pub const Subprocess = struct {
env = @ptrCast(@TypeOf(env), env_array.items.ptr);
}
const pid = switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) {
.err => |err| return err.toJSC(globalThis),
.result => |pid_| pid_,
const pid = brk: {
defer {
if (stdio[0].isPiped()) {
_ = JSC.Node.Syscall.close(stdin_pipe[0]);
}
if (stdio[1].isPiped()) {
_ = JSC.Node.Syscall.close(stdout_pipe[1]);
}
if (stdio[2].isPiped()) {
_ = JSC.Node.Syscall.close(stderr_pipe[1]);
}
}
break :brk switch (PosixSpawn.spawnZ(argv.items[0].?, actions, attr, @ptrCast([*:null]?[*:0]const u8, argv.items[0..].ptr), env)) {
.err => |err| return err.toJSC(globalThis),
.result => |pid_| pid_,
};
};
const pidfd: std.os.fd_t = brk: {
@@ -1141,15 +1150,16 @@ pub const Subprocess = struct {
.globalThis = globalThis,
.pid = pid,
.pidfd = pidfd,
.stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], stdin_pipe[0], globalThis) catch {
.stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch {
globalThis.throw("out of memory", .{});
return .zero;
},
.stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], stdout_pipe[1], globalThis),
.stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], stderr_pipe[1], globalThis),
.stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis),
.stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.is_sync = is_sync,
};
if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
}
@@ -1280,11 +1290,6 @@ pub const Subprocess = struct {
this.has_waitpid_task = true;
const pid = this.pid;
if (!sync) {
// signal to the other end we are definitely done
this.stdin.close();
}
switch (PosixSpawn.waitpid(pid, 0)) {
.err => |err| {
this.waitpid_err = err;
@@ -1321,9 +1326,9 @@ pub const Subprocess = struct {
}
fn onExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject) void {
this.stdin.close();
this.stdout.close();
this.stderr.close();
// this.stdin.close();
// this.stdout.close();
// this.stderr.close();
defer this.updateHasPendingActivity();
this.has_waitpid_task = false;

View File

@@ -1427,6 +1427,8 @@ pub const FileSink = struct {
}
fn cleanup(this: *FileSink) void {
this.done = true;
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
@@ -1464,7 +1466,6 @@ pub const FileSink = struct {
}
pub fn onHangup(this: *FileSink) void {
this.done = true;
this.signal.clear();
this.cleanup();
@@ -3634,7 +3635,8 @@ pub const FIFO = struct {
const read_result = this.read(this.buf, available_to_read);
if (read_result == .read and read_result.read.len == 0) {
this.unwatch(this.poll_ref.?.fd);
if (this.poll_ref != null)
this.unwatch(this.poll_ref.?.fd);
this.close();
return;
}

View File

@@ -0,0 +1,53 @@
import { it, test, expect } from "bun:test";
import { spawn } from "bun";
import { bunExe } from "./bunExe";
import { gcTick } from "gc";
const N = 100;
test("spawn can write to stdin multiple chunks", async () => {
for (let i = 0; i < N; i++) {
var exited;
await (async function () {
const proc = spawn({
cmd: [bunExe(), import.meta.dir + "/stdin-repro.js"],
stdout: "pipe",
stdin: "pipe",
stderr: "inherit",
env: {
BUN_DEBUG_QUIET_LOGS: 1,
},
});
exited = proc.exited;
var counter = 0;
var inCounter = 0;
const prom2 = (async function () {
while (inCounter++ < 4) {
await new Promise((resolve, reject) => setTimeout(resolve, 8));
proc.stdin.write("Wrote to stdin!");
await proc.stdin.flush();
}
await proc.stdin.end();
})();
const prom = (async function () {
try {
for await (var chunk of proc.stdout) {
expect(new TextDecoder().decode(chunk)).toBe("Wrote to stdin!\n");
counter++;
if (counter > 3) break;
}
} catch (e) {
console.log(e.stack);
throw e;
}
})();
await Promise.all([prom, prom2]);
expect(counter).toBe(4);
// proc.kill();
})();
await exited;
}
gcTick(true);
});

View File

@@ -0,0 +1,5 @@
while (true) {
for await (let chunk of Bun.stdin.stream()) {
console.log(new Buffer(chunk).toString());
}
}