diff --git a/src/bun.js/bindings/BunProcess.cpp b/src/bun.js/bindings/BunProcess.cpp index 57812b1af3..e2449452f0 100644 --- a/src/bun.js/bindings/BunProcess.cpp +++ b/src/bun.js/bindings/BunProcess.cpp @@ -103,6 +103,7 @@ typedef int mode_t; extern "C" bool Bun__Node__ProcessNoDeprecation; extern "C" bool Bun__Node__ProcessThrowDeprecation; +extern "C" int32_t bun_stdio_tty[3]; namespace Bun { @@ -2069,6 +2070,8 @@ static JSValue constructStderr(VM& vm, JSObject* processObject) #define STDIN_FILENO 0 #endif +extern "C" int32_t Bun__Process__getStdinFdType(void*); + static JSValue constructStdin(VM& vm, JSObject* processObject) { auto* globalObject = processObject->globalObject(); @@ -2076,7 +2079,8 @@ static JSValue constructStdin(VM& vm, JSObject* processObject) JSC::JSFunction* getStdioWriteStream = JSC::JSFunction::create(vm, globalObject, processObjectInternalsGetStdinStreamCodeGenerator(vm), globalObject); JSC::MarkedArgumentBuffer args; args.append(JSC::jsNumber(STDIN_FILENO)); - + args.append(jsBoolean(bun_stdio_tty[STDIN_FILENO])); + args.append(jsNumber(Bun__Process__getStdinFdType(Bun::vm(vm)))); JSC::CallData callData = JSC::getCallData(getStdioWriteStream); NakedPtr returnedException = nullptr; diff --git a/src/bun.js/rare_data.zig b/src/bun.js/rare_data.zig index b63322b9a8..d4e5a2dba9 100644 --- a/src/bun.js/rare_data.zig +++ b/src/bun.js/rare_data.zig @@ -407,6 +407,23 @@ pub fn stdin(rare: *RareData) *Blob.Store { }; } +const StdinFdType = enum(i32) { + file = 0, + pipe = 1, + socket = 2, +}; + +pub export fn Bun__Process__getStdinFdType(vm: *JSC.VirtualMachine) StdinFdType { + const mode = vm.rareData().stdin().data.file.mode; + if (bun.S.ISFIFO(mode)) { + return .pipe; + } else if (bun.S.ISSOCK(mode) or bun.S.ISCHR(mode)) { + return .socket; + } else { + return .file; + } +} + const Subprocess = @import("./api/bun/subprocess.zig").Subprocess; pub fn spawnIPCContext(rare: *RareData, vm: *JSC.VirtualMachine) *uws.SocketContext { diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index 6295e7e01e..4afada13a9 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -24,6 +24,12 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +const enum BunProcessStdinFdType { + file = 0, + pipe = 1, + socket = 2, +} + export function getStdioWriteStream(fd) { $assert(typeof fd === "number", `Expected fd to be a number, got ${typeof fd}`); const tty = require("node:tty"); @@ -68,7 +74,7 @@ export function getStdioWriteStream(fd) { return [stream, underlyingSink]; } -export function getStdinStream(fd) { +export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType) { const native = Bun.stdin.stream(); // @ts-expect-error const source = native.$bunNativePtr; @@ -103,9 +109,8 @@ export function getStdinStream(fd) { } } - const tty = require("node:tty"); - const ReadStream = tty.isatty(fd) ? tty.ReadStream : require("node:fs").ReadStream; - const stream = new ReadStream(null, { fd }); + const ReadStream = isTTY ? require("node:tty").ReadStream : require("node:fs").ReadStream; + const stream = new ReadStream(null, { fd, autoClose: false }); const originalOn = stream.on; @@ -129,6 +134,20 @@ export function getStdinStream(fd) { stream.fd = fd; + // tty.ReadStream is supposed to extend from net.Socket. + // but we haven't made that work yet. Until then, we need to manually add some of net.Socket's methods + if (isTTY || fdType !== BunProcessStdinFdType.file) { + stream.ref = function () { + ref(); + return this; + }; + + stream.unref = function () { + unref(); + return this; + }; + } + const originalPause = stream.pause; stream.pause = function () { $debug("pause();"); diff --git a/test/js/node/process/process-stdin.test.ts b/test/js/node/process/process-stdin.test.ts new file mode 100644 index 0000000000..011b7137ac --- /dev/null +++ b/test/js/node/process/process-stdin.test.ts @@ -0,0 +1,30 @@ +import { test, expect } from "bun:test"; +import { bunEnv, bunExe, isWindows } from "harness"; + +test("pipe does the right thing", async () => { + // Note: Bun.spawnSync uses memfd_create on Linux for pipe, which means we see + // it as a file instead of a tty + const result = Bun.spawn({ + cmd: [bunExe(), "-e", "console.log(typeof process.stdin.ref)"], + stdin: "pipe", + stdout: "pipe", + stderr: "inherit", + env: bunEnv, + }); + + expect((await new Response(result.stdout).text()).trim()).toBe("function"); + expect(await result.exited).toBe(0); +}); + +test("file does the right thing", async () => { + const result = Bun.spawn({ + cmd: [bunExe(), "-e", "console.log(typeof process.stdin.ref)"], + stdin: Bun.file(import.meta.path), + stdout: "pipe", + stderr: "inherit", + env: bunEnv, + }); + + expect((await new Response(result.stdout).text()).trim()).toBe("undefined"); + expect(await result.exited).toBe(0); +}); diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts index ca3e48dec8..40414903c6 100644 --- a/test/js/web/fetch/fetch.stream.test.ts +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -663,14 +663,28 @@ describe("fetch() with streaming", () => { switch (compression) { case "gzip-libdeflate": case "gzip": - return Bun.gzipSync(data, { library: compression === "gzip-libdeflate" ? "libdeflate" : "zlib" }); + return Bun.gzipSync(data, { + library: compression === "gzip-libdeflate" ? "libdeflate" : "zlib", + level: 1, // fastest compression + }); case "deflate-libdeflate": case "deflate": - return Bun.deflateSync(data, { library: compression === "deflate-libdeflate" ? "libdeflate" : "zlib" }); + return Bun.deflateSync(data, { + library: compression === "deflate-libdeflate" ? "libdeflate" : "zlib", + level: 1, // fastest compression + }); case "deflate_with_headers": - return zlib.deflateSync(data); + return zlib.deflateSync(data, { + level: 1, // fastest compression + }); case "br": - return zlib.brotliCompressSync(data); + return zlib.brotliCompressSync(data, { + params: { + [zlib.constants.BROTLI_PARAM_QUALITY]: 0, + [zlib.constants.BROTLI_PARAM_MODE]: zlib.constants.BROTLI_MODE_GENERIC, + [zlib.constants.BROTLI_PARAM_SIZE_HINT]: 0, + }, + }); default: return data; } @@ -903,25 +917,33 @@ describe("fetch() with streaming", () => { }); test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { - const rawBytes = Buffer.allocUnsafe(128 * 1024); + const rawBytes = Buffer.allocUnsafe(1024 * 1024); // Random data doesn't compress well. We need enough random data that // the compressed data is larger than 64 bytes. require("crypto").randomFillSync(rawBytes); const content = rawBytes.toString("hex"); + const contentBuffer = Buffer.from(content); + + const data = compress(compression, contentBuffer); var onReceivedHeaders = Promise.withResolvers(); using server = Bun.serve({ port: 0, async fetch(req) { - const data = compress(compression, Buffer.from(content)); return new Response( new ReadableStream({ async pull(controller) { - const firstChunk = data.subarray(0, 64); - const secondChunk = data.subarray(firstChunk.length); - controller.enqueue(firstChunk); - await onReceivedHeaders.promise; - await Bun.sleep(128); - controller.enqueue(secondChunk); + // Ensure we actually send it over the network in multiple chunks. + let tenth = (data.length / 10) | 0; + let remaining = data; + while (remaining.length > 0) { + const chunk = remaining.subarray(0, Math.min(tenth, remaining.length)); + controller.enqueue(chunk); + if (remaining === data) { + await onReceivedHeaders.promise; + } + remaining = remaining.subarray(chunk.length); + await Bun.sleep(1); + } controller.close(); }, }), @@ -952,15 +974,21 @@ describe("fetch() with streaming", () => { gcTick(false); const reader = res.body?.getReader(); - let buffer = Buffer.alloc(0); - let parts = 0; + let chunks: Uint8Array[] = []; + let currentRange = 0; while (true) { gcTick(false); const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult; if (value) { - buffer = Buffer.concat([buffer, value]); - parts++; + chunks.push(value); + + // Check the content is what is expected at this time. + // We're avoiding calling .buffer since that changes the internal representation in JSC and we want to test the raw data. + expect(contentBuffer.compare(value, undefined, undefined, currentRange, currentRange + value.length)).toEqual( + 0, + ); + currentRange += value.length; } if (done) { break; @@ -968,8 +996,18 @@ describe("fetch() with streaming", () => { } gcTick(false); - expect(buffer.toString("utf8")).toBe(content); - expect(parts).toBeGreaterThan(1); + expect(Buffer.concat(chunks).toString("utf8")).toBe(content); + expect(chunks.length).toBeGreaterThan(1); + + currentRange = 0; + for (const chunk of chunks) { + // Check that each chunk hasn't been modified. + // We want to be 100% sure that there is no accidental memory re-use here. + expect(contentBuffer.compare(chunk, undefined, undefined, currentRange, currentRange + chunk.length)).toEqual( + 0, + ); + currentRange += chunk.length; + } }); test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {