Compare commits

...

3 Commits

Author SHA1 Message Date
Cursor Agent
a4f9f94af4 Implement robust cancellation handling across shell interpreter states
Co-authored-by: zack <zack@theradisic.com>
2025-06-30 20:43:02 +00:00
Cursor Agent
cc51e9f8e4 Refactor shell script parsing and cancellation handling
Co-authored-by: zack <zack@theradisic.com>
2025-06-28 01:07:18 +00:00
Cursor Agent
529fc777f9 Add shell script cancellation support with AbortSignal
Co-authored-by: zack <zack@theradisic.com>
2025-06-27 23:29:28 +00:00
36 changed files with 786 additions and 85 deletions

View File

@@ -170,6 +170,18 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
this.#hasRun = true;
let interp = createShellInterpreter(this.#resolve, this.#reject, this.#args!);
// Store the interpreter reference for abort signal handling
if (this.#args) {
(this.#args as any).__interpreter = interp;
// Check if we should cancel immediately
const abortSignal = (this.#args as any).__abortSignal;
if (abortSignal && abortSignal.aborted) {
interp.cancel();
}
}
this.#args = undefined;
interp.run();
}
@@ -238,6 +250,34 @@ export function createBunShellTemplateFunction(createShellInterpreter_, createPa
return this;
}
signal(signal: AbortSignal): this {
this.#throwIfRunning();
if (signal.aborted) {
// Signal is already aborted, cancel immediately when run
const args = this.#args;
if (args) {
// Store a flag to cancel when interpreter is created
(args as any).__abortSignal = signal;
}
} else {
// Listen for abort event
const args = this.#args;
if (args) {
signal.addEventListener('abort', () => {
// Get the interpreter from the parsed shell script
const jsInterpreter = (args as any).__interpreter;
if (jsInterpreter) {
jsInterpreter.cancel();
}
}, { once: true });
(args as any).__abortSignal = signal;
}
}
return this;
}
then(onfulfilled, onrejected) {
this.#run();

View File

@@ -626,6 +626,10 @@ pub fn start(this: *Builtin) Yield {
return this.callImpl(Yield, "start", .{});
}
pub fn cancel(this: *Builtin) void {
this.callImpl(void, "cancel", .{});
}
pub fn deinit(this: *Builtin) void {
this.callImpl(void, "deinit", .{});
@@ -646,7 +650,7 @@ pub fn stdBufferedBytelist(this: *Builtin, comptime io_kind: @Type(.enum_literal
@compileError("Bad IO" ++ @tagName(io_kind));
}
const io: *BuiltinIO = &@field(this, @tagName(io_kind));
const io: *BuiltinIO.Output = &@field(this, @tagName(io_kind));
return switch (io.*) {
.captured => if (comptime io_kind == .stdout) this.parentCmd().base.shell.buffered_stdout() else this.parentCmd().base.shell.buffered_stderr(),
else => null,

View File

@@ -819,7 +819,10 @@ pub const AsyncDeinitWriter = struct {
}
pub fn runFromMainThread(this: *@This()) void {
this.writer().deinitOnMainThread();
const iowriter = this.writer();
// Always deinitialize the writer, even if cancelled
// This is a cleanup task that must run
iowriter.deinitOnMainThread();
}
pub fn runFromMainThreadMini(this: *@This(), _: *void) void {

View File

@@ -62,41 +62,42 @@ pub fn setQuiet(this: *ParsedShellScript, _: *JSGlobalObject, _: *JSC.CallFrame)
return .js_undefined;
}
pub fn setEnv(this: *ParsedShellScript, globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const value1 = callframe.argument(0).getObject() orelse {
return globalThis.throwInvalidArguments("env must be an object", .{});
pub fn setEnv(this: *ParsedShellScript, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const arguments_ = callframe.arguments_old(2);
var arguments = JSC.CallFrame.ArgumentsSlice.init(globalThis.bunVM(), arguments_.slice());
const jsenv = arguments.nextEat() orelse {
return globalThis.throw("$`...`.env(): expected an object argument", .{});
};
var object_iter = try JSC.JSPropertyIterator(.{
.skip_empty_name = false,
.include_value = true,
}).init(globalThis, value1);
defer object_iter.deinit();
const obj_ref = jsenv.asObjectRef() orelse {
return globalThis.throw("$`...`.env(): expected an object argument", .{});
};
const obj: *JSC.JSObject = @ptrCast(obj_ref);
var iter = try JSC.JSPropertyIterator(.{ .skip_empty_name = false, .include_value = true }).init(globalThis, obj);
defer iter.deinit();
var env: EnvMap = EnvMap.init(bun.default_allocator);
env.ensureTotalCapacity(object_iter.len);
if (this.export_env == null) {
this.export_env = EnvMap.init(bun.default_allocator);
}
// If the env object does not include a $PATH, it must disable path lookup for argv[0]
// PATH = "";
while (try object_iter.next()) |key| {
const keyslice = key.toOwnedSlice(bun.default_allocator) catch bun.outOfMemory();
var value = object_iter.value;
while (try iter.next()) |key| {
var value = iter.value;
if (value.isUndefined()) continue;
const value_str = try value.getZigString(globalThis);
const slice = value_str.toOwnedSlice(bun.default_allocator) catch bun.outOfMemory();
const keyref = EnvStr.initRefCounted(keyslice);
defer keyref.deref();
const valueref = EnvStr.initRefCounted(slice);
defer valueref.deref();
const key_str = try key.toOwnedSlice(bun.default_allocator);
const val_slice = try (try value.getZigString(globalThis)).toOwnedSlice(bun.default_allocator);
this.export_env.?.insert(EnvStr.initRefCounted(key_str), EnvStr.initRefCounted(val_slice));
}
env.insert(keyref, valueref);
}
if (this.export_env) |*previous| {
previous.deinit();
}
this.export_env = env;
return .js_undefined;
}
pub fn cancel(this: *ParsedShellScript, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
// This is not used directly from ParsedShellScript, cancellation is handled in the interpreter
_ = this;
_ = globalThis;
_ = callframe;
return .js_undefined;
}

View File

@@ -71,6 +71,22 @@ pub const Yield = union(enum) {
return this.* == .done;
}
fn getInterpreter(this: Yield) ?*Interpreter {
return switch (this) {
.script => |x| x.base.interpreter,
.stmt => |x| x.base.interpreter,
.pipeline => |x| x.base.interpreter,
.cmd => |x| x.base.interpreter,
.assigns => |x| x.base.interpreter,
.expansion => |x| x.base.interpreter,
.@"if" => |x| x.base.interpreter,
.subshell => |x| x.base.interpreter,
.cond_expr => |x| x.base.interpreter,
.on_io_writer_chunk => null,
.suspended, .failed, .done => null,
};
}
pub fn run(this: Yield) void {
if (comptime Environment.isDebug) log("Yield({s}) _dbg_catch_exec_within_exec = {d} + 1 = {d}", .{ @tagName(this), _dbg_catch_exec_within_exec, _dbg_catch_exec_within_exec + 1 });
bun.debugAssert(_dbg_catch_exec_within_exec <= MAX_DEPTH);
@@ -80,48 +96,64 @@ pub const Yield = union(enum) {
if (comptime Environment.isDebug) _dbg_catch_exec_within_exec -= 1;
}
// A pipeline creates multiple "threads" of execution:
//
// ```bash
// cmd1 | cmd2 | cmd3
// ```
//
// We need to start cmd1, go back to the pipeline, start cmd2, and so
// on.
//
// This means we need to store a reference to the pipeline. And
// there can be nested pipelines, so we need a stack.
var sfb = std.heap.stackFallback(@sizeOf(*Pipeline) * 4, bun.default_allocator);
const alloc = sfb.get();
var pipeline_stack = std.ArrayList(*Pipeline).initCapacity(alloc, 4) catch bun.outOfMemory();
defer pipeline_stack.deinit();
var current_yield = this;
// Note that we're using labelled switch statements but _not_
// re-assigning `this`, so the `this` variable is stale after the first
// execution. Don't touch it.
state: switch (this) {
.pipeline => |x| {
pipeline_stack.append(x) catch bun.outOfMemory();
continue :state x.next();
},
.cmd => |x| continue :state x.next(),
.script => |x| continue :state x.next(),
.stmt => |x| continue :state x.next(),
.assigns => |x| continue :state x.next(),
.expansion => |x| continue :state x.next(),
.@"if" => |x| continue :state x.next(),
.subshell => |x| continue :state x.next(),
.cond_expr => |x| continue :state x.next(),
.on_io_writer_chunk => |x| {
const child = IOWriterChildPtr.fromAnyOpaque(x.child);
continue :state child.onIOWriterChunk(x.written, x.err);
},
.failed, .suspended, .done => {
if (drainPipelines(&pipeline_stack)) |yield| {
continue :state yield;
// re-assigning `current_yield`, so we need to be careful about state updates.
while (true) {
// Check for cancellation at the beginning of each iteration
if (current_yield.getInterpreter()) |interp| {
if (interp.is_cancelled.load(.monotonic)) {
// Begin graceful unwind
current_yield = switch (current_yield) {
.pipeline => |x| x.cancel(),
.cmd => |x| x.cancel(),
.script => |x| x.cancel(),
.stmt => |x| x.cancel(),
.assigns => |x| x.cancel(),
.expansion => |x| x.cancel(),
.@"if" => |x| x.cancel(),
.subshell => |x| x.cancel(),
.cond_expr => |x| x.cancel(),
else => current_yield,
};
if (current_yield == .suspended or current_yield == .done or current_yield == .failed) {
return;
}
continue;
}
return;
},
}
switch (current_yield) {
.pipeline => |x| {
pipeline_stack.append(x) catch bun.outOfMemory();
current_yield = x.next();
},
.cmd => |x| current_yield = x.next(),
.script => |x| current_yield = x.next(),
.stmt => |x| current_yield = x.next(),
.assigns => |x| current_yield = x.next(),
.expansion => |x| current_yield = x.next(),
.@"if" => |x| current_yield = x.next(),
.subshell => |x| current_yield = x.next(),
.cond_expr => |x| current_yield = x.next(),
.on_io_writer_chunk => |x| {
const child = IOWriterChildPtr.fromAnyOpaque(x.child);
current_yield = child.onIOWriterChunk(x.written, x.err);
},
.failed, .suspended, .done => {
if (drainPipelines(&pipeline_stack)) |yield| {
current_yield = yield;
continue;
}
return;
},
}
}
}

View File

@@ -20,6 +20,10 @@ pub fn start(this: *@This()) Yield {
return this.bltn().done(0);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *@This()) void {
this.buf.deinit(bun.default_allocator);
//basename

View File

@@ -243,6 +243,37 @@ pub fn onIOReaderDone(this: *Cat, err: ?JSC.SystemError) Yield {
pub fn deinit(_: *Cat) void {}
pub fn cancel(this: *Cat) void {
switch (this.state) {
.exec_stdin => {
// Cancel reader if needed
if (!this.state.exec_stdin.in_done) {
if (this.bltn().stdin.needsIO()) {
this.bltn().stdin.fd.removeReader(this);
}
this.state.exec_stdin.in_done = true;
}
// Cancel any pending chunks
if (this.bltn().stdout.needsIO()) |_| {
this.bltn().stdout.fd.writer.cancelChunks(this);
}
},
.exec_filepath_args => {
var exec = &this.state.exec_filepath_args;
if (exec.reader) |r| {
r.removeReader(this);
}
exec.deinit();
// Cancel any pending chunks
if (this.bltn().stdout.needsIO()) |_| {
this.bltn().stdout.fd.writer.cancelChunks(this);
}
},
.idle, .waiting_write_err, .done => {},
}
this.state = .done;
}
pub inline fn bltn(this: *Cat) *Builtin {
const impl: *Builtin.Impl = @alignCast(@fieldParentPtr("cat", this));
return @fieldParentPtr("impl", impl);

View File

@@ -107,6 +107,10 @@ pub inline fn bltn(this: *Cd) *Builtin {
return @fieldParentPtr("impl", impl);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Cd) void {
log("({s}) deinit", .{@tagName(.cd)});
_ = this;

View File

@@ -50,6 +50,11 @@ const EbusyState = struct {
absolute_targets: bun.StringArrayHashMapUnmanaged(void) = .{},
absolute_srcs: bun.StringArrayHashMapUnmanaged(void) = .{},
pub fn cancel(this: *@This()) void {
// TODO: Add atomic cancellation flag for threaded execution
_ = this;
}
pub fn deinit(this: *EbusyState) void {
// The tasks themselves are freed in `ignoreEbusyErrorIfPossible()`
this.tasks.deinit(bun.default_allocator);
@@ -173,6 +178,11 @@ pub fn next(this: *Cp) Yield {
return this.bltn().done(0);
}
pub fn cancel(cp: *Cp) void {
// TODO: Add atomic cancellation flag for threaded execution
_ = cp;
}
pub fn deinit(cp: *Cp) void {
assert(cp.state == .done or cp.state == .waiting_write_err);
}
@@ -409,6 +419,14 @@ pub const ShellCpTask = struct {
pub fn runFromMainThread(this: *ShellCpTask) void {
debug("runFromMainThread", .{});
// Check if the interpreter has been cancelled
if (this.cp.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.cp.onShellCpTaskDone(this);
}

View File

@@ -20,6 +20,10 @@ pub fn start(this: *@This()) Yield {
return this.bltn().done(0);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *@This()) void {
this.buf.deinit(bun.default_allocator);
//dirname

View File

@@ -61,6 +61,10 @@ pub fn deinit(this: *Echo) void {
this.output.deinit();
}
pub fn cancel(this: *Echo) void {
_ = this;
}
pub inline fn bltn(this: *Echo) *Builtin {
const impl: *Builtin.Impl = @alignCast(@fieldParentPtr("echo", this));
return @fieldParentPtr("impl", impl);

View File

@@ -62,6 +62,10 @@ pub fn onIOWriterChunk(this: *Exit, _: usize, maybe_e: ?JSC.SystemError) Yield {
return this.next();
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Exit) void {
_ = this;
}

View File

@@ -110,6 +110,10 @@ pub fn start(this: *Export) Yield {
return this.bltn().done(0);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Export) void {
log("({s}) deinit", .{@tagName(.@"export")});
_ = this;

View File

@@ -2,6 +2,10 @@ pub fn start(this: *@This()) Yield {
return this.bltn().done(1);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *@This()) void {
_ = this;
}

View File

@@ -112,6 +112,11 @@ fn next(this: *Ls) Yield {
return this.bltn().done(0);
}
pub fn cancel(this: *@This()) void {
// TODO: Add atomic cancellation flag for threaded execution
_ = this;
}
pub fn deinit(this: *Ls) void {
this.alloc_scope.endScope();
}
@@ -411,6 +416,14 @@ pub const ShellLsTask = struct {
pub fn runFromMainThread(this: *@This()) void {
debug("runFromMainThread", .{});
// Check if the interpreter has been cancelled
if (this.ls.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.ls.onShellLsTaskDone(this);
}

View File

@@ -159,6 +159,10 @@ const ShellMkdirOutputTaskVTable = struct {
}
};
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Mkdir) void {
_ = this;
}
@@ -220,6 +224,14 @@ pub const ShellMkdirTask = struct {
pub fn runFromMainThread(this: *@This()) void {
debug("{} runFromJS", .{this});
// Check if the interpreter has been cancelled
if (this.mkdir.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.mkdir.onShellMkdirTaskDone(this);
}

View File

@@ -55,6 +55,12 @@ pub const ShellMvCheckTargetTask = struct {
}
pub fn runFromMainThread(this: *@This()) void {
// Check if the interpreter has been cancelled
if (this.mv.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
return;
}
this.mv.checkTargetTaskDone(this);
}
@@ -158,6 +164,12 @@ pub const ShellMvBatchedTask = struct {
}
pub fn runFromMainThread(this: *@This()) void {
// Check if the interpreter has been cancelled
if (this.mv.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
return;
}
this.mv.batchedMoveTaskDone(this);
}
@@ -368,6 +380,11 @@ pub fn batchedMoveTaskDone(this: *Mv, task: *ShellMvBatchedTask) void {
}
}
pub fn cancel(this: *@This()) void {
// TODO: Add atomic cancellation flag for threaded execution
_ = this;
}
pub fn deinit(this: *Mv) void {
if (this.args.target_fd) |fd| fd.toOptional().close();
}

View File

@@ -68,6 +68,10 @@ pub fn onIOWriterChunk(this: *Pwd, _: usize, e: ?JSC.SystemError) Yield {
return this.next();
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Pwd) void {
_ = this;
}

View File

@@ -320,6 +320,11 @@ pub fn onIOWriterChunk(this: *Rm, _: usize, e: ?JSC.SystemError) Yield {
return this.bltn().done(1);
}
pub fn cancel(this: *@This()) void {
// TODO: Add atomic cancellation flag for threaded execution
_ = this;
}
pub fn deinit(this: *Rm) void {
_ = this;
}
@@ -519,6 +524,14 @@ pub const ShellRmTask = struct {
pub fn runFromMainThread(this: *DirTask) void {
debug("DirTask(0x{x}, path={s}) runFromMainThread", .{ @intFromPtr(this), this.path });
// Check if the interpreter has been cancelled
if (this.task_manager.rm.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.task_manager.rm.writeVerbose(this).run();
}
@@ -1162,11 +1175,18 @@ pub const ShellRmTask = struct {
}
pub fn runFromMainThread(this: *ShellRmTask) void {
// Check if the interpreter has been cancelled
if (this.rm.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.rm.onShellRmTaskDone(this);
}
pub fn runFromMainThreadMini(this: *ShellRmTask, _: *void) void {
this.rm.onShellRmTaskDone(this);
this.runFromMainThread();
}
pub fn deinit(this: *ShellRmTask) void {

View File

@@ -122,6 +122,10 @@ pub fn onIOWriterChunk(this: *@This(), _: usize, maybe_e: ?JSC.SystemError) Yiel
}
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *@This()) void {
this.buf.deinit(bun.default_allocator);
//seq

View File

@@ -21,6 +21,10 @@ pub fn format(this: *const Touch, comptime fmt: []const u8, opts: std.fmt.Format
try writer.print("Touch(0x{x}, state={s})", .{ @intFromPtr(this), @tagName(this.state) });
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Touch) void {
log("{} deinit", .{this});
}
@@ -209,6 +213,14 @@ pub const ShellTouchTask = struct {
pub fn runFromMainThread(this: *@This()) void {
debug("{} runFromJS", .{this});
// Check if the interpreter has been cancelled
if (this.touch.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.deinit();
return;
}
this.touch.onShellTouchTaskDone(this);
}

View File

@@ -2,6 +2,10 @@ pub fn start(this: *@This()) Yield {
return this.bltn().done(0);
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *@This()) void {
_ = this;
}

View File

@@ -123,6 +123,10 @@ pub fn onIOWriterChunk(this: *Which, _: usize, e: ?JSC.SystemError) Yield {
return this.argComplete();
}
pub fn cancel(this: *@This()) void {
_ = this;
}
pub fn deinit(this: *Which) void {
log("({s}) deinit", .{@tagName(.which)});
_ = this;

View File

@@ -50,6 +50,20 @@ pub inline fn bltn(this: *@This()) *Builtin {
pub fn deinit(_: *@This()) void {}
pub fn cancel(this: *@This()) void {
switch (this.state) {
.waiting_io => {
// Cancel any pending chunks
if (this.bltn().stdout.needsIO()) |_| {
this.bltn().stdout.fd.writer.cancelChunks(this);
}
// The concurrent task will stop when it sees the cancelled state
this.state = .done;
},
.idle, .err, .done => {},
}
}
pub const YesTask = struct {
evtloop: JSC.EventLoopHandle,
concurrent_task: JSC.EventLoopTask,
@@ -67,6 +81,17 @@ pub const YesTask = struct {
pub fn runFromMainThread(this: *@This()) void {
const yes: *Yes = @fieldParentPtr("task", this);
// Check if the interpreter has been cancelled
if (yes.bltn().parentCmd().base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process if cancelled
return;
}
// Check if we should stop
if (yes.state == .done or yes.state == .err) {
return;
}
// Manually make safeguard since this task should not be created if output does not need IO
yes.bltn().stdout.enqueueFmt(yes, "{s}\n", .{yes.expletive}, .output_needs_io).run();

View File

@@ -134,6 +134,7 @@ pub const OutputNeedsIOSafeGuard = enum(u0) { output_needs_io };
pub const CallstackGuard = enum(u0) { __i_know_what_i_am_doing };
pub const ExitCode = u16;
pub const CANCELLED_EXIT_CODE: ExitCode = 130; // 128 + SIGINT
pub const StateKind = enum(u8) {
script,
@@ -300,6 +301,8 @@ pub const Interpreter = struct {
__alloc_scope: if (bun.Environment.enableAllocScopes) bun.AllocationScope else void,
is_cancelled: std.atomic.Value(bool) = .{ .raw = false },
// Here are all the state nodes:
pub const State = @import("./states/Base.zig");
pub const Script = @import("./states/Script.zig");
@@ -1166,21 +1169,43 @@ pub const Interpreter = struct {
this.exit_code = exit_code;
const this_jsvalue = this.this_jsvalue;
if (this_jsvalue != .zero) {
if (JSC.Codegen.JSShellInterpreter.resolveGetCached(this_jsvalue)) |resolve| {
const loop = this.event_loop.js;
const globalThis = this.globalThis;
this.this_jsvalue = .zero;
this.keep_alive.disable();
loop.enter();
_ = resolve.call(globalThis, .js_undefined, &.{
JSValue.jsNumberFromU16(exit_code),
this.getBufferedStdout(globalThis),
this.getBufferedStderr(globalThis),
}) catch |err| globalThis.reportActiveExceptionAsUnhandled(err);
JSC.Codegen.JSShellInterpreter.resolveSetCached(this_jsvalue, globalThis, .js_undefined);
JSC.Codegen.JSShellInterpreter.rejectSetCached(this_jsvalue, globalThis, .js_undefined);
loop.exit();
// ... existing code ...
const loop = this.event_loop.js;
const globalThis = this.globalThis;
this.this_jsvalue = .zero;
this.keep_alive.disable();
loop.enter();
// Check if cancelled and reject with DOMException
if (exit_code == CANCELLED_EXIT_CODE) {
if (JSC.Codegen.JSShellInterpreter.rejectGetCached(this_jsvalue)) |reject| {
// Create DOMException with AbortError
const abort_error = globalThis.createDOMExceptionInstance(.AbortError, "The operation was aborted", .{}) catch |err| {
globalThis.reportActiveExceptionAsUnhandled(err);
JSC.Codegen.JSShellInterpreter.resolveSetCached(this_jsvalue, globalThis, .js_undefined);
JSC.Codegen.JSShellInterpreter.rejectSetCached(this_jsvalue, globalThis, .js_undefined);
loop.exit();
return .done;
};
_ = reject.call(globalThis, .js_undefined, &.{abort_error}) catch |err| globalThis.reportActiveExceptionAsUnhandled(err);
JSC.Codegen.JSShellInterpreter.resolveSetCached(this_jsvalue, globalThis, .js_undefined);
JSC.Codegen.JSShellInterpreter.rejectSetCached(this_jsvalue, globalThis, .js_undefined);
}
} else {
// Normal case - resolve with exit code
if (JSC.Codegen.JSShellInterpreter.resolveGetCached(this_jsvalue)) |resolve| {
_ = resolve.call(globalThis, .js_undefined, &.{
JSValue.jsNumberFromU16(exit_code),
this.getBufferedStdout(globalThis),
this.getBufferedStderr(globalThis),
}) catch |err| globalThis.reportActiveExceptionAsUnhandled(err);
JSC.Codegen.JSShellInterpreter.resolveSetCached(this_jsvalue, globalThis, .js_undefined);
JSC.Codegen.JSShellInterpreter.rejectSetCached(this_jsvalue, globalThis, .js_undefined);
}
}
loop.exit();
}
} else {
this.flags.done = true;
@@ -1288,6 +1313,12 @@ pub const Interpreter = struct {
return JSC.JSValue.jsBoolean(this.hasPendingActivity());
}
pub fn cancel(this: *ThisInterpreter, _: *JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
log("Interpreter(0x{x}) cancel()", .{@intFromPtr(this)});
this.is_cancelled.store(true, .seq_cst);
return .js_undefined;
}
pub fn getStarted(this: *ThisInterpreter, globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
_ = globalThis; // autofix
_ = callframe; // autofix
@@ -1479,6 +1510,20 @@ pub fn StatePtrUnion(comptime TypesValue: anytype) type {
unknownTag(this.tagInt());
}
/// Signals to the state node to cancel execution
pub fn cancel(this: @This()) Yield {
const tags = comptime std.meta.fields(Ptr.Tag);
inline for (tags) |tag| {
if (this.tagInt() == tag.value) {
const Ty = comptime Ptr.typeFromTag(tag.value);
Ptr.assert_type(Ty);
var casted = this.as(Ty);
return casted.cancel();
}
}
unknownTag(this.tagInt());
}
pub fn unknownTag(tag: Ptr.TagInt) noreturn {
return bun.Output.panic("Unknown tag for shell state node: {d}\n", .{tag});
}

View File

@@ -14,6 +14,7 @@ state: union(enum) {
},
err: bun.shell.ShellErr,
done,
cancelled,
},
ctx: AssignCtx,
owned: bool = true,
@@ -116,12 +117,45 @@ pub fn next(this: *Assigns) Yield {
},
.done => unreachable,
.err => return this.parent.childDone(this, 1),
.cancelled => return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE),
}
}
return this.parent.childDone(this, 0);
}
pub fn cancel(this: *Assigns) Yield {
log("Assigns(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.idle => {
// Nothing to clean up in idle state
},
.expanding => |*expanding| {
// Cancel the expansion
_ = expanding.expansion.cancel();
// Clean up the result buffer
expanding.current_expansion_result.clearAndFree();
},
.err => {
// Already in error state
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn childDone(this: *Assigns, child: ChildPtr, exit_code: ExitCode) Yield {
if (child.ptr.is(Expansion)) {
bun.assert(this.state == .expanding);

View File

@@ -142,12 +142,36 @@ pub fn deinit(this: *Async) void {
_ = this;
}
pub fn cancel(this: *Async) Yield {
log("Async(0x{x}) cancel", .{@intFromPtr(this)});
// Cancel the child if executing
if (this.state == .exec) {
if (this.state.exec.child) |child| {
_ = child.cancel();
}
}
// Set state to done with cancelled exit code
this.state = .{ .done = bun.shell.interpret.CANCELLED_EXIT_CODE };
this.enqueueSelf();
return .suspended;
}
pub fn actuallyDeinit(this: *Async) void {
this.io.deref();
bun.destroy(this);
}
pub fn runFromMainThread(this: *Async) void {
// Check if the interpreter has been cancelled
if (this.base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process if cancelled
this.actuallyDeinit();
return;
}
this.next().run();
}

View File

@@ -141,6 +141,18 @@ pub fn childDone(this: *Binary, child: ChildPtr, exit_code: ExitCode) Yield {
return this.parent.childDone(this, exit_code);
}
pub fn cancel(this: *Binary) Yield {
log("Binary(0x{x}) cancel", .{@intFromPtr(this)});
// Cancel the currently executing child if any
if (this.currently_executing) |child| {
_ = child.cancel();
}
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *Binary) void {
if (this.currently_executing) |child| {
child.deinit();

View File

@@ -51,6 +51,7 @@ state: union(enum) {
exec,
done,
waiting_write_err,
cancelled,
},
/// If a subprocess and its stdout/stderr exit immediately, we queue
@@ -86,6 +87,13 @@ pub const ShellAsyncSubprocessDone = struct {
pub fn runFromMainThread(this: *ShellAsyncSubprocessDone) void {
log("{} runFromMainThread", .{this});
defer this.deinit();
// Check if the interpreter has been cancelled
if (this.cmd.base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
return;
}
this.cmd.parent.childDone(this.cmd, this.cmd.exit_code orelse 0).run();
}
@@ -334,6 +342,7 @@ pub fn next(this: *Cmd) Yield {
bun.shell.unreachableState("Cmd.next", "exec");
},
.done => unreachable,
.cancelled => return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE),
}
}
@@ -694,6 +703,64 @@ pub fn onExit(this: *Cmd, exit_code: ExitCode) void {
}
}
pub fn cancel(this: *Cmd) Yield {
log("Cmd(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.idle => {
// Nothing to clean up in idle state
},
.expanding_assigns => |*assigns| {
// Cancel the assigns expansion
_ = assigns.cancel();
},
.expanding_redirect => |*redirect| {
// Cancel the expansion
_ = redirect.expansion.cancel();
},
.expanding_args => |*args| {
// Cancel the expansion
_ = args.expansion.cancel();
},
.exec => {
// Cancel the underlying execution
switch (this.exec) {
.none => {},
.subproc => |*subproc| {
// Try to kill the subprocess with SIGTERM
_ = subproc.child.tryKill(@intFromEnum(bun.SignalCode.SIGTERM));
},
.bltn => |*builtin| {
// Call cancel on the builtin
builtin.cancel();
},
}
},
.waiting_write_err => {
// Cancel any pending IO chunks
if (this.io.stdout == .fd) {
this.io.stdout.fd.writer.cancelChunks(this);
}
if (this.io.stderr == .fd) {
this.io.stderr.fd.writer.cancelChunks(this);
}
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
this.exit_code = bun.shell.interpret.CANCELLED_EXIT_CODE;
return .suspended;
}
// TODO check that this also makes sure that the poll ref is killed because if it isn't then this Cmd pointer will be stale and so when the event for pid exit happens it will cause crash
pub fn deinit(this: *Cmd) void {
log("Cmd(0x{x}, {s}) cmd deinit", .{ @intFromPtr(this), @tagName(this.exec) });

View File

@@ -19,6 +19,7 @@ state: union(enum) {
},
waiting_write_err,
done,
cancelled,
} = .idle,
args: std.ArrayList([:0]const u8),
@@ -35,6 +36,13 @@ pub const ShellCondExprStatTask = struct {
pub fn runFromMainThread(this: *ShellCondExprStatTask) void {
defer this.deinit();
// Check if the interpreter has been cancelled
if (this.condexpr.base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
return;
}
const ret = this.result.?;
this.result = null;
this.condexpr.onStatTaskComplete(ret);
@@ -156,6 +164,7 @@ pub fn next(this: *CondExpr) Yield {
},
.waiting_write_err => return .suspended,
.done => assert(false),
.cancelled => return .suspended,
}
}
@@ -210,6 +219,49 @@ fn doStat(this: *CondExpr) Yield {
return .suspended;
}
pub fn cancel(this: *CondExpr) Yield {
log("CondExpr(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.idle => {
// Nothing to clean up in idle state
},
.expanding_args => |*args| {
// Cancel the expansion
_ = args.expansion.cancel();
},
.waiting_stat => {
// Stat task is running in thread pool, can't cancel it directly
// It will check cancellation when it completes
},
.stat_complete => {
// Already have stat result, just need to clean up
},
.waiting_write_err => {
// Cancel any pending IO chunks
if (this.io.stdout == .fd) {
this.io.stdout.fd.writer.cancelChunks(this);
}
if (this.io.stderr == .fd) {
this.io.stderr.fd.writer.cancelChunks(this);
}
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *CondExpr) void {
this.io.deinit();
for (this.args.items) |item| {

View File

@@ -20,6 +20,7 @@ state: union(enum) {
glob,
done,
err: bun.shell.ShellErr,
cancelled,
},
child_state: union(enum) {
idle,
@@ -146,11 +147,54 @@ pub fn init(
expansion.current_out = std.ArrayList(u8).init(expansion.base.allocator());
}
pub fn deinit(expansion: *Expansion) void {
log("Expansion(0x{x}) deinit", .{@intFromPtr(expansion)});
expansion.current_out.deinit();
expansion.io.deinit();
expansion.base.endScope();
pub fn cancel(this: *Expansion) Yield {
log("Expansion(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.normal => {
// If a command substitution is running, cancel it
if (this.child_state == .cmd_subst) {
_ = this.child_state.cmd_subst.cmd.cancel();
}
// Clean up current output buffer
this.current_out.clearAndFree();
},
.braces => {
// Clean up any brace expansion state
this.current_out.clearAndFree();
},
.glob => {
// Cancel glob walk if in progress
if (this.child_state == .glob) {
this.child_state.glob.walker.deinit(true);
}
this.current_out.clearAndFree();
},
.err => {
// Already in error state, just clean up
this.current_out.clearAndFree();
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *Expansion) void {
log("Expansion(0x{x}) deinit", .{@intFromPtr(this)});
this.current_out.deinit();
this.io.deinit();
this.base.endScope();
}
pub fn start(this: *Expansion) Yield {
@@ -271,6 +315,7 @@ pub fn next(this: *Expansion) Yield {
return this.transitionToGlobState();
},
.done, .err => unreachable,
.cancelled => return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE),
}
}
@@ -816,6 +861,15 @@ pub const ShellGlobTask = struct {
pub fn runFromMainThread(this: *This) void {
debug("runFromJS", .{});
// Check if the interpreter has been cancelled
if (this.expansion.base.interpreter.is_cancelled.load(.monotonic)) {
// Don't process the result if cancelled
this.ref.unref(this.event_loop);
this.deinit();
return;
}
this.expansion.onGlobWalkDone(this).run();
this.ref.unref(this.event_loop);
}

View File

@@ -148,6 +148,13 @@ pub fn next(this: *If) Yield {
return this.parent.childDone(this, 0);
}
pub fn cancel(this: *If) Yield {
log("If(0x{x}) cancel", .{@intFromPtr(this)});
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *If) void {
log("{} deinit", .{this});
this.io.deref();

View File

@@ -21,6 +21,7 @@ state: union(enum) {
done: struct {
exit_code: ExitCode = 0,
},
cancelled,
} = .{ .starting_cmds = .{ .idx = 0 } },
pub const ParentPtr = StatePtrUnion(.{
@@ -188,6 +189,7 @@ pub fn next(this: *Pipeline) Yield {
.pending => shell.unreachableState("Pipeline.next", "pending"),
.waiting_write_err => shell.unreachableState("Pipeline.next", "waiting_write_err"),
.done => return this.parent.childDone(this, this.state.done.exit_code),
.cancelled => return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE),
}
}
@@ -260,6 +262,77 @@ pub fn childDone(this: *Pipeline, child: ChildPtr, exit_code: ExitCode) Yield {
return .suspended;
}
pub fn cancel(this: *Pipeline) Yield {
log("Pipeline(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.starting_cmds => {
// We're still starting commands, need to clean up what we've started
// Close all pipes to unblock any processes stuck on I/O
if (this.pipes) |pipes| {
for (pipes) |*pipe| {
closefd(pipe[0]);
closefd(pipe[1]);
}
}
// Cancel all commands that have been started
if (this.cmds) |cmds| {
for (cmds[0..this.state.starting_cmds.idx]) |*cmd_or_result| {
switch (cmd_or_result.*) {
.cmd => |cmd| {
// Cancel the command
_ = cmd.call("cancel", .{}, Yield);
},
.result => {
// Already finished, nothing to do
},
}
}
}
},
.pending => {
// All commands have been started, close pipes and cancel all commands
if (this.pipes) |pipes| {
for (pipes) |*pipe| {
closefd(pipe[0]);
closefd(pipe[1]);
}
}
if (this.cmds) |cmds| {
for (cmds) |*cmd_or_result| {
switch (cmd_or_result.*) {
.cmd => |cmd| {
// Cancel the command
_ = cmd.call("cancel", .{}, Yield);
},
.result => {
// Already finished, nothing to do
},
}
}
}
},
.waiting_write_err => {
// Cancel any pending write operations
// The IO writer chunks will be cancelled when we set the state
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
return .suspended;
}
pub fn deinit(this: *Pipeline) void {
// If commands was zero then we didn't allocate anything
if (this.cmds == null) return;

View File

@@ -91,6 +91,13 @@ pub fn childDone(this: *Script, child: ChildPtr, exit_code: ExitCode) Yield {
return this.next();
}
pub fn cancel(this: *Script) Yield {
log("Script(0x{x}) cancel", .{@intFromPtr(this)});
// Simply finish with cancelled exit code
return this.finish(bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *Script) void {
log("Script(0x{x}) deinit", .{@intFromPtr(this)});
this.io.deref();

View File

@@ -124,6 +124,18 @@ pub fn childDone(this: *Stmt, child: ChildPtr, exit_code: ExitCode) Yield {
return this.next();
}
pub fn cancel(this: *Stmt) Yield {
log("Stmt(0x{x}) cancel", .{@intFromPtr(this)});
// Cancel the currently executing child if any
if (this.currently_executing) |child| {
_ = child.cancel();
}
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *Stmt) void {
log("Stmt(0x{x}) deinit", .{@intFromPtr(this)});
this.io.deinit();

View File

@@ -13,6 +13,7 @@ state: union(enum) {
exec,
wait_write_err,
done,
cancelled,
} = .idle,
redirection_file: std.ArrayList(u8),
exit_code: ExitCode = 0,
@@ -123,6 +124,7 @@ pub fn next(this: *Subshell) Yield {
},
.wait_write_err, .exec => return .suspended,
.done => @panic("This should not be possible."),
.cancelled => return .suspended,
}
}
@@ -170,6 +172,46 @@ pub fn onIOWriterChunk(this: *Subshell, _: usize, err: ?JSC.SystemError) Yield {
return this.parent.childDone(this, this.exit_code);
}
pub fn cancel(this: *Subshell) Yield {
log("Subshell(0x{x}) cancel", .{@intFromPtr(this)});
// If already done or cancelled, nothing to do
if (this.state == .done or this.state == .cancelled) {
return .suspended;
}
// Handle cancellation based on current state
switch (this.state) {
.idle => {
// Nothing to clean up in idle state
},
.expanding_redirect => |*redirect| {
// Cancel the expansion
_ = redirect.expansion.cancel();
},
.exec => {
// The Script child is running, it will be cancelled by the parent
// when it receives the cancellation
},
.wait_write_err => {
// Cancel any pending IO chunks
if (this.io.stdout == .fd) {
this.io.stdout.fd.writer.cancelChunks(this);
}
if (this.io.stderr == .fd) {
this.io.stderr.fd.writer.cancelChunks(this);
}
},
.done, .cancelled => {},
}
// Set state to cancelled
this.state = .cancelled;
// Report cancellation to parent
return this.parent.childDone(this, bun.shell.interpret.CANCELLED_EXIT_CODE);
}
pub fn deinit(this: *Subshell) void {
this.base.shell.deinit();
this.io.deref();