From 1bd44e9ce78df08ecdc0de9a66aa779fa29a0eb2 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Tue, 3 Jun 2025 13:23:12 -0700 Subject: [PATCH] Fixes #18239 (#20152) --- src/bun.js/webcore/FileReader.zig | 14 +- src/io/PipeReader.zig | 206 ++++++++---------- test/regression/issue/18239/18239.fixture.ts | 22 ++ test/regression/issue/18239/18239.test.ts | 37 ++++ test/regression/issue/18239/data-generator.sh | 13 ++ 5 files changed, 172 insertions(+), 120 deletions(-) create mode 100644 test/regression/issue/18239/18239.fixture.ts create mode 100644 test/regression/issue/18239/18239.test.ts create mode 100755 test/regression/issue/18239/data-generator.sh diff --git a/src/bun.js/webcore/FileReader.zig b/src/bun.js/webcore/FileReader.zig index 32549f9e92..ac494b6ecd 100644 --- a/src/bun.js/webcore/FileReader.zig +++ b/src/bun.js/webcore/FileReader.zig @@ -83,7 +83,11 @@ pub const Lazy = union(enum) { }; } else switch (bun.sys.open(file.pathlike.path.sliceZ(&file_buf), bun.O.RDONLY | bun.O.NONBLOCK | bun.O.CLOEXEC, 0)) { - .result => |fd| fd, + .result => |fd| brk: { + if (Environment.isPosix) is_nonblocking = true; + break :brk fd; + }, + .err => |err| { return .{ .err = err.withPath(file.pathlike.path.slice()) }; }, @@ -116,6 +120,10 @@ pub const Lazy = union(enum) { return .{ .err = .fromCode(.ISDIR, .fstat) }; } + if (bun.S.ISREG(stat.mode)) { + is_nonblocking = false; + } + this.pollable = bun.sys.isPollable(stat.mode) or is_nonblocking or (file.is_atty orelse false); this.file_type = if (bun.S.ISFIFO(stat.mode)) .pipe @@ -129,7 +137,9 @@ pub const Lazy = union(enum) { this.file_type = .nonblocking_pipe; } - this.nonblocking = is_nonblocking or (this.pollable and !(file.is_atty orelse false)); + this.nonblocking = is_nonblocking or (this.pollable and + !(file.is_atty orelse false) and + this.file_type != .pipe); if (this.nonblocking and this.file_type == .pipe) { this.file_type = .nonblocking_pipe; diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index ff026ca4c0..f6c6855cf0 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -428,16 +428,86 @@ const PosixBufferedReader = struct { return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking)); } - fn readBlockingPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void { - return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking)); + fn readBlockingPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, _: isize, received_hup: bool) void { + while (true) { + const streaming = parent.vtable.isStreamingEnabled(); + + if (resizable_buffer.capacity == 0) { + // Use stack buffer for streaming + const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer(); + + switch (bun.sys.readNonblocking(fd, stack_buffer)) { + .result => |bytes_read| { + if (parent.maxbuf) |l| l.onReadBytes(bytes_read); + + if (bytes_read == 0) { + // EOF - finished and closed pipe + parent.closeWithoutReporting(); + parent.done(); + return; + } + + if (streaming) { + // Stream this chunk and register for next cycle + _ = parent.vtable.onReadChunk(stack_buffer[0..bytes_read], if (received_hup and bytes_read < stack_buffer.len) .eof else .progress); + } else { + resizable_buffer.appendSlice(stack_buffer[0..bytes_read]) catch bun.outOfMemory(); + } + }, + .err => |err| { + if (!err.isRetry()) { + parent.onError(err); + return; + } + // EAGAIN - fall through to register for next poll + }, + } + } else { + resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory(); + var buf: []u8 = resizable_buffer.unusedCapacitySlice(); + + switch (bun.sys.readNonblocking(fd, buf)) { + .result => |bytes_read| { + if (parent.maxbuf) |l| l.onReadBytes(bytes_read); + parent._offset += bytes_read; + resizable_buffer.appendSlice(buf[0..bytes_read]) catch bun.outOfMemory(); + + if (bytes_read == 0) { + parent.closeWithoutReporting(); + parent.done(); + return; + } + + if (streaming) { + if (!parent.vtable.onReadChunk(buf[0..bytes_read], if (received_hup and bytes_read < buf.len) .eof else .progress)) { + return; + } + } + }, + .err => |err| { + if (!err.isRetry()) { + parent.onError(err); + return; + } + }, + } + } + + // Register for next poll cycle unless we got HUP + if (!received_hup) { + parent.registerPoll(); + return; + } + + // We have received HUP but have not consumed it yet. We can't register for next poll cycle. + // We need to keep going. + } } - fn readWithFn(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void { + fn readWithFn(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void { _ = size_hint; // autofix const streaming = parent.vtable.isStreamingEnabled(); - var received_hup = received_hup_; - if (streaming) { const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer(); while (resizable_buffer.capacity == 0) { @@ -465,47 +535,12 @@ const PosixBufferedReader = struct { return; } - if (comptime file_type == .pipe) { - if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) { - switch (bun.isReadable(fd)) { - .ready => {}, - .hup => { - received_hup = true; - }, - .not_ready => { - if (received_hup) { - parent.closeWithoutReporting(); - } - defer { - if (received_hup) { - parent.done(); - } - } - if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) { - if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .drained)) { - return; - } - } - - if (!received_hup) { - parent.registerPoll(); - } - - return; - }, - } - } - } - - if (comptime file_type != .pipe) { - // blocking pipes block a process, so we have to keep reading as much as we can - // otherwise, we do want to stream the data - if (stack_buffer_head.len < stack_buffer_cutoff) { - if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) { - return; - } - stack_buffer_head = stack_buffer; + // Keep reading as much as we can + if (stack_buffer_head.len < stack_buffer_cutoff) { + if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) { + return; } + stack_buffer_head = stack_buffer; } }, .err => |err| { @@ -558,33 +593,6 @@ const PosixBufferedReader = struct { parent.done(); return; } - - if (comptime file_type == .pipe) { - if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) { - switch (bun.isReadable(fd)) { - .ready => {}, - .hup => { - received_hup = true; - }, - .not_ready => { - if (received_hup) { - parent.closeWithoutReporting(); - } - defer { - if (received_hup) { - parent.done(); - } - } - - if (!received_hup) { - parent.registerPoll(); - } - - return; - }, - } - } - } }, .err => |err| { if (err.isRetry()) { @@ -621,54 +629,16 @@ const PosixBufferedReader = struct { return; } - if (comptime file_type == .pipe) { - if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) { - switch (bun.isReadable(fd)) { - .ready => {}, - .hup => { - received_hup = true; - }, - .not_ready => { - if (received_hup) { - parent.closeWithoutReporting(); - } - defer { - if (received_hup) { - parent.done(); - } - } - - if (parent.vtable.isStreamingEnabled()) { - defer { - resizable_buffer.clearRetainingCapacity(); - } - if (!parent.vtable.onReadChunk(resizable_buffer.items, if (received_hup) .eof else .drained) and !received_hup) { - return; - } - } - - if (!received_hup) { - parent.registerPoll(); - } - - return; - }, + if (parent.vtable.isStreamingEnabled()) { + if (resizable_buffer.items.len > 128_000) { + defer { + resizable_buffer.clearRetainingCapacity(); } - } - } - - if (comptime file_type != .pipe) { - if (parent.vtable.isStreamingEnabled()) { - if (resizable_buffer.items.len > 128_000) { - defer { - resizable_buffer.clearRetainingCapacity(); - } - if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) { - return; - } - - continue; + if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) { + return; } + + continue; } } }, diff --git a/test/regression/issue/18239/18239.fixture.ts b/test/regression/issue/18239/18239.fixture.ts new file mode 100644 index 0000000000..d4fd997e36 --- /dev/null +++ b/test/regression/issue/18239/18239.fixture.ts @@ -0,0 +1,22 @@ +// Test script for TTY stdin buffering issue +// Should work the same in Node.js and Bun + +console.log("Starting TTY stdin test..."); +console.log("Listening for chunks from stdin..."); + +let chunkCount = 0; + +for await (const chunk of process.stdin) { + chunkCount++; + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] Chunk #${chunkCount}:`, chunk); + + // If we get more than 3 chunks, exit + if (chunkCount >= 3) { + console.log("Received 3 chunks, exiting..."); + process.exit(0); + } +} + +console.error("Exited without receiving 3 chunks"); +process.exit(1); diff --git a/test/regression/issue/18239/18239.test.ts b/test/regression/issue/18239/18239.test.ts new file mode 100644 index 0000000000..7c427a83f4 --- /dev/null +++ b/test/regression/issue/18239/18239.test.ts @@ -0,0 +1,37 @@ +import { spawnSync } from "bun"; +import { expect, test } from "bun:test"; +import { bunEnv, bunExe, isWindows } from "harness"; +import { join } from "path"; + +// https://github.com/oven-sh/bun/issues/18239 +test.skipIf(isWindows)("TTY stdin buffering should work correctly", async () => { + const dataGeneratorPath = join(import.meta.dir, "data-generator.sh"); + const fixturePath = join(import.meta.dir, "18239.fixture.ts"); + + // Run the data generator piped into our TTY test fixture + const result = spawnSync({ + cmd: ["bash", "-c", `"${dataGeneratorPath}" | "${bunExe()}" "${fixturePath}"`], + env: { + ...bunEnv, + BUN_DEBUG_QUIET_LOGS: "1", + }, + stderr: "pipe", + stdout: "pipe", + }); + + const stdout = result.stdout.toString(); + const stderr = result.stderr.toString(); + + // Should have received exactly 3 chunks + expect(stdout).toContain("Received 3 chunks, exiting..."); + + // Should not have the error message + expect(stderr).not.toContain("Exited without receiving 3 chunks"); + + // Should contain chunk messages with timestamps + expect(stdout).toMatch(/\[.*\] Chunk #1:/); + expect(stdout).toMatch(/\[.*\] Chunk #2:/); + expect(stdout).toMatch(/\[.*\] Chunk #3:/); + + expect(result.exitCode).toBe(0); +}); diff --git a/test/regression/issue/18239/data-generator.sh b/test/regression/issue/18239/data-generator.sh new file mode 100755 index 0000000000..6b86956552 --- /dev/null +++ b/test/regression/issue/18239/data-generator.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Test data generator script +# Sends data with delays to simulate real-time input + +echo "Generating test data with 200ms delay..." + +for i in {1..3}; do + echo "Line $i - $(date)" + sleep 0.2 +done + +echo "All data sent!" \ No newline at end of file