diff --git a/src/shell/Builtin.zig b/src/shell/Builtin.zig index 0379b3f0fd..9ef3b77501 100644 --- a/src/shell/Builtin.zig +++ b/src/shell/Builtin.zig @@ -392,6 +392,13 @@ pub fn init( }, }; }, + .yes => { + cmd.exec.bltn.impl = .{ + .yes = Yes{ + .alloc_scope = shell.AllocScope.beginScope(bun.default_allocator), + }, + }; + }, inline else => |tag| { cmd.exec.bltn.impl = @unionInit(Impl, @tagName(tag), .{}); }, diff --git a/src/shell/IOWriter.zig b/src/shell/IOWriter.zig index bdb28bb6f6..55d09cf5bd 100644 --- a/src/shell/IOWriter.zig +++ b/src/shell/IOWriter.zig @@ -216,12 +216,13 @@ pub fn cancelChunks(this: *IOWriter, ptr_: anytype) void { ChildPtr => ptr_, else => ChildPtr.init(ptr_), }; + const actual_ptr = ptr.ptr.repr._ptr; if (this.writers.len() == 0) return; const idx = this.writer_idx; const slice: []Writer = this.writers.sliceMutable(); if (idx >= slice.len) return; for (slice[idx..]) |*w| { - if (w.ptr.ptr.repr._ptr == ptr.ptr.repr._ptr) { + if (w.ptr.ptr.repr._ptr == actual_ptr) { w.setDead(); } } @@ -233,6 +234,10 @@ const Writer = struct { written: usize = 0, bytelist: ?*bun.ByteList = null, + pub fn format(this: Writer, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { + try std.fmt.format(writer, "Writer(0x{x}, {s})", .{ this.ptr.ptr.repr._ptr, @tagName(this.ptr.ptr.tag()) }); + } + pub fn wroteEverything(this: *const Writer) bool { return this.written >= this.len; } @@ -246,6 +251,7 @@ const Writer = struct { } pub fn setDead(this: *Writer) void { + log("Writer setDead {s}(0x{x})", .{ @tagName(this.ptr.ptr.tag()), this.ptr.ptr.repr._ptr }); this.ptr.ptr = ChildPtrRaw.Null; } }; @@ -339,10 +345,10 @@ pub fn onWritePollable(this: *IOWriter, amount: usize, status: bun.io.WriteStatu // We wrote everything if (!not_fully_written) return; - // We did not write everything. - // This seems to happen in a pipeline where the command which - // _reads_ the output of the previous command closes before the - // previous command. 
+ // We did not write everything. This means the other end of the + // socket/pipe closed and we got EPIPE. + // + // This is typically seen in a pipeline where the command reading our output exits first. // // Example: `ls . | echo hi` // @@ -357,7 +363,6 @@ pub fn onWritePollable(this: *IOWriter, amount: usize, status: bun.io.WriteStatu // We don't support signals right now. In fact we don't even have a way to kill the shell. // // So for a quick hack we're just going to have all writes return an error. - bun.assert(this.flags.is_socket); bun.Output.debugWarn("IOWriter(0x{x}, fd={}) received done without fully writing data", .{ @intFromPtr(this), this.fd }); this.flags.broken_pipe = true; this.brokenPipeForWriters(); @@ -387,15 +392,17 @@ pub fn onWritePollable(this: *IOWriter, amount: usize, status: bun.io.WriteStatu pub fn brokenPipeForWriters(this: *IOWriter) void { bun.assert(this.flags.broken_pipe); var offset: usize = 0; - for (this.writers.sliceMutable()) |*w| { + const writers = this.writers.sliceMutable()[this.writer_idx..]; + for (writers) |*w| { if (w.isDead()) { offset += w.len; continue; } - log("IOWriter(0x{x}, fd={}) brokenPipeForWriters {s}(0x{x})", .{ @intFromPtr(this), this.fd, @tagName(w.ptr.ptr.tag()), @intFromPtr(w.ptr.ptr.ptr()) }); + log("IOWriter(0x{x}, fd={}) brokenPipeForWriters Writer(0x{x}) {s}(0x{x})", .{ @intFromPtr(this), this.fd, @intFromPtr(w), @tagName(w.ptr.ptr.tag()), w.ptr.ptr.repr._ptr }); const err: JSC.SystemError = bun.sys.Error.fromCode(.PIPE, .write).toSystemError(); w.ptr.onIOWriterChunk(0, err).run(); offset += w.len; + this.cancelChunks(w.ptr); } this.total_bytes_written = 0; diff --git a/src/shell/builtin/yes.zig b/src/shell/builtin/yes.zig index bb6ed6445d..c5fc3552b0 100644 --- a/src/shell/builtin/yes.zig +++ b/src/shell/builtin/yes.zig @@ -1,14 +1,67 @@ -state: enum { idle, waiting_io, err, done } = .idle, +state: enum { idle, waiting_write_err, waiting_io, err, done } = .idle, expletive: []const u8 = "y", task: YesTask = undefined, +buffer: []u8 = "", +buffer_used: usize = 0, 
+alloc_scope: shell.AllocScope,
 
 pub fn start(this: *@This()) Yield {
     const args = this.bltn().argsSlice();
 
-    if (args.len > 0) {
-        this.expletive = std.mem.sliceTo(args[0], 0);
+    // Size of one output line: all args joined by single spaces plus a trailing newline
+    var bufalloc: usize = 0;
+    if (args.len == 0) {
+        bufalloc = 2; // "y\n"
+    } else {
+        // Sum all args + spaces between + newline
+        for (args, 0..) |arg, i| {
+            const arg_slice = std.mem.sliceTo(arg, 0);
+            bufalloc += arg_slice.len;
+            if (i < args.len - 1) bufalloc += 1; // space
+        }
+        bufalloc += 1; // newline
     }
 
+    // Lines of at most BUFSIZ / 2 bytes get a BUFSIZ (8192) byte buffer so one write carries many copies
+    const BUFSIZ = 8192;
+    if (bufalloc <= BUFSIZ / 2) {
+        bufalloc = BUFSIZ;
+    }
+
+    this.buffer = this.alloc_scope.allocator().alloc(u8, bufalloc) catch bun.outOfMemory();
+
+    // Fill buffer with one copy of the output
+    this.buffer_used = 0;
+    if (args.len == 0) {
+        @memcpy(this.buffer[0..1], "y");
+        this.buffer[1] = '\n';
+        this.buffer_used = 2;
+    } else {
+        for (args, 0..) |arg, i| {
+            const arg_slice = std.mem.sliceTo(arg, 0);
+            @memcpy(this.buffer[this.buffer_used .. this.buffer_used + arg_slice.len], arg_slice);
+            this.buffer_used += arg_slice.len;
+            if (i < args.len - 1) {
+                this.buffer[this.buffer_used] = ' ';
+                this.buffer_used += 1;
+            }
+        }
+        this.buffer[this.buffer_used] = '\n';
+        this.buffer_used += 1;
+    }
+
+    // Repeat the line as many whole times as fit; `buffer_used` ends on a line boundary
+    const copysize = this.buffer_used;
+    var copies = bufalloc / copysize;
+    var filled = this.buffer_used;
+    while (copies > 1) : (copies -= 1) {
+        const remaining = bufalloc - filled;
+        const to_copy = @min(copysize, remaining);
+        @memcpy(this.buffer[filled .. filled + to_copy], this.buffer[0..to_copy]);
+        filled += to_copy;
+    }
+    this.buffer_used = filled;
+
     if (this.bltn().stdout.needsIO()) |safeguard| {
         const evtloop = this.bltn().eventLoop();
         this.task = .{
@@ -16,22 +69,47 @@ pub fn start(this: *@This()) Yield {
             .concurrent_task = JSC.EventLoopTask.fromEventLoop(evtloop),
         };
         this.state = .waiting_io;
-        this.task.enqueue();
-        return this.bltn().stdout.enqueueFmt(this, "{s}\n", .{this.expletive}, safeguard);
+        return this.bltn().stdout.enqueue(this, this.buffer[0..this.buffer_used], safeguard);
     }
 
-    var res: Maybe(usize) = undefined;
-    while (true) {
-        res = this.bltn().writeNoIO(.stdout, this.expletive);
-        if (res == .err) {
-            return this.bltn().done(1);
-        }
-        res = this.bltn().writeNoIO(.stdout, "\n");
-        if (res == .err) {
-            return this.bltn().done(1);
-        }
+    this.task = .{
+        .evtloop = this.bltn().eventLoop(),
+        .concurrent_task = JSC.EventLoopTask.fromEventLoop(this.bltn().eventLoop()),
+    };
+    return this.writeNoIO();
+}
+
+/// Write the prepared buffer to stdout four times, then suspend execution to the task.
+/// This is to avoid blocking the main thread forever.
+fn writeNoIO(this: *@This()) Yield {
+    if (this.writeOnceNoIO(this.buffer[0..this.buffer_used])) |yield| return yield;
+    if (this.writeOnceNoIO(this.buffer[0..this.buffer_used])) |yield| return yield;
+    if (this.writeOnceNoIO(this.buffer[0..this.buffer_used])) |yield| return yield;
+    if (this.writeOnceNoIO(this.buffer[0..this.buffer_used])) |yield| return yield;
+    this.task.enqueue();
+    return .suspended;
+}
+
+fn writeOnceNoIO(this: *@This(), buf: []const u8) ?Yield {
+    switch (this.bltn().writeNoIO(.stdout, buf)) {
+        .result => {},
+        .err => |e| {
+            this.state = .waiting_write_err;
+            const errbuf = this.bltn().fmtErrorArena(.yes, "{s}\n", .{e.name()});
+            return this.writeFailingError(errbuf, 1);
+        },
     }
-    @compileError(unreachable);
+    return null;
+}
+
+pub fn writeFailingError(this: *Yes, buf: []const u8, exit_code: shell.ExitCode) Yield {
+    if (this.bltn().stderr.needsIO()) |safeguard| {
+        this.state = .waiting_write_err;
+        return this.bltn().stderr.enqueue(this, buf, safeguard);
+    }
+
+    _ = this.bltn().writeNoIO(.stderr, buf);
+    return this.bltn().done(exit_code);
 }
 
 pub fn onIOWriterChunk(this: *@This(), _: usize, maybe_e: ?JSC.SystemError) Yield {
@@ -40,7 +118,11 @@ pub fn onIOWriterChunk(this: *@This(), _: usize, maybe_e: ?JSC.SystemError) Yiel
         this.state = .err;
         return this.bltn().done(1);
     }
-    return .suspended;
+    if (this.state == .waiting_write_err) {
+        return this.bltn().done(1);
+    }
+    bun.assert(this.bltn().stdout.needsIO() != null);
+    return this.bltn().stdout.enqueue(this, this.buffer[0..this.buffer_used], .output_needs_io);
 }
 
 pub inline fn bltn(this: *@This()) *Builtin {
@@ -48,8 +130,14 @@ pub inline fn bltn(this: *@This()) *Builtin {
     return @fieldParentPtr("impl", impl);
 }
 
-pub fn deinit(_: *@This()) void {}
+pub fn deinit(this: *@This()) void {
+    this.alloc_scope.allocator().free(this.buffer);
+    this.alloc_scope.endScope();
+}
 
+/// This task is used when we write `yes` output to stdout and stdout does not
+/// require IO. After each batch of writes in `writeNoIO` we suspend execution to
+/// this task so we don't just block the main thread forever.
 pub const YesTask = struct {
     evtloop: JSC.EventLoopHandle,
     concurrent_task: JSC.EventLoopTask,
@@ -66,11 +154,7 @@ pub const YesTask = struct {
 
     pub fn runFromMainThread(this: *@This()) void {
         const yes: *Yes = @fieldParentPtr("task", this);
-
-        // Manually make safeguard since this task should not be created if output does not need IO
-        yes.bltn().stdout.enqueueFmt(yes, "{s}\n", .{yes.expletive}, .output_needs_io).run();
-
-        this.enqueue();
+        yes.writeNoIO().run();
     }
 
     pub fn runFromMainThreadMini(this: *@This(), _: *void) void {
@@ -88,5 +172,4 @@ const Builtin = Interpreter.Builtin;
 const IO = shell.IO;
 const Yes = @This();
 const JSC = bun.JSC;
-const Maybe = bun.sys.Maybe;
 const std = @import("std");
diff --git a/test/js/bun/shell/commands/yes.test.ts b/test/js/bun/shell/commands/yes.test.ts
index fb301a5b37..7a69c1c634 100644
--- a/test/js/bun/shell/commands/yes.test.ts
+++ b/test/js/bun/shell/commands/yes.test.ts
@@ -19,6 +19,6 @@ describe("yes", async () => {
-  test("ignores other arguments", async () => {
+  test("joins all arguments with spaces", async () => {
     const buffer = Buffer.alloc(17);
     await $`yes ab cd ef > ${buffer}`;
-    expect(buffer.toString()).toEqual("ab\nab\nab\nab\nab\nab");
+    expect(buffer.toString()).toEqual("ab cd ef\nab cd ef");
   });
 });
diff --git a/test/js/bun/shell/epipe.test.ts b/test/js/bun/shell/epipe.test.ts
new file mode 100644
index 0000000000..27f6eabe83
--- /dev/null
+++ b/test/js/bun/shell/epipe.test.ts
@@ -0,0 +1,22 @@
+import { describe, expect, test } from "bun:test";
+import { isPosix } from "harness";
+import { createTestBuilder } from "./test_builder";
+const TestBuilder = createTestBuilder(import.meta.path);
+
+describe.if(isPosix)("IOWriter epipe", () => {
+  TestBuilder.command`yes | head`
+    .exitCode(0)
+    .stdout("y\ny\ny\ny\ny\ny\ny\ny\ny\ny\n")
+    .runAsTest("builtin pipe to command");
+
+  test("concurrent", async () => {
+    const promises = Array(100)
+      .fill(0)
+      .map(() => Bun.$`yes | head`.text());
+
+    const results = await Promise.all(promises);
+    for (const result of results) {
+      expect(result).toBe("y\ny\ny\ny\ny\ny\ny\ny\ny\ny\n");
+    }
+  });
+});
diff --git a/test/no-validate-exceptions.txt b/test/no-validate-exceptions.txt
index 979e8fc751..94dcdb2e0b 100644
--- a/test/no-validate-exceptions.txt
+++ b/test/no-validate-exceptions.txt
@@ -1,4 +1,4 @@
-# List of tests for which we do NOT set validateExceptionChecks=1 when running in ASan CI
+# List of tests for which we do NOT set validateExceptionChecks=1 when running in ASAN CI
 vendor/elysia/test/a.test.ts
 vendor/elysia/test/adapter/web-standard/cookie-to-header.test.ts
 vendor/elysia/test/adapter/web-standard/map-compact-response.test.ts