Compare commits

..

1 Commits

Author SHA1 Message Date
Claude Bot
5de67d3def fix: enable BYOB reader support for Blob.stream(), Response.body, and File.stream()
Per the W3C File API and Fetch specs, Blob.stream(), Response.body,
and File.stream() should return byte streams with
ReadableByteStreamController, enabling BYOB (Bring Your Own Buffer)
reader support.

The key changes:
- NativeReadableStreamSource now uses type: "bytes" to create a
  ReadableByteStreamController instead of ReadableStreamDefaultController
- autoAllocateChunkSize is kept internal (#chunkSize) to avoid creating
  pendingPullInto descriptors which were incompatible with the push model
- readableStreamClose now handles BYOB reader pending requests on close
- readableStreamReaderGenericRelease handles both underlyingSource and
  underlyingByteSource for the $resume call
- Lazy stream initialization is triggered for BYOB readers too
- Empty blob streams use a dedicated createEmptyByteReadableStream

Closes #16402

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-20 06:39:29 +00:00
9 changed files with 251 additions and 74 deletions

View File

@@ -2762,6 +2762,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
putDirectBuiltinFunction(vm, this, builtinNames.createFIFOPrivateName(), streamInternalsCreateFIFOCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyReadableStreamPrivateName(), readableStreamCreateEmptyReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyByteReadableStreamPrivateName(), readableStreamCreateEmptyByteReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createUsedReadableStreamPrivateName(), readableStreamCreateUsedReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createNativeReadableStreamPrivateName(), readableStreamCreateNativeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.requireESMPrivateName(), commonJSRequireESMCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);

View File

@@ -2987,6 +2987,17 @@ JSC::EncodedJSValue JSC__JSModuleLoader__evaluate(JSC::JSGlobalObject* globalObj
return JSValue::encode(emptyStream);
}
[[ZIG_EXPORT(zero_is_throw)]] JSC::EncodedJSValue ReadableStream__emptyByte(Zig::GlobalObject* globalObject)
{
auto& vm = JSC::getVM(globalObject);
auto scope = DECLARE_THROW_SCOPE(vm);
auto clientData = WebCore::clientData(vm);
auto* function = globalObject->getDirect(vm, clientData->builtinNames().createEmptyByteReadableStreamPrivateName()).getObject();
JSValue emptyStream = JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s);
RETURN_IF_EXCEPTION(scope, {});
return JSValue::encode(emptyStream);
}
[[ZIG_EXPORT(zero_is_throw)]] JSC::EncodedJSValue ReadableStream__used(Zig::GlobalObject* globalObject)
{
auto& vm = JSC::getVM(globalObject);

View File

@@ -299,7 +299,7 @@ pub fn fromOwnedSlice(globalThis: *JSGlobalObject, bytes: []u8, recommended_chun
pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) bun.JSError!jsc.JSValue {
jsc.markBinding(@src());
var store = blob.store orelse {
return ReadableStream.empty(globalThis);
return ReadableStream.emptyByte(globalThis);
};
switch (store.data) {
.bytes => {
@@ -394,6 +394,11 @@ pub fn empty(globalThis: *JSGlobalObject) bun.JSError!jsc.JSValue {
return bun.cpp.ReadableStream__empty(globalThis);
}
pub fn emptyByte(globalThis: *JSGlobalObject) bun.JSError!jsc.JSValue {
jsc.markBinding(@src());
return bun.cpp.ReadableStream__emptyByte(globalThis);
}
pub fn used(globalThis: *JSGlobalObject) bun.JSError!jsc.JSValue {
jsc.markBinding(@src());
return bun.cpp.ReadableStream__used(globalThis);

View File

@@ -79,6 +79,7 @@ using namespace JSC;
macro(controller) \
macro(cork) \
macro(createCommonJSModule) \
macro(createEmptyByteReadableStream) \
macro(createEmptyReadableStream) \
macro(createFIFO) \
macro(createInternalModuleById) \

View File

@@ -350,6 +350,16 @@ export function createEmptyReadableStream() {
return stream;
}
$linkTimeConstant;
export function createEmptyByteReadableStream() {
var stream = new ReadableStream({
type: "bytes",
pull() {},
} as any);
$readableStreamClose(stream);
return stream;
}
$linkTimeConstant;
export function createUsedReadableStream() {
var stream = new ReadableStream({
@@ -381,13 +391,15 @@ export function getReader(this, options) {
if (!$isReadableStream(this)) throw $ERR_INVALID_THIS("ReadableStream");
const mode = $toDictionary(options, {}, "ReadableStream.getReader takes an object as first argument").mode;
if (mode === undefined) {
var start_ = $getByIdDirectPrivate(this, "start");
if (start_) {
$putByIdDirectPrivate(this, "start", undefined);
start_();
}
// Trigger lazy initialization for both default and BYOB readers.
// Native streams are lazily initialized to defer I/O until actually read.
var start_ = $getByIdDirectPrivate(this, "start");
if (start_) {
$putByIdDirectPrivate(this, "start", undefined);
start_();
}
if (mode === undefined) {
return new ReadableStreamDefaultReader(this);
}
// String conversion is required by spec, hence double equals.

View File

@@ -782,6 +782,8 @@ export function assignStreamIntoResumableSink(stream, sink) {
if (readableStreamController) {
if ($getByIdDirectPrivate(readableStreamController, "underlyingSource"))
$putByIdDirectPrivate(readableStreamController, "underlyingSource", null);
if ($getByIdDirectPrivate(readableStreamController, "underlyingByteSource"))
$putByIdDirectPrivate(readableStreamController, "underlyingByteSource", null);
if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream"))
$putByIdDirectPrivate(readableStreamController, "controlledReadableStream", null);
@@ -967,6 +969,8 @@ export async function readStreamIntoSink(stream: ReadableStream, sink, isNative)
if (readableStreamController) {
if ($getByIdDirectPrivate(readableStreamController, "underlyingSource"))
$putByIdDirectPrivate(readableStreamController, "underlyingSource", null);
if ($getByIdDirectPrivate(readableStreamController, "underlyingByteSource"))
$putByIdDirectPrivate(readableStreamController, "underlyingByteSource", null);
if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream"))
$putByIdDirectPrivate(readableStreamController, "controlledReadableStream", null);
@@ -1665,6 +1669,26 @@ export function readableStreamClose(stream) {
for (var request = requests.shift(); request; request = requests.shift())
$fulfillPromise(request, { value: undefined, done: true });
}
} else if ($isReadableStreamBYOBReader(reader)) {
// Per WHATWG Streams spec: when closing with a BYOB reader, resolve all
// pending read-into requests with done: true and empty views.
const readIntoRequests = $getByIdDirectPrivate(reader, "readIntoRequests");
if (readIntoRequests?.isNotEmpty()) {
const controller = $getByIdDirectPrivate(stream, "readableStreamController");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = readIntoRequests.shift(); request; request = readIntoRequests.shift()) {
const descriptor = $getByIdDirectPrivate(controller, "pendingPullIntos")?.shift();
if (descriptor) {
$fulfillPromise(request, {
value: new descriptor.ctor(descriptor.buffer, descriptor.byteOffset, 0),
done: true,
});
} else {
$fulfillPromise(request, { value: undefined, done: true });
}
}
}
}
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").resolve.$call();
@@ -1761,7 +1785,11 @@ export function readableStreamReaderGenericRelease(reader) {
var stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
if (stream.$bunNativePtr) {
$getByIdDirectPrivate($getByIdDirectPrivate(stream, "readableStreamController"), "underlyingSource").$resume(false);
const controller = $getByIdDirectPrivate(stream, "readableStreamController");
const source =
$getByIdDirectPrivate(controller, "underlyingSource") ??
$getByIdDirectPrivate(controller, "underlyingByteSource");
if (source) source.$resume(false);
}
$putByIdDirectPrivate(stream, "reader", undefined);
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
@@ -1937,24 +1965,20 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
}
}
// This was a type: "bytes" until Bun v1.1.44, but pendingPullIntos was not really
// compatible with how we send data to the stream, and "mode: 'byob'" wasn't
// supported so changing it isn't an observable change.
// Use type: "bytes" to create a ReadableByteStreamController, which enables
// BYOB reader support as required by the W3C File API and Fetch specs.
//
// When we receive chunks of data from native code, we sometimes read more
// than what the input buffer provided. When that happens, we return a typed
// array instead of the number of bytes read.
//
// When that happens, the ReadableByteStreamController creates (byteLength / autoAllocateChunkSize) pending pull into descriptors.
// So if that number is something like 16 * 1024, and we actually read 2 MB, you're going to create 128 pending pull into descriptors.
//
// And those pendingPullIntos were often never actually drained.
// We intentionally do NOT set autoAllocateChunkSize on this source object.
// If autoAllocateChunkSize is set, the ReadableByteStreamController will
// create pendingPullInto descriptors for every default read, which is
// incompatible with our push-based model where native code may return more
// data than requested. Instead, we track chunk size internally via #chunkSize.
class NativeReadableStreamSource {
constructor(handle, autoAllocateChunkSize, drainValue) {
constructor(handle, chunkSize, drainValue) {
$putByIdDirectPrivate(this, "stream", handle);
this.pull = this.#pull.bind(this);
this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
this.#chunkSize = chunkSize;
if (drainValue !== undefined) {
this.start = controller => {
@@ -1978,10 +2002,10 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
#hasResized = false;
#adjustHighWaterMark(result) {
const autoAllocateChunkSize = this.autoAllocateChunkSize;
if (result >= autoAllocateChunkSize && !this.#hasResized) {
const chunkSize = this.#chunkSize;
if (result >= chunkSize && !this.#hasResized) {
this.#hasResized = true;
this.autoAllocateChunkSize = Math.min(autoAllocateChunkSize * 2, 1024 * 1024 * 2);
this.#chunkSize = Math.min(chunkSize * 2, 1024 * 1024 * 2);
}
}
@@ -1994,7 +2018,8 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
// eslint-disable-next-line no-unused-vars
start;
autoAllocateChunkSize = 0;
type = "bytes";
#chunkSize = 0;
#closed = false;
$data?: Uint8Array;
@@ -2101,7 +2126,7 @@ export function createLazyLoadedStreamPrototype(): typeof ReadableStreamDefaultC
}
}
const view = this.#getInternalBuffer(this.autoAllocateChunkSize);
const view = this.#getInternalBuffer(this.#chunkSize);
const result = handle.pull(view, closer);
if ($isPromise(result)) {
return result.$then(
@@ -2176,6 +2201,7 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
if (chunkSize === 0) {
if ((drainValue?.byteLength ?? 0) > 0) {
return {
type: "bytes",
start(controller) {
controller.enqueue(drainValue);
controller.close();
@@ -2187,6 +2213,7 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
}
return {
type: "bytes",
start(controller) {
controller.close();
},

View File

@@ -289,7 +289,7 @@ pub noinline fn next(this: *Rm) Yield {
}
switch (this.state) {
.done => return this.bltn().done(this.state.done.exit_code),
.done => return this.bltn().done(0),
.err => return this.bltn().done(this.state.err),
else => unreachable,
}
@@ -430,7 +430,7 @@ pub fn onShellRmTaskDone(this: *Rm, task: *ShellRmTask) void {
if (tasks_done >= this.state.exec.total_tasks and
exec.getOutputCount(.output_done) >= exec.getOutputCount(.output_count))
{
this.state = .{ .done = .{ .exit_code = if (exec.err != null) 1 else 0 } };
this.state = .{ .done = .{ .exit_code = if (exec.err) |theerr| theerr.errno else 0 } };
this.next().run();
}
}

View File

@@ -0,0 +1,166 @@
import { expect, test } from "bun:test";
import { tempDir } from "harness";
// Issue #16402: Blob.stream(), Response.body, and File.stream() should return
// byte streams (ReadableByteStreamController) per the W3C File API and Fetch
// specs, enabling BYOB reader support.
test("Blob.stream() supports BYOB reader", async () => {
const blob = new Blob(["hello world"]);
const stream = blob.stream();
const reader = stream.getReader({ mode: "byob" });
let buf = new Uint8Array(64);
const result = await reader.read(buf);
expect(result.done).toBe(false);
expect(new TextDecoder().decode(result.value)).toBe("hello world");
// Read again to get done signal
buf = new Uint8Array(64);
const result2 = await reader.read(buf);
expect(result2.done).toBe(true);
expect(result2.value!.byteLength).toBe(0);
reader.releaseLock();
});
test("Blob.stream() still works with default reader", async () => {
const blob = new Blob(["hello default"]);
const stream = blob.stream();
const reader = stream.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
const combined = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0));
let offset = 0;
for (const chunk of chunks) {
combined.set(chunk, offset);
offset += chunk.length;
}
expect(new TextDecoder().decode(combined)).toBe("hello default");
});
test("Response.body supports BYOB reader", async () => {
const response = new Response("hello response");
const reader = response.body!.getReader({ mode: "byob" });
let buf = new Uint8Array(64);
const result = await reader.read(buf);
expect(result.done).toBe(false);
expect(new TextDecoder().decode(result.value)).toBe("hello response");
reader.releaseLock();
});
test("Response.body still works with default reader", async () => {
const response = new Response("hello response default");
const reader = response.body!.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
const combined = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0));
let offset = 0;
for (const chunk of chunks) {
combined.set(chunk, offset);
offset += chunk.length;
}
expect(new TextDecoder().decode(combined)).toBe("hello response default");
});
test("empty Blob.stream() supports BYOB reader", async () => {
const blob = new Blob([]);
const stream = blob.stream();
const reader = stream.getReader({ mode: "byob" });
const buf = new Uint8Array(64);
const result = await reader.read(buf);
expect(result.done).toBe(true);
reader.releaseLock();
});
test("Bun.file().stream() supports BYOB reader", async () => {
using dir = tempDir("byob-test", {
"test.txt": "hello from file",
});
const file = Bun.file(`${dir}/test.txt`);
const stream = file.stream();
const reader = stream.getReader({ mode: "byob" });
let buf = new Uint8Array(64);
const result = await reader.read(buf);
expect(result.done).toBe(false);
expect(new TextDecoder().decode(result.value)).toBe("hello from file");
reader.releaseLock();
});
test("large Blob.stream() with BYOB reader reads all data", async () => {
const data = new Uint8Array(1024 * 1024); // 1MB
data.fill(42);
const blob = new Blob([data]);
const stream = blob.stream();
const reader = stream.getReader({ mode: "byob" });
let total = 0;
while (true) {
let buf = new Uint8Array(65536);
const { done, value } = await reader.read(buf);
if (done) break;
total += value.byteLength;
// Verify the data is correct
for (let i = 0; i < value.byteLength; i++) {
if (value[i] !== 42) {
expect(value[i]).toBe(42); // will fail with helpful message
return;
}
}
}
expect(total).toBe(1024 * 1024);
});
test("Blob.stream() BYOB reader works with music-metadata pattern", async () => {
// This is the pattern used by strtok3/music-metadata that was failing
const blob = new Blob([new Uint8Array([0x49, 0x44, 0x33, 0x04, 0x00])]);
const stream = blob.stream();
// First try BYOB reader (the preferred path in strtok3)
const reader = stream.getReader({ mode: "byob" });
const buf = new Uint8Array(5);
const result = await reader.read(buf);
expect(result.done).toBe(false);
expect(result.value).toBeInstanceOf(Uint8Array);
expect(result.value!.byteLength).toBe(5);
expect(Array.from(result.value!)).toEqual([0x49, 0x44, 0x33, 0x04, 0x00]);
});
test("fetch Response.body supports BYOB reader", async () => {
using server = Bun.serve({
port: 0,
fetch() {
return new Response("byob fetch test");
},
});
const response = await fetch(server.url);
const reader = response.body!.getReader({ mode: "byob" });
let buf = new Uint8Array(64);
const result = await reader.read(buf);
expect(result.done).toBe(false);
expect(new TextDecoder().decode(result.value)).toBe("byob fetch test");
reader.releaseLock();
});

View File

@@ -1,46 +0,0 @@
import { $ } from "bun";
import { describe, expect, test } from "bun:test";
import { tempDir } from "harness";
describe("shell .quiet() should preserve exit codes", () => {
test("builtin rm with .quiet() throws on failure", async () => {
using dir = tempDir("issue-18161", {});
try {
await $`rm ${dir}/nonexistent-file.txt`.quiet();
expect.unreachable();
} catch (e: any) {
expect(e.exitCode).not.toBe(0);
}
});
test("builtin rm with .nothrow().quiet() returns non-zero exit code", async () => {
using dir = tempDir("issue-18161", {});
const result = await $`rm ${dir}/nonexistent-file.txt`.nothrow().quiet();
expect(result.exitCode).not.toBe(0);
});
test("builtin rm with .text() throws on failure", async () => {
using dir = tempDir("issue-18161", {});
try {
await $`rm ${dir}/nonexistent-file.txt`.text();
expect.unreachable();
} catch (e: any) {
expect(e.exitCode).not.toBe(0);
}
});
test("builtin rm with .quiet() returns 0 on success", async () => {
using dir = tempDir("issue-18161", {
"existing-file.txt": "hello",
});
const result = await $`rm ${dir}/existing-file.txt`.nothrow().quiet();
expect(result.exitCode).toBe(0);
});
test("builtin rm exit code matches between quiet and non-quiet", async () => {
using dir = tempDir("issue-18161", {});
const nonQuiet = await $`rm ${dir}/nonexistent-file.txt`.nothrow();
const quiet = await $`rm ${dir}/nonexistent-file.txt`.nothrow().quiet();
expect(quiet.exitCode).toBe(nonQuiet.exitCode);
});
});