mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
fix(child_process): return net.Socket for piped stdout/stderr
Node.js returns net.Socket instances for piped stdout/stderr streams in child processes, but Bun was returning plain Readable streams. This broke code that checks `cp.stdout instanceof Socket` or relies on the constructor name for type identification. This change adds a new `constructNativeSocket` function that creates a net.Socket wrapping the native readable stream, and updates the child_process implementation to use it for stdout/stderr streams. Fixes #26505 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -256,4 +256,74 @@ function unref(this: NativeReadable) {
|
||||
}
|
||||
}
|
||||
|
||||
export default { constructNativeReadable };
|
||||
// constructNativeSocket creates a net.Socket wrapping a native readable.
|
||||
// This is used for child_process stdout/stderr to match Node.js behavior
|
||||
// where these streams are Socket instances, not plain Readable streams.
|
||||
// See: https://github.com/oven-sh/bun/issues/26505
|
||||
function constructNativeSocket(readableStream: ReadableStream, options): NativeReadable {
|
||||
$assert(typeof readableStream === "object" && readableStream instanceof ReadableStream, "Invalid readable stream");
|
||||
const bunNativePtr = (readableStream as any).$bunNativePtr;
|
||||
$assert(typeof bunNativePtr === "object", "Invalid native ptr");
|
||||
|
||||
// Create a Socket with readable=true, writable=false for stdout/stderr
|
||||
// Spread options first, then enforce readable/writable to prevent overrides
|
||||
const { Socket } = require("node:net");
|
||||
const stream = new Socket({
|
||||
...options,
|
||||
readable: true,
|
||||
writable: false,
|
||||
});
|
||||
|
||||
// Override _read with our native implementation
|
||||
stream._read = read;
|
||||
|
||||
// Create a custom _destroy that cleans up native resources and then emits close
|
||||
const originalSocketDestroy = Socket.prototype._destroy;
|
||||
stream._destroy = function socketDestroy(error: any, cb: () => void) {
|
||||
const ptr = this.$bunNativePtr;
|
||||
if (ptr) {
|
||||
ptr.cancel(error);
|
||||
}
|
||||
// Call the original Socket._destroy which will emit "close"
|
||||
// Since we don't have a _handle, it will call cb and emit close via emitCloseNT
|
||||
return originalSocketDestroy.$call(this, error, cb);
|
||||
};
|
||||
|
||||
// End the writable side immediately since this is a read-only socket
|
||||
stream._writableState.ended = true;
|
||||
stream._writableState.finished = true;
|
||||
|
||||
if (!!$debug) {
|
||||
stream.debugId = ++debugId;
|
||||
}
|
||||
|
||||
stream.$bunNativePtr = bunNativePtr;
|
||||
stream[kRefCount] = 0;
|
||||
stream[kConstructed] = false;
|
||||
stream[kPendingRead] = false;
|
||||
stream[kHasResized] = !dynamicallyAdjustChunkSize();
|
||||
stream[kCloseState] = [false];
|
||||
|
||||
if (typeof options.highWaterMark === "number") {
|
||||
stream[kHighWaterMark] = options.highWaterMark;
|
||||
} else {
|
||||
stream[kHighWaterMark] = 256 * 1024;
|
||||
}
|
||||
|
||||
// Override ref/unref to use native implementation
|
||||
stream.ref = ref;
|
||||
stream.unref = unref;
|
||||
|
||||
// https://github.com/oven-sh/bun/pull/12801
|
||||
// https://github.com/oven-sh/bun/issues/9555
|
||||
// There may be a ReadableStream.Strong handle to the ReadableStream.
|
||||
// We can't update those handles to point to the NativeReadable from JS
|
||||
// So we instead mark it as no longer usable, and create a new NativeReadable
|
||||
transferToNativeReadable(readableStream);
|
||||
|
||||
$debug(`[${stream.debugId}] constructed socket!`);
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
export default { constructNativeReadable, constructNativeSocket };
|
||||
|
||||
@@ -1181,22 +1181,27 @@ class ChildProcess extends EventEmitter {
|
||||
const value = handle?.[fdToStdioName(i as 1 | 2)!];
|
||||
// This can happen if the process was already killed.
|
||||
if (!value) {
|
||||
const Readable = require("internal/streams/readable");
|
||||
const stream = new Readable({ read() {} });
|
||||
// Return a destroyed Socket to match Node.js behavior
|
||||
if (!NetModule) NetModule = require("node:net");
|
||||
const stream = new NetModule.Socket({ readable: true, writable: false });
|
||||
// Mark as destroyed to indicate it's not usable
|
||||
stream.destroy();
|
||||
return stream;
|
||||
}
|
||||
|
||||
const pipe = require("internal/streams/native-readable").constructNativeReadable(value, { encoding });
|
||||
// Use constructNativeSocket to return a Socket instance for stdout/stderr
|
||||
// This matches Node.js behavior where child process stdio streams are Sockets
|
||||
// See: https://github.com/oven-sh/bun/issues/26505
|
||||
const pipe = require("internal/streams/native-readable").constructNativeSocket(value, { encoding });
|
||||
this.#closesNeeded++;
|
||||
pipe.once("close", () => this.#maybeClose());
|
||||
if (autoResume) pipe.resume();
|
||||
return pipe;
|
||||
}
|
||||
case "destroyed": {
|
||||
const Readable = require("internal/streams/readable");
|
||||
const stream = new Readable({ read() {} });
|
||||
// Return a destroyed Socket to match Node.js behavior
|
||||
if (!NetModule) NetModule = require("node:net");
|
||||
const stream = new NetModule.Socket({ readable: true, writable: false });
|
||||
// Mark as destroyed to indicate it's not usable
|
||||
stream.destroy();
|
||||
return stream;
|
||||
|
||||
116
test/regression/issue/26505.test.ts
Normal file
116
test/regression/issue/26505.test.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { expect, test } from "bun:test";
|
||||
import { bunEnv, bunExe } from "harness";
|
||||
import { spawn } from "node:child_process";
|
||||
import { Socket } from "node:net";
|
||||
|
||||
// https://github.com/oven-sh/bun/issues/26505
|
||||
// Child process piped stdout/stderr should be Socket instances, not plain Readable streams
|
||||
|
||||
function collectStreamData(stream: NodeJS.ReadableStream): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
stream.on("data", chunk => chunks.push(Buffer.from(chunk)));
|
||||
stream.on("end", () => resolve(Buffer.concat(chunks).toString()));
|
||||
stream.on("error", reject);
|
||||
});
|
||||
}
|
||||
|
||||
function waitForClose(cp: ReturnType<typeof spawn>): Promise<number | null> {
|
||||
return new Promise(resolve => {
|
||||
cp.on("close", code => resolve(code));
|
||||
});
|
||||
}
|
||||
|
||||
test("child process stdout is a Socket instance", async () => {
|
||||
const cp = spawn(bunExe(), ["-e", "console.log('hello')"], {
|
||||
stdio: "pipe",
|
||||
env: bunEnv,
|
||||
});
|
||||
|
||||
expect(cp.stdout).toBeInstanceOf(Socket);
|
||||
expect(cp.stdout!.constructor.name).toBe("Socket");
|
||||
expect(typeof cp.stdout!.ref).toBe("function");
|
||||
expect(typeof cp.stdout!.unref).toBe("function");
|
||||
|
||||
const [stdout, exitCode] = await Promise.all([collectStreamData(cp.stdout!), waitForClose(cp)]);
|
||||
|
||||
expect(stdout.trim()).toBe("hello");
|
||||
expect(exitCode).toBe(0);
|
||||
});
|
||||
|
||||
test("child process stderr is a Socket instance", async () => {
|
||||
const cp = spawn(bunExe(), ["-e", "console.error('error message')"], {
|
||||
stdio: "pipe",
|
||||
env: bunEnv,
|
||||
});
|
||||
|
||||
expect(cp.stderr).toBeInstanceOf(Socket);
|
||||
expect(cp.stderr!.constructor.name).toBe("Socket");
|
||||
expect(typeof cp.stderr!.ref).toBe("function");
|
||||
expect(typeof cp.stderr!.unref).toBe("function");
|
||||
|
||||
const [stderr, exitCode] = await Promise.all([collectStreamData(cp.stderr!), waitForClose(cp)]);
|
||||
|
||||
expect(stderr.trim()).toBe("error message");
|
||||
expect(exitCode).toBe(0);
|
||||
});
|
||||
|
||||
test("child process stdin is not a Socket (it's a Writable)", async () => {
|
||||
const cp = spawn(bunExe(), ["-e", "process.stdin.pipe(process.stdout)"], {
|
||||
stdio: "pipe",
|
||||
env: bunEnv,
|
||||
});
|
||||
|
||||
// stdin is a Writable, not a Socket
|
||||
expect(cp.stdin).not.toBeInstanceOf(Socket);
|
||||
expect(typeof cp.stdin!.write).toBe("function");
|
||||
|
||||
cp.stdin!.write("hello from stdin");
|
||||
cp.stdin!.end();
|
||||
|
||||
const [stdout, exitCode] = await Promise.all([collectStreamData(cp.stdout!), waitForClose(cp)]);
|
||||
|
||||
expect(stdout).toBe("hello from stdin");
|
||||
expect(exitCode).toBe(0);
|
||||
});
|
||||
|
||||
test("socket ref/unref methods work correctly", async () => {
|
||||
const cp = spawn(bunExe(), ["-e", "console.log('done')"], {
|
||||
stdio: "pipe",
|
||||
env: bunEnv,
|
||||
});
|
||||
|
||||
// Should not throw when calling ref/unref
|
||||
expect(() => cp.stdout!.ref()).not.toThrow();
|
||||
expect(() => cp.stdout!.unref()).not.toThrow();
|
||||
expect(() => cp.stderr!.ref()).not.toThrow();
|
||||
expect(() => cp.stderr!.unref()).not.toThrow();
|
||||
|
||||
const [stdout, exitCode] = await Promise.all([collectStreamData(cp.stdout!), waitForClose(cp)]);
|
||||
|
||||
expect(stdout.trim()).toBe("done");
|
||||
expect(exitCode).toBe(0);
|
||||
});
|
||||
|
||||
test("socket streams work correctly when process exits with non-zero code", async () => {
|
||||
const cp = spawn(bunExe(), ["-e", "console.error('error output'); process.exit(1)"], {
|
||||
stdio: "pipe",
|
||||
env: bunEnv,
|
||||
});
|
||||
|
||||
// Verify stream types are correct even for failing processes
|
||||
expect(cp.stdout).toBeInstanceOf(Socket);
|
||||
expect(cp.stderr).toBeInstanceOf(Socket);
|
||||
expect(cp.stdin).not.toBeInstanceOf(Socket);
|
||||
|
||||
// ref/unref should not throw on failing process streams
|
||||
expect(() => cp.stdout!.ref()).not.toThrow();
|
||||
expect(() => cp.stdout!.unref()).not.toThrow();
|
||||
expect(() => cp.stderr!.ref()).not.toThrow();
|
||||
expect(() => cp.stderr!.unref()).not.toThrow();
|
||||
|
||||
const [stderr, exitCode] = await Promise.all([collectStreamData(cp.stderr!), waitForClose(cp)]);
|
||||
|
||||
expect(stderr.trim()).toBe("error output");
|
||||
expect(exitCode).toBe(1);
|
||||
});
|
||||
Reference in New Issue
Block a user