This commit is contained in:
Jarred Sumner
2025-06-03 13:23:12 -07:00
committed by GitHub
parent c7327d62c2
commit 1bd44e9ce7
5 changed files with 172 additions and 120 deletions

View File

@@ -83,7 +83,11 @@ pub const Lazy = union(enum) {
};
}
else switch (bun.sys.open(file.pathlike.path.sliceZ(&file_buf), bun.O.RDONLY | bun.O.NONBLOCK | bun.O.CLOEXEC, 0)) {
.result => |fd| fd,
.result => |fd| brk: {
if (Environment.isPosix) is_nonblocking = true;
break :brk fd;
},
.err => |err| {
return .{ .err = err.withPath(file.pathlike.path.slice()) };
},
@@ -116,6 +120,10 @@ pub const Lazy = union(enum) {
return .{ .err = .fromCode(.ISDIR, .fstat) };
}
if (bun.S.ISREG(stat.mode)) {
is_nonblocking = false;
}
this.pollable = bun.sys.isPollable(stat.mode) or is_nonblocking or (file.is_atty orelse false);
this.file_type = if (bun.S.ISFIFO(stat.mode))
.pipe
@@ -129,7 +137,9 @@ pub const Lazy = union(enum) {
this.file_type = .nonblocking_pipe;
}
this.nonblocking = is_nonblocking or (this.pollable and !(file.is_atty orelse false));
this.nonblocking = is_nonblocking or (this.pollable and
!(file.is_atty orelse false) and
this.file_type != .pipe);
if (this.nonblocking and this.file_type == .pipe) {
this.file_type = .nonblocking_pipe;

View File

@@ -428,16 +428,86 @@ const PosixBufferedReader = struct {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking));
}
fn readBlockingPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking));
/// Drains a blocking pipe by looping on nonblocking reads until EOF,
/// a fatal error, or EAGAIN. On EAGAIN it re-registers with the event
/// loop poll — unless a HUP was already delivered, in which case it
/// keeps looping until the remaining data is consumed.
/// The `size_hint` parameter is intentionally ignored (`_: isize`).
fn readBlockingPipe(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, _: isize, received_hup: bool) void {
while (true) {
// Re-checked every iteration: the consumer may toggle streaming mode.
const streaming = parent.vtable.isStreamingEnabled();
if (resizable_buffer.capacity == 0) {
// Use stack buffer for streaming
const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer();
switch (bun.sys.readNonblocking(fd, stack_buffer)) {
.result => |bytes_read| {
// Inform the optional max-buffer limiter how many bytes arrived.
if (parent.maxbuf) |l| l.onReadBytes(bytes_read);
if (bytes_read == 0) {
// EOF - finished and closed pipe
parent.closeWithoutReporting();
parent.done();
return;
}
if (streaming) {
// Stream this chunk and register for next cycle.
// A short read combined with HUP means the pipe is drained -> .eof.
_ = parent.vtable.onReadChunk(stack_buffer[0..bytes_read], if (received_hup and bytes_read < stack_buffer.len) .eof else .progress);
} else {
// Not streaming: accumulate into the growable buffer instead.
resizable_buffer.appendSlice(stack_buffer[0..bytes_read]) catch bun.outOfMemory();
}
},
.err => |err| {
if (!err.isRetry()) {
parent.onError(err);
return;
}
// EAGAIN - fall through to register for next poll
},
}
} else {
// Buffer already holds data: keep reading into its spare capacity.
resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory();
var buf: []u8 = resizable_buffer.unusedCapacitySlice();
switch (bun.sys.readNonblocking(fd, buf)) {
.result => |bytes_read| {
if (parent.maxbuf) |l| l.onReadBytes(bytes_read);
// NOTE(review): _offset is advanced only on this path, never in the
// stack-buffer path above — confirm that asymmetry is intentional.
parent._offset += bytes_read;
// NOTE(review): `buf` aliases the list's own spare capacity, so this
// appendSlice copies a region onto itself (no realloc can occur since
// capacity was just ensured). It works, but bumping `items.len`
// directly would express the intent more clearly — verify.
resizable_buffer.appendSlice(buf[0..bytes_read]) catch bun.outOfMemory();
if (bytes_read == 0) {
// EOF: close quietly and signal completion.
parent.closeWithoutReporting();
parent.done();
return;
}
if (streaming) {
// onReadChunk returning false means the consumer told us to stop.
if (!parent.vtable.onReadChunk(buf[0..bytes_read], if (received_hup and bytes_read < buf.len) .eof else .progress)) {
return;
}
}
},
.err => |err| {
if (!err.isRetry()) {
parent.onError(err);
return;
}
// EAGAIN: fall through; we re-poll below unless HUP was seen.
},
}
}
// Register for next poll cycle unless we got HUP
if (!received_hup) {
parent.registerPoll();
return;
}
// We have received HUP but have not consumed it yet. We can't register for next poll cycle.
// We need to keep going.
}
}
fn readWithFn(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void {
fn readWithFn(parent: *PosixBufferedReader, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void {
_ = size_hint; // autofix
const streaming = parent.vtable.isStreamingEnabled();
var received_hup = received_hup_;
if (streaming) {
const stack_buffer = parent.vtable.eventLoop().pipeReadBuffer();
while (resizable_buffer.capacity == 0) {
@@ -465,47 +535,12 @@ const PosixBufferedReader = struct {
return;
}
if (comptime file_type == .pipe) {
if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) {
switch (bun.isReadable(fd)) {
.ready => {},
.hup => {
received_hup = true;
},
.not_ready => {
if (received_hup) {
parent.closeWithoutReporting();
}
defer {
if (received_hup) {
parent.done();
}
}
if (stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len].len > 0) {
if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .drained)) {
return;
}
}
if (!received_hup) {
parent.registerPoll();
}
return;
},
}
}
}
if (comptime file_type != .pipe) {
// blocking pipes block a process, so we have to keep reading as much as we can
// otherwise, we do want to stream the data
if (stack_buffer_head.len < stack_buffer_cutoff) {
if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) {
return;
}
stack_buffer_head = stack_buffer;
// Keep reading as much as we can
if (stack_buffer_head.len < stack_buffer_cutoff) {
if (!parent.vtable.onReadChunk(stack_buffer[0 .. stack_buffer.len - stack_buffer_head.len], if (received_hup) .eof else .progress)) {
return;
}
stack_buffer_head = stack_buffer;
}
},
.err => |err| {
@@ -558,33 +593,6 @@ const PosixBufferedReader = struct {
parent.done();
return;
}
if (comptime file_type == .pipe) {
if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) {
switch (bun.isReadable(fd)) {
.ready => {},
.hup => {
received_hup = true;
},
.not_ready => {
if (received_hup) {
parent.closeWithoutReporting();
}
defer {
if (received_hup) {
parent.done();
}
}
if (!received_hup) {
parent.registerPoll();
}
return;
},
}
}
}
},
.err => |err| {
if (err.isRetry()) {
@@ -621,54 +629,16 @@ const PosixBufferedReader = struct {
return;
}
if (comptime file_type == .pipe) {
if (bun.Environment.isMac or !bun.linux.RWFFlagSupport.isMaybeSupported()) {
switch (bun.isReadable(fd)) {
.ready => {},
.hup => {
received_hup = true;
},
.not_ready => {
if (received_hup) {
parent.closeWithoutReporting();
}
defer {
if (received_hup) {
parent.done();
}
}
if (parent.vtable.isStreamingEnabled()) {
defer {
resizable_buffer.clearRetainingCapacity();
}
if (!parent.vtable.onReadChunk(resizable_buffer.items, if (received_hup) .eof else .drained) and !received_hup) {
return;
}
}
if (!received_hup) {
parent.registerPoll();
}
return;
},
if (parent.vtable.isStreamingEnabled()) {
if (resizable_buffer.items.len > 128_000) {
defer {
resizable_buffer.clearRetainingCapacity();
}
}
}
if (comptime file_type != .pipe) {
if (parent.vtable.isStreamingEnabled()) {
if (resizable_buffer.items.len > 128_000) {
defer {
resizable_buffer.clearRetainingCapacity();
}
if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) {
return;
}
continue;
if (!parent.vtable.onReadChunk(resizable_buffer.items, .progress)) {
return;
}
continue;
}
}
},

View File

@@ -0,0 +1,22 @@
// Regression fixture for the TTY stdin buffering issue (oven-sh/bun#18239).
// Reads chunks from stdin and exits 0 once three have arrived.
// Must behave identically under Node.js and Bun.
console.log("Starting TTY stdin test...");
console.log("Listening for chunks from stdin...");

let received = 0;
for await (const piece of process.stdin) {
  received += 1;
  // Log each chunk with an ISO timestamp so arrival timing is visible.
  console.log(`[${new Date().toISOString()}] Chunk #${received}:`, piece);
  if (received >= 3) {
    console.log("Received 3 chunks, exiting...");
    process.exit(0);
  }
}

// stdin ended before three chunks were observed: report failure.
console.error("Exited without receiving 3 chunks");
process.exit(1);

View File

@@ -0,0 +1,37 @@
import { spawnSync } from "bun";
import { expect, test } from "bun:test";
import { bunEnv, bunExe, isWindows } from "harness";
import { join } from "path";
// https://github.com/oven-sh/bun/issues/18239
test.skipIf(isWindows)("TTY stdin buffering should work correctly", async () => {
  const generatorScript = join(import.meta.dir, "data-generator.sh");
  const fixtureScript = join(import.meta.dir, "18239.fixture.ts");

  // Pipe the shell generator's delayed output into the fixture running under bun.
  const proc = spawnSync({
    cmd: ["bash", "-c", `"${generatorScript}" | "${bunExe()}" "${fixtureScript}"`],
    env: { ...bunEnv, BUN_DEBUG_QUIET_LOGS: "1" },
    stderr: "pipe",
    stdout: "pipe",
  });

  const out = proc.stdout.toString();
  const errOut = proc.stderr.toString();

  // Each of the three chunks must have been logged with a timestamp prefix.
  for (const n of [1, 2, 3]) {
    expect(out).toMatch(new RegExp(`\\[.*\\] Chunk #${n}:`));
  }
  // The fixture must have taken the success path, not the failure path.
  expect(out).toContain("Received 3 chunks, exiting...");
  expect(errOut).not.toContain("Exited without receiving 3 chunks");
  expect(proc.exitCode).toBe(0);
});

View File

@@ -0,0 +1,13 @@
#!/bin/bash
# Emits three timestamped lines of test input, pausing 200ms between them,
# so a downstream consumer observes data arriving incrementally
# (simulating real-time/interactive input).
echo "Generating test data with 200ms delay..."
line_no=1
while [ "$line_no" -le 3 ]; do
  echo "Line $line_no - $(date)"
  sleep 0.2
  line_no=$((line_no + 1))
done
echo "All data sent!"