Co-authored-by: pfg <pfg@pfg.pw>
Co-authored-by: Ryan Gonzalez <git@refi64.dev>
Co-authored-by: Ben Grant <ben@bun.sh>
Co-authored-by: Dave Caruso <me@paperdave.net>
This commit is contained in:
Jarred Sumner
2024-11-01 18:38:01 -07:00
committed by GitHub
parent ce2afac827
commit 6914c5e32c
10 changed files with 148 additions and 21 deletions

View File

@@ -290,7 +290,7 @@ $ xcode-select --install
Bun defaults to linking `libatomic` statically, as not all systems have it. If you are building on a distro that does not have a static libatomic available, you can run the following command to enable dynamic linking:
```bash
$ bun setup -DUSE_STATIC_LIBATOMIC=OFF
$ bun run build -DUSE_STATIC_LIBATOMIC=OFF
```
The built version of Bun may not work on other systems if compiled this way.

View File

@@ -138,7 +138,7 @@ if(CMAKE_HOST_LINUX AND NOT WIN32 AND NOT APPLE)
OUTPUT_STRIP_TRAILING_WHITESPACE
ERROR_QUIET
)
if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux\"|NAME=\"openSUSE Tumbleweed\"")
if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux( ARM)?\"|NAME=\"openSUSE Tumbleweed\"")
set(DEFAULT_STATIC_LIBATOMIC OFF)
endif()
endif()

View File

@@ -45,6 +45,7 @@ export default [
["ERR_BUFFER_OUT_OF_BOUNDS", RangeError, "RangeError"],
["ERR_UNKNOWN_SIGNAL", TypeError, "TypeError"],
["ERR_SOCKET_BAD_PORT", RangeError, "RangeError"],
["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"],
// Bun-specific
["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"],

View File

@@ -195,6 +195,10 @@ export function getStdinStream(fd) {
}
}
} catch (err) {
if (err?.code === "ERR_STREAM_RELEASE_LOCK") {
// Not a bug. Happens in unref().
return;
}
stream.destroy(err);
}
}
@@ -212,6 +216,7 @@ export function getStdinStream(fd) {
$debug('on("resume");');
ref();
stream._undestroy();
stream_destroyed = false;
});
stream._readableState.reading = false;

View File

@@ -108,6 +108,7 @@ export function initializeReadableStream(
$linkTimeConstant;
export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -119,6 +120,7 @@ export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]
$linkTimeConstant;
export function readableStreamToText(stream: ReadableStream): Promise<string> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -137,6 +139,7 @@ export function readableStreamToText(stream: ReadableStream): Promise<string> {
$linkTimeConstant;
export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>): Promise<ArrayBuffer> | ArrayBuffer {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
@@ -216,6 +219,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>)
$linkTimeConstant;
export function readableStreamToBytes(stream: ReadableStream<ArrayBuffer>): Promise<Uint8Array> | Uint8Array {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
@@ -297,6 +301,7 @@ export function readableStreamToFormData(
stream: ReadableStream<ArrayBuffer>,
contentType: string | ArrayBuffer | ArrayBufferView,
): Promise<FormData> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return Bun.readableStreamToBlob(stream).then(blob => {
return FormData.from(blob, contentType);
@@ -305,6 +310,7 @@ export function readableStreamToFormData(
$linkTimeConstant;
export function readableStreamToJSON(stream: ReadableStream): unknown {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
let result = $tryUseReadableStreamBufferedFastPath(stream, "json");
if (result) {
@@ -326,6 +332,7 @@ export function readableStreamToJSON(stream: ReadableStream): unknown {
$linkTimeConstant;
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return (
@@ -422,7 +429,15 @@ export function pipeThrough(this, streams, options) {
if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");
$readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
const promise = $readableStreamPipeToWritableStream(
this,
internalWritable,
preventClose,
preventAbort,
preventCancel,
signal,
);
$markPromiseAsHandled(promise);
return readable;
}

View File

@@ -33,8 +33,9 @@ export function initializeReadableStreamDefaultController(this, stream, underlyi
export function enqueue(this, chunk) {
if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
}
return $readableStreamDefaultControllerEnqueue(this, chunk);
}

View File

@@ -172,10 +172,7 @@ export function releaseLock(this) {
if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;
if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
throw new TypeError("There are still pending read requests, cannot release the lock");
$readableStreamReaderGenericRelease(this);
$readableStreamDefaultReaderRelease(this);
}
$getter;

View File

@@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite);
if (!canWrite) return;
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then(
undefined,
() => {},
);
},
e => {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
@@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) {
action();
return;
}
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined);
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {});
}
export function pipeToClosingMustBePropagatedBackward(pipeState) {
@@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) {
if (!reader) return;
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
$readableStreamDefaultReaderErrorReadRequests(reader, error);
} else {
$assert($isReadableStreamBYOBReader(reader));
const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
}
export function readableStreamDefaultControllerShouldCallPull(controller) {
@@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) {
return stream.$disturbed;
}
$visibility = "Private";
export function readableStreamDefaultReaderRelease(reader) {
$readableStreamReaderGenericRelease(reader);
$readableStreamDefaultReaderErrorReadRequests(
reader,
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
}
$visibility = "Private";
export function readableStreamReaderGenericRelease(reader) {
$assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
@@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) {
if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(
undefined,
$makeTypeError("releasing lock of reader whose stream is still in readable state"),
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
else
$putByIdDirectPrivate(reader, "closedPromiseCapability", {
promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")),
});
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
@@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) {
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
}
export function readableStreamDefaultReaderErrorReadRequests(reader, error) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}
export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
return false;

View File

@@ -9,7 +9,11 @@ test("works with prompts", async () => {
stdin: "pipe",
});
await Bun.sleep(100);
const reader = child.stdout.getReader();
await reader.read();
reader.releaseLock();
child.stdin.write("dylan\n");
await Bun.sleep(100);
child.stdin.write("999\n");

View File

@@ -756,6 +756,55 @@ it("ReadableStream for empty file closes immediately", async () => {
expect(chunks.length).toBe(0);
});
it("ReadableStream errors the stream on pull rejection", async () => {
let stream = new ReadableStream({
pull(controller) {
return Promise.reject("pull rejected");
},
});
let reader = stream.getReader();
let closed = reader.closed.catch(err => `closed: ${err}`);
let read = reader.read().catch(err => `read: ${err}`);
expect(await Promise.race([closed, read])).toBe("closed: pull rejected");
expect(await read).toBe("read: pull rejected");
});
it("ReadableStream rejects pending reads when the lock is released", async () => {
let { resolve, promise } = Promise.withResolvers();
let stream = new ReadableStream({
async pull(controller) {
controller.enqueue("123");
await promise;
controller.enqueue("456");
controller.close();
},
});
let reader = stream.getReader();
expect((await reader.read()).value).toBe("123");
let read = reader.read();
reader.releaseLock();
expect(read).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
expect(reader.closed).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
resolve();
reader = stream.getReader();
expect((await reader.read()).value).toBe("456");
});
it("new Response(stream).arrayBuffer() (bytes)", async () => {
var queue = [Buffer.from("abdefgh")];
var stream = new ReadableStream({
@@ -1053,3 +1102,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as
expect(true).toBe(true);
}
});
it("pipeTo doesn't cause unhandled rejections on readable errors", async () => {
// https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html
let unhandledRejectionCaught = false;
const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);
const writable = new WritableStream();
const readable = new ReadableStream({ start: c => c.error("error") });
readable.pipeTo(writable).catch(() => {});
await Bun.sleep(15);
process.off("unhandledRejection", catchUnhandledRejection);
expect(unhandledRejectionCaught).toBe(false);
});
it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => {
let unhandledRejectionCaught = false;
const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);
const readable = new ReadableStream({ start: c => c.error("error") });
const ts = new TransformStream();
readable.pipeThrough(ts);
await Bun.sleep(15);
process.off("unhandledRejection", catchUnhandledRejection);
expect(unhandledRejectionCaught).toBe(false);
});