Compare commits

...

7 Commits

Author SHA1 Message Date
pfg
081971d69d fix non-tty stdin streams 2024-10-28 20:19:16 -07:00
Ben Grant
0912a33113 Try something else for stdin issues 2024-10-28 18:43:50 -07:00
Ben Grant
b6dfd6e6e4 Add error code to ReadableStreamDefaultController enqueue error 2024-10-28 18:43:50 -07:00
Ben Grant
083c2d7d37 Add assertions to readableStreamTo functions 2024-10-28 18:43:50 -07:00
Jarred Sumner
eafa644322 Update ProcessObjectInternals.ts 2024-10-28 18:43:50 -07:00
Jarred Sumner
9145a28476 use a regular fs.ReadStream when it's not a TTY 2024-10-28 18:43:50 -07:00
Ryan Gonzalez
542692df63 Add $ERR_STREAM_RELEASE_LOCK
Ignore ERR_STREAM_RELEASE_LOCK in process.stdin

Reject pending reads when a ReadableStream's lock is released

f8ae40c2a7

Update the ReadableStream promise resolution ordering on pull rejection

1f74c15de9

Fix pipeThrough unhandled rejections on readable errors

07e4b92acd

Fix pipeTo unhandled rejections on readable errors

3a75b5d2de

Fixes #13816.
2024-10-28 18:43:50 -07:00
7 changed files with 177 additions and 31 deletions

View File

@@ -45,6 +45,7 @@ export default [
["ERR_BUFFER_OUT_OF_BOUNDS", RangeError, "RangeError"],
["ERR_UNKNOWN_SIGNAL", TypeError, "TypeError"],
["ERR_SOCKET_BAD_PORT", RangeError, "RangeError"],
["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"],
// Bun-specific
["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"],

View File

@@ -61,6 +61,41 @@ export function getStdioWriteStream(fd) {
}
export function getStdinStream(fd) {
const tty = require("node:tty");
const fs = require("node:fs");
let stream;
if (tty.isatty(fd)) {
stream = new tty.ReadStream(fd);
} else {
const stat = fs.fstatSync(fd);
if (stat.isFile()) {
stream = new fs.ReadStream(null, { fd: fd, autoClose: false });
} else {
const net = require("node:net");
if (process.channel && process.channel.fd === fd) {
stream = new net.Socket({
handle: process.channel,
readable: true,
writable: false,
manualStart: true,
});
} else if (stat.isSocket()) {
stream = new net.Socket({
fd: fd,
readable: true,
writable: false,
manualStart: true,
});
} else {
const { Readable } = require("node:stream");
stream = new Readable({ read() {} });
stream.push(null);
}
stream._writableState.ended = true;
}
}
// Ideally we could use this:
// return require("node:stream")[Symbol.for("::bunternal::")]._ReadableFromWeb(Bun.stdin.stream());
// but we need to extend TTY/FS ReadStream
@@ -106,22 +141,11 @@ export function getStdinStream(fd) {
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
} catch (e) {}
}
}
}
const tty = require("node:tty");
const ReadStream = tty.isatty(fd) ? tty.ReadStream : require("node:fs").ReadStream;
const stream = new ReadStream(fd);
const originalOn = stream.on;
let stream_destroyed = false;
@@ -194,7 +218,11 @@ export function getStdinStream(fd) {
unref();
}
}
} catch (err) {
} catch (err: any) {
if (err?.code === "ERR_STREAM_RELEASE_LOCK") {
// Not a bug. Happens in unref().
return;
}
stream.destroy(err);
}
}

View File

@@ -108,6 +108,7 @@ export function initializeReadableStream(
$linkTimeConstant;
export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -119,6 +120,7 @@ export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]
$linkTimeConstant;
export function readableStreamToText(stream: ReadableStream): Promise<string> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -137,6 +139,7 @@ export function readableStreamToText(stream: ReadableStream): Promise<string> {
$linkTimeConstant;
export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>): Promise<ArrayBuffer> | ArrayBuffer {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -216,6 +219,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>)
$linkTimeConstant;
export function readableStreamToBytes(stream: ReadableStream<ArrayBuffer>): Promise<Uint8Array> | Uint8Array {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
@@ -297,6 +301,7 @@ export function readableStreamToFormData(
stream: ReadableStream<ArrayBuffer>,
contentType: string | ArrayBuffer | ArrayBufferView,
): Promise<FormData> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return Bun.readableStreamToBlob(stream).then(blob => {
return FormData.from(blob, contentType);
@@ -305,6 +310,7 @@ export function readableStreamToFormData(
$linkTimeConstant;
export function readableStreamToJSON(stream: ReadableStream): unknown {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
let result = $tryUseReadableStreamBufferedFastPath(stream, "json");
if (result) {
@@ -326,6 +332,7 @@ export function readableStreamToJSON(stream: ReadableStream): unknown {
$linkTimeConstant;
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return (
@@ -422,7 +429,15 @@ export function pipeThrough(this, streams, options) {
if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");
$readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
const promise = $readableStreamPipeToWritableStream(
this,
internalWritable,
preventClose,
preventAbort,
preventCancel,
signal,
);
$markPromiseAsHandled(promise);
return readable;
}

View File

@@ -33,8 +33,9 @@ export function initializeReadableStreamDefaultController(this, stream, underlyi
export function enqueue(this, chunk) {
if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
}
return $readableStreamDefaultControllerEnqueue(this, chunk);
}

View File

@@ -172,10 +172,7 @@ export function releaseLock(this) {
if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;
if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
throw new TypeError("There are still pending read requests, cannot release the lock");
$readableStreamReaderGenericRelease(this);
$readableStreamDefaultReaderRelease(this);
}
$getter;

View File

@@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite);
if (!canWrite) return;
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then(
undefined,
() => {},
);
},
e => {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
@@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) {
action();
return;
}
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined);
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {});
}
export function pipeToClosingMustBePropagatedBackward(pipeState) {
@@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) {
if (!reader) return;
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
$readableStreamDefaultReaderErrorReadRequests(reader, error);
} else {
$assert($isReadableStreamBYOBReader(reader));
const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
}
export function readableStreamDefaultControllerShouldCallPull(controller) {
@@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) {
return stream.$disturbed;
}
$visibility = "Private";
export function readableStreamDefaultReaderRelease(reader) {
$readableStreamReaderGenericRelease(reader);
$readableStreamDefaultReaderErrorReadRequests(
reader,
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
}
$visibility = "Private";
export function readableStreamReaderGenericRelease(reader) {
$assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
@@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) {
if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(
undefined,
$makeTypeError("releasing lock of reader whose stream is still in readable state"),
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
else
$putByIdDirectPrivate(reader, "closedPromiseCapability", {
promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")),
});
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
@@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) {
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
}
export function readableStreamDefaultReaderErrorReadRequests(reader, error) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}
export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
return false;

View File

@@ -756,6 +756,55 @@ it("ReadableStream for empty file closes immediately", async () => {
expect(chunks.length).toBe(0);
});
it("ReadableStream errors the stream on pull rejection", async () => {
let stream = new ReadableStream({
pull(controller) {
return Promise.reject("pull rejected");
},
});
let reader = stream.getReader();
let closed = reader.closed.catch(err => `closed: ${err}`);
let read = reader.read().catch(err => `read: ${err}`);
expect(await Promise.race([closed, read])).toBe("closed: pull rejected");
expect(await read).toBe("read: pull rejected");
});
it("ReadableStream rejects pending reads when the lock is released", async () => {
let { resolve, promise } = Promise.withResolvers();
let stream = new ReadableStream({
async pull(controller) {
controller.enqueue("123");
await promise;
controller.enqueue("456");
controller.close();
},
});
let reader = stream.getReader();
expect((await reader.read()).value).toBe("123");
let read = reader.read();
reader.releaseLock();
expect(read).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
expect(reader.closed).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
resolve();
reader = stream.getReader();
expect((await reader.read()).value).toBe("456");
});
it("new Response(stream).arrayBuffer() (bytes)", async () => {
var queue = [Buffer.from("abdefgh")];
var stream = new ReadableStream({
@@ -1053,3 +1102,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as
expect(true).toBe(true);
}
});
it("pipeTo doesn't cause unhandled rejections on readable errors", async () => {
// https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html
let unhandledRejectionCaught = false;
const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);
const writable = new WritableStream();
const readable = new ReadableStream({ start: c => c.error("error") });
readable.pipeTo(writable).catch(() => {});
await Bun.sleep(15);
process.off("unhandledRejection", catchUnhandledRejection);
expect(unhandledRejectionCaught).toBe(false);
});
it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => {
let unhandledRejectionCaught = false;
const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);
const readable = new ReadableStream({ start: c => c.error("error") });
const ts = new TransformStream();
readable.pipeThrough(ts);
await Bun.sleep(15);
process.off("unhandledRejection", catchUnhandledRejection);
expect(unhandledRejectionCaught).toBe(false);
});