Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
81e928d8b8 fix: support BYOB readers on native ReadableStreams
Fixes #12908. Native ReadableStreams (req.body, fetch().body,
Blob.stream(), Bun.file().stream()) threw "ReadableStreamBYOBReader
needs a ReadableByteStreamController" when getReader({ mode: 'byob' })
was called.

Three issues were fixed:

1. getReader() only called the lazy stream initialization function
   for default readers, not BYOB readers. Moved the start_() call
   before the mode check so both paths trigger initialization.

2. When a BYOB reader is requested, the NativeReadableStreamSource
   now gets type: "bytes" set so createReadableStreamController
   creates a ReadableByteStreamController. This is only done for
   BYOB readers to avoid the pendingPullIntos performance overhead
   for default readers.

3. readableStreamClose() didn't handle BYOB readers - it only
   resolved pending readRequests for default readers. Added handling
   for readIntoRequests so BYOB reads resolve on stream close.

Also fixed readableStreamReaderGenericRelease() to look up the
underlying source from either "underlyingSource" (default controller)
or "underlyingByteSource" (byte stream controller) when calling
$resume.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-20 02:39:56 +00:00
3 changed files with 149 additions and 32 deletions

View File

@@ -90,8 +90,8 @@ export function initializeReadableStream(
autoAllocateChunkSize || $getByIdDirectPrivate(strategy, "highWaterMark"),
);
$putByIdDirectPrivate(this, "start", () => {
const instance = $lazyLoadStream(this, autoAllocateChunkSize);
$putByIdDirectPrivate(this, "start", byob => {
const instance = $lazyLoadStream(this, autoAllocateChunkSize, byob);
if (instance) {
$createReadableStreamController(this, instance, strategy);
}
@@ -381,21 +381,22 @@ export function getReader(this, options) {
if (!$isReadableStream(this)) throw $ERR_INVALID_THIS("ReadableStream");
const mode = $toDictionary(options, {}, "ReadableStream.getReader takes an object as first argument").mode;
if (mode === undefined) {
var start_ = $getByIdDirectPrivate(this, "start");
if (start_) {
$putByIdDirectPrivate(this, "start", undefined);
start_();
}
// String conversion is required by spec, hence double equals.
const isByob = mode == "byob";
var start_ = $getByIdDirectPrivate(this, "start");
if (start_) {
$putByIdDirectPrivate(this, "start", undefined);
start_(isByob);
}
if (!isByob) {
if (mode !== undefined) {
throw $ERR_INVALID_ARG_VALUE("mode", mode, "byob");
}
return new ReadableStreamDefaultReader(this);
}
// String conversion is required by spec, hence double equals.
if (mode == "byob") {
return new ReadableStreamBYOBReader(this);
}
throw $ERR_INVALID_ARG_VALUE("mode", mode, "byob");
return new ReadableStreamBYOBReader(this);
}
export function pipeThrough(this, streams, options) {

View File

@@ -1665,6 +1665,14 @@ export function readableStreamClose(stream) {
for (var request = requests.shift(); request; request = requests.shift())
$fulfillPromise(request, { value: undefined, done: true });
}
} else if ($isReadableStreamBYOBReader(reader)) {
const readIntoRequests = $getByIdDirectPrivate(reader, "readIntoRequests");
if (readIntoRequests?.isNotEmpty()) {
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = readIntoRequests.shift(); request; request = readIntoRequests.shift())
$fulfillPromise(request, { value: undefined, done: true });
}
}
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").resolve.$call();
@@ -1761,7 +1769,11 @@ export function readableStreamReaderGenericRelease(reader) {
var stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
if (stream.$bunNativePtr) {
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingSource").$resume(false);
var controller = $getByIdDirectPrivate(stream, "readableStreamController");
var source =
$getByIdDirectPrivate(controller, "underlyingSource") ??
$getByIdDirectPrivate(controller, "underlyingByteSource");
source?.$resume(false);
}
$putByIdDirectPrivate(stream, "reader", undefined);
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
@@ -1937,18 +1949,6 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
}
}
// This was a type: "bytes" until Bun v1.1.44, but pendingPullIntos was not really
// compatible with how we send data to the stream, and "mode: 'byob'" wasn't
// supported so changing it isn't an observable change.
//
// When we receive chunks of data from native code, we sometimes read more
// than what the input buffer provided. When that happens, we return a typed
// array instead of the number of bytes read.
//
// When that happens, the ReadableByteStreamController creates (byteLength / autoAllocateChunkSize) pending pull into descriptors.
// So if that number is something like 16 * 1024, and we actually read 2 MB, you're going to create 128 pending pull into descriptors.
//
// And those pendingPullIntos were often never actually drained.
class NativeReadableStreamSource {
constructor(handle, autoAllocateChunkSize, drainValue) {
$putByIdDirectPrivate(this, "stream", handle);
@@ -1985,7 +1985,7 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
}
}
#controller?: WeakRef<ReadableStreamDefaultController>;
#controller?: WeakRef<ReadableStreamDefaultController | ReadableByteStreamController>;
// eslint-disable-next-line no-unused-vars
pull;
@@ -2146,7 +2146,7 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
return NativeReadableStreamSource;
}
export function lazyLoadStream(stream, autoAllocateChunkSize) {
export function lazyLoadStream(stream, autoAllocateChunkSize, byob) {
$debug("lazyLoadStream", stream, autoAllocateChunkSize);
var handle = stream.$bunNativePtr;
if (handle === -1) return;
@@ -2175,7 +2175,7 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
// empty file, no need for native back-and-forth on this
if (chunkSize === 0) {
if ((drainValue?.byteLength ?? 0) > 0) {
return {
var source = {
start(controller) {
controller.enqueue(drainValue);
controller.close();
@@ -2184,9 +2184,11 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
controller.close();
},
};
if (byob) source.type = "bytes";
return source;
}
return {
var source = {
start(controller) {
controller.close();
},
@@ -2194,9 +2196,13 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
controller.close();
},
};
if (byob) source.type = "bytes";
return source;
}
return new Prototype(handle, Math.max(chunkSize, autoAllocateChunkSize), drainValue);
var instance = new Prototype(handle, Math.max(chunkSize, autoAllocateChunkSize), drainValue);
if (byob) instance.type = "bytes";
return instance;
}
export function readableStreamIntoArray(stream) {

View File

@@ -0,0 +1,110 @@
import { expect, test } from "bun:test";
// https://github.com/oven-sh/bun/issues/12908
// BYOB reader on native ReadableStreams (req.body, fetch().body, Blob.stream(), Bun.file().stream())
// threw "ReadableStreamBYOBReader needs a ReadableByteStreamController"
test("req.body supports BYOB reader in Bun.serve", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
const reader = req.body!.getReader({ mode: "byob" });
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read(new Uint8Array(1024));
if (done) break;
chunks.push(value);
}
reader.releaseLock();
const total = chunks.reduce((sum, c) => sum + c.byteLength, 0);
return new Response(`OK:${total}`);
},
});
const body = "hello world";
const resp = await fetch(`http://localhost:${server.port}/`, {
method: "POST",
body,
});
expect(resp.status).toBe(200);
expect(await resp.text()).toBe(`OK:${body.length}`);
});
test("fetch() response body supports BYOB reader", async () => {
using server = Bun.serve({
port: 0,
fetch() {
return new Response("hello from server");
},
});
const resp = await fetch(`http://localhost:${server.port}/`);
const reader = resp.body!.getReader({ mode: "byob" });
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read(new Uint8Array(1024));
if (done) break;
chunks.push(value);
}
reader.releaseLock();
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text).toBe("hello from server");
});
test("Blob.stream() supports BYOB reader", async () => {
const blob = new Blob(["hello blob data"]);
const stream = blob.stream();
const reader = stream.getReader({ mode: "byob" });
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read(new Uint8Array(1024));
if (done) break;
chunks.push(value);
}
reader.releaseLock();
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text).toBe("hello blob data");
});
test("Bun.file().stream() supports BYOB reader", async () => {
// Write a temp file
const path = require("path").join(require("os").tmpdir(), `byob-test-${Date.now()}.txt`);
const content = "hello file data";
await Bun.write(path, content);
try {
const stream = Bun.file(path).stream();
const reader = stream.getReader({ mode: "byob" });
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read(new Uint8Array(1024));
if (done) break;
chunks.push(value);
}
reader.releaseLock();
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text).toBe(content);
} finally {
require("fs").unlinkSync(path);
}
});
test("default reader still works on native streams after getReader fix", async () => {
using server = Bun.serve({
port: 0,
fetch() {
return new Response("default reader test");
},
});
const resp = await fetch(`http://localhost:${server.port}/`);
const reader = resp.body!.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
reader.releaseLock();
const text = new TextDecoder().decode(Buffer.concat(chunks));
expect(text).toBe("default reader test");
});