diff --git a/src/bun.js/webcore/FileSink.zig b/src/bun.js/webcore/FileSink.zig index 89309b3bd5..200ef8c845 100644 --- a/src/bun.js/webcore/FileSink.zig +++ b/src/bun.js/webcore/FileSink.zig @@ -62,12 +62,33 @@ pub fn memoryCost(this: *const FileSink) usize { fn Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(_: *jsc.JSGlobalObject, jsvalue: jsc.JSValue) callconv(.C) void { var this: *FileSink = @alignCast(@ptrCast(JSSink.fromJS(jsvalue) orelse return)); - this.force_sync = true; + if (comptime !Environment.isWindows) { + this.force_sync = true; this.writer.force_sync = true; if (this.fd != bun.invalid_fd) { _ = bun.sys.updateNonblocking(this.fd, false); } + } else { + if (this.writer.source) |*source| { + switch (source.*) { + .pipe => |pipe| { + if (uv.uv_stream_set_blocking(@ptrCast(pipe), 1) == .zero) { + return; + } + }, + .tty => |tty| { + if (uv.uv_stream_set_blocking(@ptrCast(tty), 1) == .zero) { + return; + } + }, + + else => {}, + } + } + + // Fallback to WriteFile() if it fails. + this.force_sync = true; } } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 50717375c9..4ff47c2bda 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -824,13 +824,19 @@ pub const WindowsBufferedReader = struct { fn _onReadChunk(this: *WindowsBufferedReader, buf: []u8, hasMore: ReadState) bool { if (this.maxbuf) |m| m.onReadBytes(buf.len); - this.flags.has_inflight_read = false; if (hasMore == .eof) { this.flags.received_eof = true; } - const onReadChunkFn = this.vtable.onReadChunk orelse return true; - return onReadChunkFn(this.parent, buf, hasMore); + const onReadChunkFn = this.vtable.onReadChunk orelse { + this.flags.has_inflight_read = false; + return true; + }; + const result = onReadChunkFn(this.parent, buf, hasMore); + // Clear has_inflight_read after the callback completes to prevent + // libuv from starting a new read while we're still processing data + this.flags.has_inflight_read = false; + return result; } fn finish(this: *WindowsBufferedReader) void { @@ -928,7 +934,10 @@ pub const WindowsBufferedReader = struct { switch (nread_int) { 0 => { // EAGAIN or EWOULDBLOCK or canceled (buf is not safe to access here) - return this.onRead(.{ .result = 0 }, "", .drained); + // With libuv 1.51.0+, calling onRead(.drained) here causes a race condition + // where subsequent reads return truncated data (see logs showing 6024 instead + // of 74468 bytes). Just ignore 0-byte reads and let libuv continue. + return; }, uv.UV_EOF => { _ = this.stopReading(); diff --git a/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts index 2f26d71b8f..9790f76fce 100644 --- a/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts +++ b/test/js/bun/spawn/spawn-stdin-readable-stream.test.ts @@ -169,11 +169,12 @@ describe("spawn stdin ReadableStream", () => { const chunkSize = 64 * 1024; // 64KB chunks const numChunks = 16; // 1MB total let pushedChunks = 0; + const chunk = Buffer.alloc(chunkSize, "x"); const stream = new ReadableStream({ pull(controller) { if (pushedChunks < numChunks) { - controller.enqueue("x".repeat(chunkSize)); + controller.enqueue(chunk); pushedChunks++; } else { controller.close(); @@ -182,7 +183,16 @@ describe("spawn stdin ReadableStream", () => { }); await using proc = spawn({ - cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], + cmd: [ + bunExe(), + "-e", + ` + let length = 0; + process.stdin.on('data', (data) => length += data.length); + process.once('beforeExit', () => console.error(length)); + process.stdin.pipe(process.stdout) +`, + ], stdin: stream, stdout: "pipe", env: bunEnv, @@ -190,7 +200,7 @@ describe("spawn stdin ReadableStream", () => { const text = await proc.stdout.text(); expect(text.length).toBe(chunkSize * numChunks); - expect(text).toBe("x".repeat(chunkSize * numChunks)); + expect(text).toBe(chunk.toString().repeat(numChunks)); expect(await proc.exited).toBe(0); }); diff --git a/test/regression/issue/11297/11297.fixture.ts b/test/regression/issue/11297/11297.fixture.ts index bc6ad09128..39aefc5719 100644 --- a/test/regression/issue/11297/11297.fixture.ts +++ b/test/regression/issue/11297/11297.fixture.ts @@ -4,7 +4,16 @@ const string = Buffer.alloc(1024 * 1024, "zombo.com\n").toString(); process.exitCode = 1; const proc = Bun.spawn({ - cmd: [bunExe(), "-e", "process.stdin.pipe(process.stdout)"], + cmd: [ + bunExe(), + "-e", + ` +let length = 0; +process.stdin.on('data', (data) => length += data.length); +process.once('beforeExit', () => console.error(length)); +process.stdin.pipe(process.stdout) + `, + ], stdio: ["pipe", "pipe", "inherit"], });