Add more tests for Bun.spawn lifecycle and address edgecase (#6904)

* FIxup spawn ref / unref

* Fix test failures

* Add test for #3480

* windows

* 🪟

* Skip on linux

* Fix test

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2023-11-05 00:00:19 -07:00
committed by GitHub
parent e954bce301
commit 98f20170a3
16 changed files with 305 additions and 220 deletions

View File

@@ -179,11 +179,11 @@ void bun_on_tick_after(void* ctx);
void us_loop_run_bun_tick(struct us_loop_t *loop, int64_t timeoutMs, void* tickCallbackContext) {
us_loop_integrate(loop);
if (loop->num_polls == 0)
return;
us_loop_integrate(loop);
if (tickCallbackContext) {
bun_on_tick_before(tickCallbackContext);
}

View File

@@ -32,8 +32,7 @@ pub const KeepAlive = struct {
return;
this.status = .inactive;
loop.num_polls -= 1;
loop.active -|= 1;
loop.subActive(1);
}
/// Only intended to be used from EventLoop.Pollable
@@ -42,8 +41,7 @@ pub const KeepAlive = struct {
return;
this.status = .active;
loop.num_polls += 1;
loop.active += 1;
loop.addActive(1);
}
pub fn init() KeepAlive {
@@ -55,7 +53,7 @@ pub const KeepAlive = struct {
if (this.status != .active)
return;
this.status = .inactive;
vm.event_loop_handle.?.unref();
vm.event_loop_handle.?.subActive(1);
}
/// From another thread, Prevent a poll from keeping the process alive.
@@ -159,17 +157,17 @@ pub const FilePoll = struct {
poll.flags = flags;
}
pub fn onKQueueEvent(poll: *FilePoll, loop: *Loop, kqueue_event: *const std.os.system.kevent64_s) void {
pub fn onKQueueEvent(poll: *FilePoll, _: *Loop, kqueue_event: *const std.os.system.kevent64_s) void {
if (KQueueGenerationNumber != u0)
std.debug.assert(poll.generation_number == kqueue_event.ext[0]);
poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*));
poll.onUpdate(loop, kqueue_event.data);
poll.onUpdate(kqueue_event.data);
}
pub fn onEpollEvent(poll: *FilePoll, loop: *Loop, epoll_event: *std.os.linux.epoll_event) void {
pub fn onEpollEvent(poll: *FilePoll, _: *Loop, epoll_event: *std.os.linux.epoll_event) void {
poll.updateFlags(Flags.fromEpollEvent(epoll_event.*));
poll.onUpdate(loop, 0);
poll.onUpdate(0);
}
pub fn clearEvent(poll: *FilePoll, flag: Flags) void {
@@ -213,9 +211,7 @@ pub const FilePoll = struct {
}
fn deinitPossiblyDefer(this: *FilePoll, vm: *JSC.VirtualMachine, loop: *Loop, polls: *FilePoll.Store, force_unregister: bool) void {
if (this.isRegistered()) {
_ = this.unregister(loop, force_unregister);
}
_ = this.unregister(loop, force_unregister);
this.owner = Deactivated.owner;
const was_ever_registered = this.flags.contains(.was_ever_registered);
@@ -235,14 +231,11 @@ pub const FilePoll = struct {
const kqueue_or_epoll = if (Environment.isMac) "kevent" else "epoll";
pub fn onUpdate(poll: *FilePoll, loop: *Loop, size_or_offset: i64) void {
pub fn onUpdate(poll: *FilePoll, size_or_offset: i64) void {
if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) {
if (poll.flags.contains(.has_incremented_poll_count)) {
loop.active -|= @as(u32, @intFromBool(!poll.flags.contains(.disable)));
poll.flags.remove(.has_incremented_poll_count);
}
poll.flags.insert(.needs_rearm);
}
var ptr = poll.owner;
switch (ptr.tag()) {
@field(Owner.Tag, "FIFO") => {
@@ -310,8 +303,10 @@ pub const FilePoll = struct {
needs_rearm,
has_incremented_poll_count,
has_incremented_active_count,
closed,
disable,
keeps_event_loop_alive,
nonblocking,
@@ -448,53 +443,48 @@ pub const FilePoll = struct {
return !this.flags.contains(.needs_rearm) and (this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process));
}
pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool {
return !this.flags.contains(.disable) and this.isActive();
}
pub inline fn canDisableKeepingProcessAlive(this: *const FilePoll) bool {
return !this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count);
}
/// Make calling ref() on this poll into a no-op.
/// This decrements the active counter if it was previously incremented
/// "active" controls whether or not the event loop should potentially idle
pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (this.flags.contains(.disable))
return;
this.flags.insert(.disable);
vm.event_loop_handle.?.active -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
vm.event_loop_handle.?.subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count))));
this.flags.remove(.keeps_event_loop_alive);
this.flags.remove(.has_incremented_active_count);
}
pub inline fn canEnableKeepingProcessAlive(this: *const FilePoll) bool {
return this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count);
return this.flags.contains(.keeps_event_loop_alive) and this.flags.contains(.has_incremented_poll_count);
}
pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (!this.flags.contains(.disable))
if (this.flags.contains(.closed))
return;
this.flags.remove(.disable);
vm.event_loop_handle.?.active += @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
}
pub fn canActivate(this: *const FilePoll) bool {
return !this.flags.contains(.has_incremented_poll_count);
vm.event_loop_handle.?.addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count))));
this.flags.insert(.keeps_event_loop_alive);
this.flags.insert(.has_incremented_active_count);
}
/// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *FilePoll, loop: *Loop) void {
std.debug.assert(this.flags.contains(.has_incremented_poll_count));
fn deactivate(this: *FilePoll, loop: *Loop) void {
loop.num_polls -= @as(i32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
loop.active -|= @as(u32, @intFromBool(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count)));
this.flags.remove(.has_incremented_poll_count);
loop.subActive(@as(u32, @intFromBool(this.flags.contains(.has_incremented_active_count))));
this.flags.remove(.keeps_event_loop_alive);
this.flags.remove(.has_incremented_active_count);
}
/// Only intended to be used from EventLoop.Pollable
pub fn activate(this: *FilePoll, loop: *Loop) void {
fn activate(this: *FilePoll, loop: *Loop) void {
this.flags.remove(.closed);
loop.num_polls += @as(i32, @intFromBool(!this.flags.contains(.has_incremented_poll_count)));
loop.active += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
this.flags.insert(.has_incremented_poll_count);
if (this.flags.contains(.keeps_event_loop_alive)) {
loop.addActive(@as(u32, @intFromBool(!this.flags.contains(.has_incremented_active_count))));
this.flags.insert(.has_incremented_active_count);
}
}
pub fn init(vm: *JSC.VirtualMachine, fd: bun.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
@@ -515,31 +505,26 @@ pub const FilePoll = struct {
return poll;
}
pub inline fn canRef(this: *const FilePoll) bool {
if (this.flags.contains(.disable))
return false;
return !this.flags.contains(.has_incremented_poll_count);
}
pub inline fn canUnref(this: *const FilePoll) bool {
return this.flags.contains(.has_incremented_poll_count);
}
/// Prevent a poll from keeping the process alive.
pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (!this.canUnref())
return;
log("unref", .{});
this.deactivate(vm.event_loop_handle.?);
this.disableKeepingProcessAlive(vm);
}
/// Allow a poll to keep the process alive.
pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (!this.canRef())
if (this.flags.contains(.closed))
return;
log("ref", .{});
this.activate(vm.event_loop_handle.?);
this.enableKeepingProcessAlive(vm);
}
pub fn onEnded(this: *FilePoll, vm: *JSC.VirtualMachine) void {
this.flags.remove(.keeps_event_loop_alive);
this.flags.insert(.closed);
this.deactivate(vm.event_loop_handle.?);
}
pub fn onTick(loop: *Loop, tagged_pointer: ?*anyopaque) callconv(.C) void {
@@ -611,6 +596,7 @@ pub const FilePoll = struct {
);
this.flags.insert(.was_ever_registered);
if (JSC.Maybe(void).errnoSys(ctl, .epoll_ctl)) |errno| {
this.deactivate(loop);
return errno;
}
} else if (comptime Environment.isMac) {
@@ -697,6 +683,7 @@ pub const FilePoll = struct {
const errno = std.c.getErrno(rc);
if (errno != .SUCCESS) {
this.deactivate(loop);
return JSC.Maybe(void){
.err = bun.sys.Error.fromCode(errno, .kqueue),
};
@@ -704,8 +691,7 @@ pub const FilePoll = struct {
} else {
bun.todo(@src(), {});
}
if (this.canActivate())
this.activate(loop);
this.activate(loop);
this.flags.insert(switch (flag) {
.readable => .poll_readable,
.process => if (comptime Environment.isLinux) .poll_readable else .poll_process,
@@ -725,7 +711,10 @@ pub const FilePoll = struct {
}
pub fn unregisterWithFd(this: *FilePoll, loop: *Loop, fd: bun.UFileDescriptor, force_unregister: bool) JSC.Maybe(void) {
defer this.deactivate(loop);
if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process) or this.flags.contains(.poll_machport))) {
// no-op
return JSC.Maybe(void).success;
}
@@ -853,8 +842,6 @@ pub const FilePoll = struct {
this.flags.remove(.poll_writable);
this.flags.remove(.poll_process);
this.flags.remove(.poll_machport);
if (this.isActive())
this.deactivate(loop);
return JSC.Maybe(void).success;
}

View File

@@ -148,7 +148,7 @@ pub const FilePoll = struct {
}
pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool {
return !this.flags.contains(.disable) and this.isActive();
return !this.flags.contains(.closed) and this.isActive();
}
pub fn isRegistered(this: *const FilePoll) bool {
@@ -157,9 +157,9 @@ pub const FilePoll = struct {
/// Make calling ref() on this poll into a no-op.
pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (this.flags.contains(.disable))
if (this.flags.contains(.closed))
return;
this.flags.insert(.disable);
this.flags.insert(.closed);
vm.event_loop_handle.?.active_handles -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
}
@@ -183,6 +183,8 @@ pub const FilePoll = struct {
this.deinitWithVM(vm);
}
pub const deinitForceUnregister = deinit;
pub fn unregister(this: *FilePoll, loop: *Loop) bool {
_ = loop;
uv.uv_unref(@ptrFromInt(this.fd));
@@ -234,9 +236,9 @@ pub const FilePoll = struct {
}
pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
if (!this.flags.contains(.disable))
if (!this.flags.contains(.closed))
return;
this.flags.remove(.disable);
this.flags.remove(.closed);
vm.event_loop_handle.?.active_handles += @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
}
@@ -248,18 +250,18 @@ pub const FilePoll = struct {
/// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *FilePoll, loop: *Loop) void {
std.debug.assert(this.flags.contains(.has_incremented_poll_count));
loop.active_handles -= @as(u32, @intFromBool(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count)));
loop.active_handles -= @as(u32, @intFromBool(this.flags.contains(.has_incremented_poll_count)));
this.flags.remove(.has_incremented_poll_count);
}
/// Only intended to be used from EventLoop.Pollable
pub fn activate(this: *FilePoll, loop: *Loop) void {
loop.active_handles += @as(u32, @intFromBool(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
loop.active_handles += @as(u32, @intFromBool(!this.flags.contains(.closed) and !this.flags.contains(.has_incremented_poll_count)));
this.flags.insert(.has_incremented_poll_count);
}
pub inline fn canRef(this: *const FilePoll) bool {
if (this.flags.contains(.disable))
if (this.flags.contains(.closed))
return false;
return !this.flags.contains(.has_incremented_poll_count);

View File

@@ -289,7 +289,7 @@ pub const PosixSpawn = struct {
/// `execve` method.
pub fn waitpid(pid: pid_t, flags: u32) Maybe(WaitPidResult) {
const Status = c_int;
var status: Status = undefined;
var status: Status = 0;
while (true) {
const rc = system.waitpid(pid, &status, @as(c_int, @intCast(flags)));
switch (errno(rc)) {

View File

@@ -99,7 +99,7 @@ pub const Subprocess = struct {
if (this.poll == .poll_ref) {
if (this.poll.poll_ref) |poll| {
if (poll.isRegistered()) {
if (poll.isActive() or poll.isRegistered()) {
return true;
}
}
@@ -113,8 +113,9 @@ pub const Subprocess = struct {
pub fn updateHasPendingActivity(this: *Subprocess) void {
@fence(.SeqCst);
if (Environment.isDebug) {
log("updateHasPendingActivity: real: {}", .{
if (comptime Environment.isDebug) {
log("updateHasPendingActivity() {any} -> {any}", .{
this.has_pending_activity.value,
this.hasPendingActivityNonThreadsafe(),
});
}
@@ -126,12 +127,6 @@ pub const Subprocess = struct {
pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
@fence(.Acquire);
if (Environment.isDebug) {
log("hasPendingActivity: {} (real: {})", .{
this.has_pending_activity.load(.Acquire),
this.hasPendingActivityNonThreadsafe(),
});
}
return this.has_pending_activity.load(.Acquire);
}
@@ -167,7 +162,7 @@ pub const Subprocess = struct {
switch (this.poll) {
.poll_ref => if (this.poll.poll_ref) |poll| {
if (deactivate_poll_ref) {
poll.disableKeepingProcessAlive(vm);
poll.onEnded(vm);
} else {
poll.unref(vm);
}
@@ -419,11 +414,11 @@ pub const Subprocess = struct {
}
pub fn hasKilled(this: *const Subprocess) bool {
return this.flags.killed or this.exit_code != null;
return this.exit_code != null or this.signal_code != null;
}
pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) {
if (this.hasKilled()) {
if (this.hasExited()) {
return .{ .result = {} };
}
@@ -462,7 +457,6 @@ pub const Subprocess = struct {
}
}
this.flags.killed = true;
return .{ .result = {} };
}
@@ -1094,8 +1088,17 @@ pub const Subprocess = struct {
this: *Subprocess,
globalThis: *JSGlobalObject,
) callconv(.C) JSValue {
if (this.exit_code) |code| {
return JSC.JSPromise.resolvedPromiseValue(globalThis, JSC.JSValue.jsNumber(code));
if (this.hasExited()) {
const waitpid_error = this.waitpid_err;
if (this.exit_code) |code| {
return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(code));
} else if (waitpid_error) |err| {
return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
} else if (this.signal_code != null) {
return JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(128 +% @intFromEnum(this.signal_code.?)));
} else {
@panic("Subprocess.getExited() has exited but has no exit code or signal code. This is a bug.");
}
}
if (!this.exit_promise.has()) {
@@ -1147,7 +1150,6 @@ pub const Subprocess = struct {
globalThis.throwTODO("spawn() is not yet implemented on Windows");
return .zero;
}
var arena = @import("root").bun.ArenaAllocator.init(bun.default_allocator);
defer arena.deinit();
var allocator = arena.allocator();
@@ -1631,7 +1633,9 @@ pub const Subprocess = struct {
.process,
true,
)) {
.result => {},
.result => {
subprocess.poll.poll_ref.?.enableKeepingProcessAlive(jsc_vm);
},
.err => |err| {
if (err.getErrno() != .SRCH) {
@panic("This shouldn't happen");
@@ -1697,7 +1701,9 @@ pub const Subprocess = struct {
.process,
true,
)) {
.result => {},
.result => {
subprocess.poll.poll_ref.?.enableKeepingProcessAlive(jsc_vm);
},
.err => |err| {
if (err.getErrno() != .SRCH) {
@panic("This shouldn't happen");
@@ -1797,6 +1803,7 @@ pub const Subprocess = struct {
const pid = this.pid;
var waitpid_result = waitpid_result_;
while (true) {
switch (waitpid_result) {
.err => |err| {
@@ -1808,9 +1815,16 @@ pub const Subprocess = struct {
this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status)));
}
// True if the process terminated due to receipt of a signal.
if (std.os.W.IFSIGNALED(result.status)) {
this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status)))));
} else if (std.os.W.IFSTOPPED(result.status)) {
} else if (
// https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/waitpid.2.html
// True if the process has not terminated, but has stopped and can
// be restarted. This macro can be true only if the wait call spec-ified specified
// ified the WUNTRACED option or if the child process is being
// traced (see ptrace(2)).
std.os.W.IFSTOPPED(result.status)) {
this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status)))));
}
}
@@ -1853,12 +1867,59 @@ pub const Subprocess = struct {
}
}
fn runOnExit(this: *Subprocess, globalThis: *JSC.JSGlobalObject, this_jsvalue: JSC.JSValue) void {
const waitpid_error = this.waitpid_err;
this.waitpid_err = null;
if (this.exit_promise.trySwap()) |promise| {
if (this.exit_code) |code| {
promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code));
} else if (waitpid_error) |err| {
promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis));
} else if (this.signal_code != null) {
promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(128 +% @intFromEnum(this.signal_code.?)));
} else {
// crash in debug mode
if (comptime Environment.allow_assert)
unreachable;
}
}
if (this.on_exit_callback.trySwap()) |callback| {
const waitpid_value: JSValue =
if (waitpid_error) |err|
err.toJSC(globalThis)
else
JSC.JSValue.jsUndefined();
const this_value = if (this_jsvalue.isEmptyOrUndefinedOrNull()) JSC.JSValue.jsUndefined() else this_jsvalue;
this_value.ensureStillAlive();
const args = [_]JSValue{
this_value,
this.getExitCode(globalThis),
this.getSignalCode(globalThis),
waitpid_value,
};
const result = callback.callWithThis(
globalThis,
this_value,
&args,
);
if (result.isAnyError()) {
globalThis.bunVM().onUnhandledError(globalThis, result);
}
}
}
fn onExit(
this: *Subprocess,
globalThis: *JSC.JSGlobalObject,
this_jsvalue: JSC.JSValue,
) void {
log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 });
log("onExit({d}) = {d}, \"{s}\"", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1, if (this.signal_code) |code| @tagName(code) else "" });
defer this.updateHasPendingActivity();
this_jsvalue.ensureStillAlive();
@@ -1889,53 +1950,7 @@ pub const Subprocess = struct {
this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task));
}
if (this.exit_promise.trySwap()) |promise| {
const waitpid_error = this.waitpid_err;
this.waitpid_err = null;
if (this.exit_code) |code| {
promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code));
} else if (this.signal_code != null) {
promise.asAnyPromise().?.resolve(globalThis, this.getSignalCode(globalThis));
} else if (waitpid_error) |err| {
promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis));
} else {
// crash in debug mode
if (comptime Environment.allow_assert)
unreachable;
}
}
}
if (this.on_exit_callback.trySwap()) |callback| {
const waitpid_error = this.waitpid_err;
this.waitpid_err = null;
const waitpid_value: JSValue =
if (waitpid_error) |err|
err.toJSC(globalThis)
else
JSC.JSValue.jsUndefined();
const this_value = if (this_jsvalue.isEmptyOrUndefinedOrNull()) JSC.JSValue.jsUndefined() else this_jsvalue;
this_value.ensureStillAlive();
const args = [_]JSValue{
this_value,
this.getExitCode(globalThis),
this.getSignalCode(globalThis),
waitpid_value,
};
const result = callback.callWithThis(
globalThis,
this_value,
&args,
);
if (result.isAnyError()) {
globalThis.bunVM().onUnhandledError(globalThis, result);
}
this.runOnExit(globalThis, this_jsvalue);
}
}

View File

@@ -1257,9 +1257,9 @@ pub const FileSink = struct {
pub fn updateRef(this: *FileSink, value: bool) void {
if (this.poll_ref) |poll| {
if (value)
poll.enableKeepingProcessAlive(JSC.VirtualMachine.get())
poll.ref(JSC.VirtualMachine.get())
else
poll.disableKeepingProcessAlive(JSC.VirtualMachine.get());
poll.unref(JSC.VirtualMachine.get());
}
}
@@ -1572,7 +1572,7 @@ pub const FileSink = struct {
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
poll.deinitForceUnregister();
}
if (this.auto_close) {
@@ -1743,7 +1743,7 @@ pub const FileSink = struct {
if (signal_close) {
if (this.poll_ref) |poll| {
this.poll_ref = null;
poll.deinit();
poll.deinitForceUnregister();
}
this.fd = bun.invalid_fd;
@@ -3811,6 +3811,7 @@ pub const FIFO = struct {
this.close_on_empty_read = true;
if (this.poll_ref) |poll| {
poll.flags.insert(.hup);
poll.disableKeepingProcessAlive(JSC.VirtualMachine.get());
}
this.pending.result = .{ .done = {} };
@@ -3820,12 +3821,7 @@ pub const FIFO = struct {
pub fn close(this: *FIFO) void {
if (this.poll_ref) |poll| {
this.poll_ref = null;
if (comptime Environment.isLinux) {
// force target fd to be removed from epoll
poll.deinitForceUnregister();
} else {
poll.deinit();
}
poll.deinit();
}
const fd = this.fd;
@@ -4624,6 +4620,10 @@ pub const FileReader = struct {
}
};
pub fn toBlob(this: *Readable) Blob {
if (this.isClosed()) return Blob.initEmpty(JSC.VirtualMachine.get().global);
}
pub fn deinit(this: *Readable) void {
switch (this.*) {
.FIFO => {
@@ -4748,7 +4748,7 @@ pub const FileReader = struct {
},
};
this.lazy_readable.readable.FIFO.watch(readable_file.fd);
this.lazy_readable.readable.FIFO.pollRef().ref(this.globalThis().bunVM());
this.lazy_readable.readable.FIFO.pollRef().enableKeepingProcessAlive(this.globalThis().bunVM());
if (!(blob.data.file.is_atty orelse false)) {
this.lazy_readable.readable.FIFO.poll_ref.?.flags.insert(.nonblocking);
}
@@ -4822,9 +4822,9 @@ pub const FileReader = struct {
.FIFO => {
if (this.lazy_readable.readable.FIFO.poll_ref) |poll| {
if (value) {
poll.enableKeepingProcessAlive(this.globalThis().bunVM());
poll.ref(this.globalThis().bunVM());
} else {
poll.disableKeepingProcessAlive(this.globalThis().bunVM());
poll.unref(this.globalThis().bunVM());
}
}
},
@@ -4909,6 +4909,7 @@ pub fn NewReadyWatcher(
std.debug.assert(
this.poll_ref.?.unregister(JSC.VirtualMachine.get().event_loop_handle.?, false) == .result,
);
this.poll_ref.?.disableKeepingProcessAlive(JSC.VirtualMachine.get());
}
pub fn pollRef(this: *Context) *Async.FilePoll {

View File

@@ -969,7 +969,7 @@ pub const TestCommand = struct {
const file_end = reporter.jest.files.len;
for (file_start..file_end) |module_id| {
const module = reporter.jest.files.items(.module_scope)[module_id];
const module: *jest.DescribeScope = reporter.jest.files.items(.module_scope)[module_id];
vm.onUnhandledRejectionCtx = null;
vm.onUnhandledRejection = jest.TestRunnerTask.onUnhandledRejection;

View File

@@ -910,6 +910,16 @@ pub const PosixLoop = extern struct {
return this.active > 0;
}
// This exists as a method so that we can stick a debugger in here
pub fn addActive(this: *PosixLoop, value: u32) void {
this.active +|= value;
}
// This exists as a method so that we can stick a debugger in here
pub fn subActive(this: *PosixLoop, value: u32) void {
this.active -|= value;
}
pub fn unrefCount(this: *PosixLoop, count: i32) void {
log("unref x {d}", .{count});
this.num_polls -|= count;

View File

@@ -10,9 +10,10 @@ const N = 100;
test("spawn can write to stdin multiple chunks", async () => {
const maxFD = openSync("/dev/null", "w");
for (let i = 0; i < N; i++) {
const tmperr = join(tmpdir(), "stdin-repro-error.log." + i);
var exited;
await (async function () {
const tmperr = join(tmpdir(), "stdin-repro-error.log." + i);
const proc = spawn({
cmd: [bunExe(), import.meta.dir + "/stdin-repro.js"],
stdout: "pipe",
@@ -43,7 +44,7 @@ test("spawn can write to stdin multiple chunks", async () => {
if (inCounter === 4) break;
}
proc.stdin!.end();
await proc.stdin!.end();
})();
await Promise.all([prom, prom2]);

View File

@@ -500,7 +500,7 @@ describe("spawn unref and kill should not hang", () => {
it("kill and await exited", async () => {
for (let i = 0; i < 10; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
cmd: ["sleep", "0.001"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
@@ -514,7 +514,7 @@ describe("spawn unref and kill should not hang", () => {
it("unref", async () => {
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
cmd: ["sleep", "0.001"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
@@ -528,7 +528,7 @@ describe("spawn unref and kill should not hang", () => {
it("kill and unref", async () => {
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
cmd: ["sleep", "0.001"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
@@ -543,7 +543,7 @@ describe("spawn unref and kill should not hang", () => {
it("unref and kill", async () => {
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
cmd: ["sleep", "0.001"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
@@ -555,36 +555,7 @@ describe("spawn unref and kill should not hang", () => {
expect().pass();
});
it("unref and kill after sleep", async () => {
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
});
await Bun.sleep(1);
proc.unref();
proc.kill();
await proc.exited;
}
expect().pass();
});
it("kill and unref after sleep", async () => {
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", "0"],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
});
await Bun.sleep(1);
proc.kill();
proc.unref();
await proc.exited;
}
expect().pass();
});
it("should not hang after unref", async () => {
const proc = spawn({
cmd: [bunExe(), path.join(import.meta.dir, "does-not-hang.js")],
@@ -594,3 +565,90 @@ describe("spawn unref and kill should not hang", () => {
expect().pass();
});
});
async function runTest(sleep: string, order = ["sleep", "kill", "unref", "exited"]) {
console.log("running", order.join(","));
for (let i = 0; i < 100; i++) {
const proc = spawn({
cmd: ["sleep", sleep],
stdout: "ignore",
stderr: "ignore",
stdin: "ignore",
});
for (let action of order) {
switch (action) {
case "sleep": {
await Bun.sleep(1);
break;
}
case "kill": {
proc.kill();
break;
}
case "unref": {
proc.unref();
break;
}
case "exited": {
expect(await proc.exited).toBeNumber();
break;
}
default: {
throw new Error("unknown action");
}
}
}
}
expect().pass();
}
describe("should not hang", () => {
for (let sleep of ["0.001", "0"]) {
describe("sleep " + sleep, () => {
for (let order of [
["sleep", "kill", "unref", "exited"],
["sleep", "unref", "kill", "exited"],
["kill", "sleep", "unref", "exited"],
["kill", "unref", "sleep", "exited"],
["unref", "sleep", "kill", "exited"],
["unref", "kill", "sleep", "exited"],
["exited", "sleep", "kill", "unref"],
["exited", "sleep", "unref", "kill"],
["exited", "kill", "sleep", "unref"],
["exited", "kill", "unref", "sleep"],
["exited", "unref", "sleep", "kill"],
["exited", "unref", "kill", "sleep"],
["unref", "exited"],
["exited", "unref"],
["kill", "exited"],
["exited"],
]) {
const name = order.join(",");
const fn = runTest.bind(undefined, sleep, order);
it(name, fn);
}
});
}
});
it("#3480", async () => {
try {
var server = Bun.serve({
port: 0,
fetch: (req, res) => {
Bun.spawnSync(["echo", "1"], {});
return new Response("Hello world!");
},
});
const response = await fetch("http://" + server.hostname + ":" + server.port);
expect(await response.text()).toBe("Hello world!");
expect(response.ok);
} finally {
server!.stop(true);
}
});

View File

@@ -1,10 +1,12 @@
var stdout = Bun.stdout.writer();
console.error("Started");
var count = 0;
for await (let chunk of Bun.stdin.stream()) {
// const file = Bun.file("/tmp/testpipe");
const file = Bun.stdin;
for await (let chunk of file.stream()) {
const str = new Buffer(chunk).toString();
stdout.write(str);
stdout.flush();
await stdout.flush();
count++;
}
console.error("Finished with", count);

View File

@@ -200,7 +200,7 @@ describe("ChildProcess spawn bad stdio", () => {
});
}
it.todo("should handle normal execution of child process", async () => {
it("should handle normal execution of child process", async () => {
await createChild({}, (err, stdout, stderr) => {
strictEqual(err, null);
strictEqual(stdout, "");
@@ -386,7 +386,7 @@ describe("child_process default options", () => {
});
describe("child_process double pipe", () => {
it.skip("should allow two pipes to be used at once", done => {
it.skipIf(process.platform === "linux")("should allow two pipes to be used at once", done => {
// const { mustCallAtLeast, mustCall } = createCallCheckCtx(done);
const mustCallAtLeast = fn => fn;
const mustCall = fn => fn;

View File

@@ -22,7 +22,7 @@ describe("ChildProcess.spawn()", () => {
resolve(true);
});
// @ts-ignore
proc.spawn({ file: "bun", args: ["bun", "-v"] });
proc.spawn({ file: bunExe(), args: [bunExe(), "-v"] });
});
expect(result).toBe(true);
});
@@ -34,7 +34,7 @@ describe("ChildProcess.spawn()", () => {
resolve(true);
});
// @ts-ignore
proc.spawn({ file: "bun", args: ["bun", "-v"] });
proc.spawn({ file: bunExe(), args: [bunExe(), "-v"] });
proc.kill();
});
expect(result).toBe(true);
@@ -60,8 +60,8 @@ describe("spawn()", () => {
expect(!!child2).toBe(false);
});
it.todo("should allow stdout to be read via Node stream.Readable `data` events", async () => {
const child = spawn("bun", ["-v"]);
it("should allow stdout to be read via Node stream.Readable `data` events", async () => {
const child = spawn(bunExe(), ["-v"]);
const result: string = await new Promise(resolve => {
child.stdout.on("error", e => {
console.error(e);
@@ -77,8 +77,8 @@ describe("spawn()", () => {
expect(SEMVER_REGEX.test(result.trim())).toBe(true);
});
it.todo("should allow stdout to be read via .read() API", async () => {
const child = spawn("bun", ["-v"]);
it("should allow stdout to be read via .read() API", async () => {
const child = spawn(bunExe(), ["-v"]);
const result: string = await new Promise((resolve, reject) => {
let finalData = "";
child.stdout.on("error", e => {
@@ -97,10 +97,10 @@ describe("spawn()", () => {
});
it("should accept stdio option with 'ignore' for no stdio fds", async () => {
const child1 = spawn("bun", ["-v"], {
const child1 = spawn(bunExe(), ["-v"], {
stdio: "ignore",
});
const child2 = spawn("bun", ["-v"], {
const child2 = spawn(bunExe(), ["-v"], {
stdio: ["ignore", "ignore", "ignore"],
});
@@ -177,7 +177,7 @@ describe("spawn()", () => {
resolve = resolve1;
});
process.env.NO_COLOR = "1";
const child = spawn("node", ["--help"], { argv0: "bun" });
const child = spawn("node", ["--help"], { argv0: bunExe() });
delete process.env.NO_COLOR;
let msg = "";
@@ -216,9 +216,9 @@ describe("spawn()", () => {
});
describe("execFile()", () => {
it.todo("should execute a file", async () => {
it("should execute a file", async () => {
const result: Buffer = await new Promise((resolve, reject) => {
execFile("bun", ["-v"], { encoding: "buffer" }, (error, stdout, stderr) => {
execFile(bunExe(), ["-v"], { encoding: "buffer" }, (error, stdout, stderr) => {
if (error) {
reject(error);
}
@@ -230,7 +230,7 @@ describe("execFile()", () => {
});
describe("exec()", () => {
it.todo("should execute a command in a shell", async () => {
it("should execute a command in a shell", async () => {
const result: Buffer = await new Promise((resolve, reject) => {
exec("bun -v", { encoding: "buffer" }, (error, stdout, stderr) => {
if (error) {
@@ -242,7 +242,7 @@ describe("exec()", () => {
expect(SEMVER_REGEX.test(result.toString().trim())).toBe(true);
});
it.todo("should return an object w/ stdout and stderr when promisified", async () => {
it("should return an object w/ stdout and stderr when promisified", async () => {
const result = await promisify(exec)("bun -v");
expect(typeof result).toBe("object");
expect(typeof result.stdout).toBe("string");
@@ -262,8 +262,8 @@ describe("spawnSync()", () => {
});
describe("execFileSync()", () => {
it.todo("should execute a file synchronously", () => {
const result = execFileSync("bun", ["-v"], { encoding: "utf8" });
it("should execute a file synchronously", () => {
const result = execFileSync(bunExe(), ["-v"], { encoding: "utf8" });
expect(SEMVER_REGEX.test(result.trim())).toBe(true);
});
@@ -277,7 +277,7 @@ describe("execFileSync()", () => {
});
describe("execSync()", () => {
it.todo("should execute a command in the shell synchronously", () => {
it("should execute a command in the shell synchronously", () => {
const result = execSync("bun -v", { encoding: "utf8" });
expect(SEMVER_REGEX.test(result.trim())).toBe(true);
});

View File

@@ -7,7 +7,14 @@ function closeHandler() {
console.log("closeHandler called");
}
const p = spawn("bun", ["--version"]);
let bunExe = process.execPath;
if ((process.versions.bun || "").endsWith("_debug")) {
bunExe = "bun-debug";
} else if (bunExe.endsWith("node")) {
bunExe = "bun";
}
const p = spawn(bunExe, ["--version"]);
p.on("exit", exitHandler);
p.on("close", closeHandler);

View File

@@ -443,7 +443,7 @@ describe("websocket in subprocess", () => {
subprocess.kill();
expect(await subprocess.exited).toBe("SIGHUP");
expect(await subprocess.exited).toBe(129);
});
it("should exit with invalid url", async () => {

View File

@@ -20,11 +20,13 @@ it("macros should not lead to seg faults under any given input", async () => {
writeFileSync(join(testDir, "macro.ts"), "export function fn(str) { return str; }");
writeFileSync(join(testDir, "index.ts"), "import { fn } from './macro' assert { type: 'macro' };\nfn(`©${''}`);");
const { stdout, exitCode } = Bun.spawnSync({
cmd: [bunExe(), "build", join(testDir, "index.ts")],
const { stderr, exitCode } = Bun.spawnSync({
cmd: [bunExe(), "build", "--minify", join(testDir, "index.ts")],
env: bunEnv,
stderr: "inherit",
stderr: "pipe",
});
expect(exitCode).toBe(0);
expect(stderr.toString().trim()).toStartWith('error: "Cannot convert argument type to JS');
expect(exitCode).not.toBe(0);
expect(exitCode).toBe(1);
});