Fix process.stdin.ref (#16767)

Jarred Sumner
2025-01-26 03:51:16 -08:00
committed by GitHub
parent 89eea169c0
commit 77be87b0a7
5 changed files with 131 additions and 23 deletions

View File

@@ -103,6 +103,7 @@ typedef int mode_t;
extern "C" bool Bun__Node__ProcessNoDeprecation;
extern "C" bool Bun__Node__ProcessThrowDeprecation;
extern "C" int32_t bun_stdio_tty[3];
namespace Bun {
@@ -2069,6 +2070,8 @@ static JSValue constructStderr(VM& vm, JSObject* processObject)
#define STDIN_FILENO 0
#endif
extern "C" int32_t Bun__Process__getStdinFdType(void*);
static JSValue constructStdin(VM& vm, JSObject* processObject)
{
auto* globalObject = processObject->globalObject();
@@ -2076,7 +2079,8 @@ static JSValue constructStdin(VM& vm, JSObject* processObject)
JSC::JSFunction* getStdioWriteStream = JSC::JSFunction::create(vm, globalObject, processObjectInternalsGetStdinStreamCodeGenerator(vm), globalObject);
JSC::MarkedArgumentBuffer args;
args.append(JSC::jsNumber(STDIN_FILENO));
+    args.append(jsBoolean(bun_stdio_tty[STDIN_FILENO]));
+    args.append(jsNumber(Bun__Process__getStdinFdType(Bun::vm(vm))));
JSC::CallData callData = JSC::getCallData(getStdioWriteStream);
NakedPtr<JSC::Exception> returnedException = nullptr;

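Taken together, the native side now hands three arguments to the getStdinStream builtin. As a sketch of the call shape in TypeScript terms (parameter names follow the builtin's new signature further down; the actual marshalling is internal C++):

// Sketch only: the values constructStdin appends, as the builtin receives them.
type BunProcessStdinFdType = 0 /* file */ | 1 /* pipe */ | 2 /* socket */;

declare function getStdinStream(
  fd: number, // STDIN_FILENO, i.e. 0
  isTTY: boolean, // bun_stdio_tty[STDIN_FILENO]
  fdType: BunProcessStdinFdType, // Bun__Process__getStdinFdType(...)
): unknown;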
View File

@@ -407,6 +407,23 @@ pub fn stdin(rare: *RareData) *Blob.Store {
};
}
+const StdinFdType = enum(i32) {
+    file = 0,
+    pipe = 1,
+    socket = 2,
+};
+
+pub export fn Bun__Process__getStdinFdType(vm: *JSC.VirtualMachine) StdinFdType {
+    const mode = vm.rareData().stdin().data.file.mode;
+    if (bun.S.ISFIFO(mode)) {
+        return .pipe;
+    } else if (bun.S.ISSOCK(mode) or bun.S.ISCHR(mode)) {
+        return .socket;
+    } else {
+        return .file;
+    }
+}
const Subprocess = @import("./api/bun/subprocess.zig").Subprocess;
pub fn spawnIPCContext(rare: *RareData, vm: *JSC.VirtualMachine) *uws.SocketContext {

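The classification above keys off the fstat mode bits of fd 0. A user-land equivalent, as a minimal sketch assuming Node's fs API (which Bun implements):

import { fstatSync } from "node:fs";

function stdinFdType(fd: number): "file" | "pipe" | "socket" {
  const stats = fstatSync(fd);
  if (stats.isFIFO()) return "pipe"; // S_ISFIFO
  if (stats.isSocket() || stats.isCharacterDevice()) return "socket"; // S_ISSOCK / S_ISCHR
  return "file"; // anything else, e.g. a regular-file redirect
}

console.log(stdinFdType(0));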
View File

@@ -24,6 +24,12 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+const enum BunProcessStdinFdType {
+  file = 0,
+  pipe = 1,
+  socket = 2,
+}
export function getStdioWriteStream(fd) {
$assert(typeof fd === "number", `Expected fd to be a number, got ${typeof fd}`);
const tty = require("node:tty");
@@ -68,7 +74,7 @@ export function getStdioWriteStream(fd) {
return [stream, underlyingSink];
}
-export function getStdinStream(fd) {
+export function getStdinStream(fd, isTTY: boolean, fdType: BunProcessStdinFdType) {
const native = Bun.stdin.stream();
// @ts-expect-error
const source = native.$bunNativePtr;
@@ -103,9 +109,8 @@ export function getStdinStream(fd) {
}
}
const tty = require("node:tty");
const ReadStream = tty.isatty(fd) ? tty.ReadStream : require("node:fs").ReadStream;
const stream = new ReadStream(null, { fd });
const ReadStream = isTTY ? require("node:tty").ReadStream : require("node:fs").ReadStream;
const stream = new ReadStream(null, { fd, autoClose: false });
const originalOn = stream.on;
@@ -129,6 +134,20 @@ export function getStdinStream(fd) {
stream.fd = fd;
+  // tty.ReadStream is supposed to extend from net.Socket,
+  // but we haven't made that work yet. Until then, we need to manually add some of net.Socket's methods.
+  if (isTTY || fdType !== BunProcessStdinFdType.file) {
+    stream.ref = function () {
+      ref();
+      return this;
+    };
+    stream.unref = function () {
+      unref();
+      return this;
+    };
+  }
const originalPause = stream.pause;
stream.pause = function () {
$debug("pause();");

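The upshot of the new guard: process.stdin.ref and process.stdin.unref are only defined when stdin is a TTY, pipe, or socket, and they control whether stdin keeps the event loop alive. A short usage sketch:

// With stdin attached to a pipe or terminal these are functions;
// with a regular file they are undefined (see the guard above).
if (typeof process.stdin.ref === "function") {
  process.stdin.unref(); // stop stdin from keeping the process alive
  process.stdin.ref(); // opt back in
}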
View File

@@ -0,0 +1,30 @@
import { test, expect } from "bun:test";
import { bunEnv, bunExe, isWindows } from "harness";
test("pipe does the right thing", async () => {
// Note: Bun.spawnSync uses memfd_create on Linux for pipe, which means we see
// it as a file instead of a tty
const result = Bun.spawn({
cmd: [bunExe(), "-e", "console.log(typeof process.stdin.ref)"],
stdin: "pipe",
stdout: "pipe",
stderr: "inherit",
env: bunEnv,
});
expect((await new Response(result.stdout).text()).trim()).toBe("function");
expect(await result.exited).toBe(0);
});
test("file does the right thing", async () => {
const result = Bun.spawn({
cmd: [bunExe(), "-e", "console.log(typeof process.stdin.ref)"],
stdin: Bun.file(import.meta.path),
stdout: "pipe",
stderr: "inherit",
env: bunEnv,
});
expect((await new Response(result.stdout).text()).trim()).toBe("undefined");
expect(await result.exited).toBe(0);
});

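For a quick manual check of what the child actually sees on fd 0 under each spawn option, a hypothetical probe in the same style as these tests (bunExe and bunEnv come from the same harness; the fstat-based check mirrors Bun__Process__getStdinFdType):

const probe =
  'const s = require("node:fs").fstatSync(0);' +
  'console.log(s.isFIFO() ? "pipe" : s.isSocket() || s.isCharacterDevice() ? "socket" : "file");';
const child = Bun.spawn({
  cmd: [bunExe(), "-e", probe],
  stdin: "pipe", // swap in Bun.file(import.meta.path) to see "file"
  stdout: "pipe",
  env: bunEnv,
});
console.log((await new Response(child.stdout).text()).trim()); // "pipe" on POSIX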
View File

@@ -663,14 +663,28 @@ describe("fetch() with streaming", () => {
switch (compression) {
case "gzip-libdeflate":
case "gzip":
-        return Bun.gzipSync(data, { library: compression === "gzip-libdeflate" ? "libdeflate" : "zlib" });
+        return Bun.gzipSync(data, {
+          library: compression === "gzip-libdeflate" ? "libdeflate" : "zlib",
+          level: 1, // fastest compression
+        });
case "deflate-libdeflate":
case "deflate":
-        return Bun.deflateSync(data, { library: compression === "deflate-libdeflate" ? "libdeflate" : "zlib" });
+        return Bun.deflateSync(data, {
+          library: compression === "deflate-libdeflate" ? "libdeflate" : "zlib",
+          level: 1, // fastest compression
+        });
case "deflate_with_headers":
-        return zlib.deflateSync(data);
+        return zlib.deflateSync(data, {
+          level: 1, // fastest compression
+        });
case "br":
-        return zlib.brotliCompressSync(data);
+        return zlib.brotliCompressSync(data, {
+          params: {
+            [zlib.constants.BROTLI_PARAM_QUALITY]: 0,
+            [zlib.constants.BROTLI_PARAM_MODE]: zlib.constants.BROTLI_MODE_GENERIC,
+            [zlib.constants.BROTLI_PARAM_SIZE_HINT]: 0,
+          },
+        });
default:
return data;
}
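All of the codecs are pinned to their fastest settings on purpose: these tests exercise streaming behavior, not compression ratios, so zlib level 1 and brotli quality 0 keep them quick while still producing enough compressed output to split into chunks. A quick way to see the size/speed trade-off being given up:

import zlib from "node:zlib";

const data = Buffer.from("x".repeat(64 * 1024));
console.log("gzip level 1:", zlib.gzipSync(data, { level: 1 }).length, "bytes");
console.log("gzip level 9:", zlib.gzipSync(data, { level: 9 }).length, "bytes");
console.log(
  "brotli quality 0:",
  zlib.brotliCompressSync(data, {
    params: { [zlib.constants.BROTLI_PARAM_QUALITY]: 0 },
  }).length,
  "bytes",
);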
@@ -903,25 +917,33 @@ describe("fetch() with streaming", () => {
});
test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => {
-    const rawBytes = Buffer.allocUnsafe(128 * 1024);
+    const rawBytes = Buffer.allocUnsafe(1024 * 1024);
// Random data doesn't compress well. We need enough random data that
// the compressed data is larger than 64 bytes.
require("crypto").randomFillSync(rawBytes);
const content = rawBytes.toString("hex");
+    const contentBuffer = Buffer.from(content);
+    const data = compress(compression, contentBuffer);
var onReceivedHeaders = Promise.withResolvers();
using server = Bun.serve({
port: 0,
async fetch(req) {
-        const data = compress(compression, Buffer.from(content));
return new Response(
new ReadableStream({
async pull(controller) {
-            const firstChunk = data.subarray(0, 64);
-            const secondChunk = data.subarray(firstChunk.length);
-            controller.enqueue(firstChunk);
-            await onReceivedHeaders.promise;
-            await Bun.sleep(128);
-            controller.enqueue(secondChunk);
+            // Ensure we actually send it over the network in multiple chunks.
+            let tenth = (data.length / 10) | 0;
+            let remaining = data;
+            while (remaining.length > 0) {
+              const chunk = remaining.subarray(0, Math.min(tenth, remaining.length));
+              controller.enqueue(chunk);
+              if (remaining === data) {
+                await onReceivedHeaders.promise;
+              }
+              remaining = remaining.subarray(chunk.length);
+              await Bun.sleep(1);
+            }
controller.close();
},
}),
@@ -952,15 +974,21 @@ describe("fetch() with streaming", () => {
gcTick(false);
const reader = res.body?.getReader();
-    let buffer = Buffer.alloc(0);
-    let parts = 0;
+    let chunks: Uint8Array[] = [];
+    let currentRange = 0;
while (true) {
gcTick(false);
const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
if (value) {
-        buffer = Buffer.concat([buffer, value]);
-        parts++;
+        chunks.push(value);
+        // Check the content is what is expected at this time.
+        // We're avoiding calling .buffer since that changes the internal representation in JSC and we want to test the raw data.
+        expect(contentBuffer.compare(value, undefined, undefined, currentRange, currentRange + value.length)).toEqual(
+          0,
+        );
+        currentRange += value.length;
}
if (done) {
break;
@@ -968,8 +996,18 @@ describe("fetch() with streaming", () => {
}
gcTick(false);
expect(buffer.toString("utf8")).toBe(content);
expect(parts).toBeGreaterThan(1);
expect(Buffer.concat(chunks).toString("utf8")).toBe(content);
expect(chunks.length).toBeGreaterThan(1);
currentRange = 0;
for (const chunk of chunks) {
// Check that each chunk hasn't been modified.
// We want to be 100% sure that there is no accidental memory re-use here.
expect(contentBuffer.compare(chunk, undefined, undefined, currentRange, currentRange + chunk.length)).toEqual(
0,
);
currentRange += chunk.length;
}
});
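The per-chunk checks above lean on the range form of Buffer.prototype.compare: buf.compare(target, targetStart, targetEnd, sourceStart, sourceEnd) compares a slice of buf (the source) against a slice of target without allocating, and returns 0 on equality. A minimal illustration:

const content = Buffer.from("hello world");
const chunk = Buffer.from("world");
// Compare all of `chunk` against content[6..11).
console.log(content.compare(chunk, undefined, undefined, 6, 11) === 0); // true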
test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {