ReadableStream & ReadableByteStream fixes (#19683)

This commit is contained in:
pfg
2025-05-16 22:30:58 -07:00
committed by GitHub
parent 95af099a0c
commit 89c5e40544
10 changed files with 390 additions and 35 deletions

View File

@@ -703,6 +703,9 @@ declare function $ERR_INVALID_CHAR(name, field?): TypeError;
declare function $ERR_HTTP_INVALID_HEADER_VALUE(value: string, name: string): TypeError;
declare function $ERR_HTTP_HEADERS_SENT(action: string): Error;
declare function $ERR_INVALID_PROTOCOL(proto, expected): TypeError;
declare function $ERR_INVALID_STATE(message: string): Error;
declare function $ERR_INVALID_STATE_TypeError(message: string): TypeError;
declare function $ERR_INVALID_STATE_RangeError(message: string): RangeError;
declare function $ERR_UNESCAPED_CHARACTERS(arg): TypeError;
declare function $ERR_HTTP_INVALID_STATUS_CODE(code): RangeError;
declare function $ERR_UNHANDLED_ERROR(err?): Error;

View File

@@ -33,13 +33,13 @@ export function initializeReadableByteStreamController(this, stream, underlyingB
export function enqueue(this: ReadableByteStreamController, chunk: ArrayBufferView) {
if (!$isReadableByteStreamController(this)) throw $ERR_INVALID_THIS("ReadableByteStreamController");
if ($getByIdDirectPrivate(this, "closeRequested"))
throw new TypeError("ReadableByteStreamController is requested to close");
if ($getByIdDirectPrivate(this, "closeRequested")) throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
throw new TypeError("ReadableStream is not readable");
throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
if (!$isObject(chunk) || !ArrayBuffer.$isView(chunk)) throw new TypeError("Provided chunk is not a TypedArray");
if (!$isObject(chunk) || !ArrayBuffer.$isView(chunk))
throw $ERR_INVALID_ARG_TYPE("buffer", "Buffer, TypedArray, or DataView", chunk);
return $readableByteStreamControllerEnqueue(this, chunk);
}
@@ -48,7 +48,7 @@ export function error(this: ReadableByteStreamController, error: any) {
if (!$isReadableByteStreamController(this)) throw $ERR_INVALID_THIS("ReadableByteStreamController");
if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
throw new TypeError("ReadableStream is not readable");
throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
$readableByteStreamControllerError(this, error);
}
@@ -59,7 +59,7 @@ export function close(this: ReadableByteStreamController) {
if ($getByIdDirectPrivate(this, "closeRequested")) throw new TypeError("Close has already been requested");
if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
throw new TypeError("ReadableStream is not readable");
throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
$readableByteStreamControllerClose(this);
}

View File

@@ -96,7 +96,9 @@ export function isReadableByteStreamController(controller) {
export function isReadableStreamBYOBRequest(byobRequest) {
// Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
// See corresponding function for explanations.
return $isObject(byobRequest) && !!$getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController");
return (
$isObject(byobRequest) && $getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController") !== undefined
);
}
export function isReadableStreamBYOBReader(reader) {
@@ -369,7 +371,7 @@ export function readableByteStreamControllerRespondWithNewView(controller, view)
if (firstDescriptor!.byteOffset + firstDescriptor!.bytesFilled !== view.byteOffset)
throw new RangeError("Invalid value for view.byteOffset");
if (firstDescriptor!.byteLength !== view.byteLength) throw new RangeError("Invalid value for view.byteLength");
if (firstDescriptor!.byteLength < view.byteLength) throw $ERR_INVALID_ARG_VALUE("view", view);
firstDescriptor!.buffer = view.buffer;
$readableByteStreamControllerRespondInternal(controller, view.byteLength);
@@ -536,7 +538,7 @@ export function readableByteStreamControllerShiftPendingDescriptor(controller):
export function readableByteStreamControllerInvalidateBYOBRequest(controller) {
if ($getByIdDirectPrivate(controller, "byobRequest") === undefined) return;
const byobRequest = $getByIdDirectPrivate(controller, "byobRequest");
$putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", undefined);
$putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", null);
$putByIdDirectPrivate(byobRequest, "view", undefined);
$putByIdDirectPrivate(controller, "byobRequest", undefined);
}

View File

@@ -38,7 +38,7 @@ export function cancel(this, reason) {
if (!$isReadableStreamBYOBReader(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStreamBYOBReader"));
if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
return Promise.$reject($makeTypeError("cancel() called on a reader owned by no readable stream"));
return Promise.$reject($ERR_INVALID_STATE_TypeError("The reader is not attached to a stream"));
return $readableStreamReaderGenericCancel(this, reason);
}
@@ -47,11 +47,12 @@ export function read(this, view: DataView) {
if (!$isReadableStreamBYOBReader(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStreamBYOBReader"));
if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
return Promise.$reject($makeTypeError("read() called on a reader owned by no readable stream"));
return Promise.$reject($ERR_INVALID_STATE_TypeError("The reader is not attached to a stream"));
if (!$isObject(view)) return Promise.$reject($makeTypeError("Provided view is not an object"));
if (!$isObject(view)) return Promise.$reject($ERR_INVALID_ARG_TYPE("view", "Buffer, TypedArray, or DataView", view));
if (!ArrayBuffer.$isView(view)) return Promise.$reject($makeTypeError("Provided view is not an ArrayBufferView"));
if (!ArrayBuffer.$isView(view))
return Promise.$reject($ERR_INVALID_ARG_TYPE("view", "Buffer, TypedArray, or DataView", view));
if (view.byteLength === 0) return Promise.$reject($makeTypeError("Provided view cannot have a 0 byteLength"));

View File

@@ -33,8 +33,8 @@ export function initializeReadableStreamBYOBRequest(this, controller, view) {
export function respond(this, bytesWritten) {
if (!$isReadableStreamBYOBRequest(this)) throw $ERR_INVALID_THIS("ReadableStreamBYOBRequest");
if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined)
throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") == null)
throw $ERR_INVALID_STATE_TypeError("This BYOB request has been invalidated");
return $readableByteStreamControllerRespond(
$getByIdDirectPrivate(this, "associatedReadableByteStreamController"),
@@ -45,12 +45,12 @@ export function respond(this, bytesWritten) {
export function respondWithNewView(this, view) {
if (!$isReadableStreamBYOBRequest(this)) throw $ERR_INVALID_THIS("ReadableStreamBYOBRequest");
if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined)
throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") == null)
throw $ERR_INVALID_STATE_TypeError("This BYOB request has been invalidated");
if (!$isObject(view)) throw new TypeError("Provided view is not an object");
if (!$isObject(view)) throw $ERR_INVALID_ARG_TYPE("view", "Buffer, TypedArray, or DataView", view);
if (!ArrayBuffer.$isView(view)) throw new TypeError("Provided view is not an ArrayBufferView");
if (!ArrayBuffer.$isView(view)) throw $ERR_INVALID_ARG_TYPE("view", "Buffer, TypedArray, or DataView", view);
return $readableByteStreamControllerRespondWithNewView(
$getByIdDirectPrivate(this, "associatedReadableByteStreamController"),
@@ -60,7 +60,7 @@ export function respondWithNewView(this, view) {
$getter;
export function view(this) {
if (!$isReadableStreamBYOBRequest(this)) throw $makeGetterTypeError("ReadableStreamBYOBRequest", "view");
if (!$isReadableStreamBYOBRequest(this)) throw $ERR_INVALID_THIS("ReadableStreamBYOBRequest");
return $getByIdDirectPrivate(this, "view");
}

View File

@@ -34,7 +34,7 @@ export function enqueue(this, chunk) {
if (!$isReadableStreamDefaultController(this)) throw $ERR_INVALID_THIS("ReadableStreamDefaultController");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
}
return $readableStreamDefaultControllerEnqueue(this, chunk);
@@ -49,7 +49,7 @@ export function close(this) {
if (!$isReadableStreamDefaultController(this)) throw $ERR_INVALID_THIS("ReadableStreamDefaultController");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
throw new TypeError("ReadableStreamDefaultController is not in a state where it can be closed");
throw $ERR_INVALID_STATE_TypeError("Controller is already closed");
$readableStreamDefaultControllerClose(this);
}

View File

@@ -37,7 +37,7 @@ export function cancel(this, reason) {
if (!$isReadableStreamDefaultReader(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStreamDefaultReader"));
if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
return Promise.$reject(new TypeError("cancel() called on a reader owned by no readable stream"));
return Promise.$reject($ERR_INVALID_STATE_TypeError("The reader is not attached to a stream"));
return $readableStreamReaderGenericCancel(this, reason);
}
@@ -47,7 +47,7 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
throw new TypeError("ReadableStreamDefaultReader.readMany() should not be called directly");
const stream = $getByIdDirectPrivate(this, "ownerReadableStream");
if (!stream) throw new TypeError("readMany() called on a reader owned by no readable stream");
if (!stream) throw $ERR_INVALID_STATE_TypeError("The reader is not attached to a stream");
const state = $getByIdDirectPrivate(stream, "state");
stream.$disturbed = true;
@@ -172,7 +172,7 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
export function read(this) {
if (!$isReadableStreamDefaultReader(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStreamDefaultReader"));
if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
return Promise.$reject(new TypeError("read() called on a reader owned by no readable stream"));
return Promise.$reject($ERR_INVALID_STATE_TypeError("The reader is not attached to a stream"));
return $readableStreamDefaultReaderRead(this);
}

View File

@@ -676,11 +676,11 @@ export function isReadableStreamDefaultController(controller) {
// However, since it is a private slot, it cannot be checked using hasOwnProperty().
// underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
// to an empty object. Therefore, following test is ok.
return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource");
return $isObject(controller) && $getByIdDirectPrivate(controller, "underlyingSource") !== undefined;
}
export function readDirectStream(stream, sink, underlyingSource) {
$putByIdDirectPrivate(stream, "underlyingSource", undefined);
$putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false
$putByIdDirectPrivate(stream, "start", undefined);
function close(stream, reason) {
const cancelFn = underlyingSource?.cancel;
@@ -849,13 +849,12 @@ export async function readStreamIntoSink(stream: ReadableStream, sink, isNative)
var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController");
if (readableStreamController) {
if ($getByIdDirectPrivate(readableStreamController, "underlyingSource"))
$putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined);
$putByIdDirectPrivate(readableStreamController, "underlyingSource", null);
if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream"))
$putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined);
$putByIdDirectPrivate(readableStreamController, "controlledReadableStream", null);
$putByIdDirectPrivate(stream, "readableStreamController", null);
if ($getByIdDirectPrivate(stream, "underlyingSource"))
$putByIdDirectPrivate(stream, "underlyingSource", undefined);
if ($getByIdDirectPrivate(stream, "underlyingSource")) $putByIdDirectPrivate(stream, "underlyingSource", null);
readableStreamController = undefined;
}
@@ -1264,7 +1263,7 @@ export function initializeTextStream(underlyingSource, highWaterMark: number) {
};
$putByIdDirectPrivate(this, "readableStreamController", controller);
$putByIdDirectPrivate(this, "underlyingSource", undefined);
$putByIdDirectPrivate(this, "underlyingSource", null);
$putByIdDirectPrivate(this, "start", undefined);
return closingPromise;
}
@@ -1324,7 +1323,7 @@ export function initializeArrayStream(underlyingSource, _highWaterMark: number)
};
$putByIdDirectPrivate(this, "readableStreamController", controller);
$putByIdDirectPrivate(this, "underlyingSource", undefined);
$putByIdDirectPrivate(this, "underlyingSource", null);
$putByIdDirectPrivate(this, "start", undefined);
return closingPromise;
}
@@ -1360,7 +1359,7 @@ export function initializeArrayBufferStream(underlyingSource, highWaterMark: num
};
$putByIdDirectPrivate(this, "readableStreamController", controller);
$putByIdDirectPrivate(this, "underlyingSource", undefined);
$putByIdDirectPrivate(this, "underlyingSource", null);
$putByIdDirectPrivate(this, "start", undefined);
}
@@ -2118,7 +2117,7 @@ export function readableStreamToArrayBufferDirect(
asUint8Array: boolean,
) {
var sink = new Bun.ArrayBufferSink();
$putByIdDirectPrivate(stream, "underlyingSource", undefined);
$putByIdDirectPrivate(stream, "underlyingSource", null);
var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
sink.start({ highWaterMark, asUint8Array });
var capability = $newPromiseCapability(Promise);

View File

@@ -0,0 +1,293 @@
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
const assert = require('assert');
const {
ReadableStream,
ReadableByteStreamController,
ReadableStreamDefaultReader,
ReadableStreamBYOBReader,
ReadableStreamBYOBRequest,
} = require('stream/web');
let kState;
if(typeof Bun === "undefined") {
({
kState,
} = require('internal/webstreams/util'));
}
const {
open,
} = require('fs/promises');
const {
readFileSync,
} = require('fs');
const {
Buffer,
} = require('buffer');
const {
inspect,
} = require('util');
{
const r = new ReadableStream({
type: 'bytes',
});
if(typeof Bun === "undefined") {
assert(r[kState].controller instanceof ReadableByteStreamController);
}
assert.strictEqual(typeof r.locked, 'boolean');
assert.strictEqual(typeof r.cancel, 'function');
assert.strictEqual(typeof r.getReader, 'function');
assert.strictEqual(typeof r.pipeThrough, 'function');
assert.strictEqual(typeof r.pipeTo, 'function');
assert.strictEqual(typeof r.tee, 'function');
['', null, 'asdf'].forEach((mode) => {
assert.throws(() => r.getReader({ mode }), {
code: 'ERR_INVALID_ARG_VALUE',
});
});
[1, 'asdf'].forEach((options) => {
assert.throws(() => r.getReader(options), {
code: 'ERR_INVALID_ARG_TYPE',
});
});
assert(!r.locked);
const defaultReader = r.getReader();
assert(r.locked);
assert(defaultReader instanceof ReadableStreamDefaultReader);
defaultReader.releaseLock();
const byobReader = r.getReader({ mode: 'byob' });
assert(byobReader instanceof ReadableStreamBYOBReader);
assert.match(
inspect(byobReader, { depth: 0 }),
/ReadableStreamBYOBReader/);
}
class Source {
constructor() {
this.controllerClosed = false;
}
async start(controller) {
this.file = await open(__filename);
this.controller = controller;
}
async pull(controller) {
const byobRequest = controller.byobRequest;
assert.match(inspect(byobRequest), /ReadableStreamBYOBRequest/);
const view = byobRequest.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
assert.throws(() => byobRequest.respondWithNewView({}), {
code: 'ERR_INVALID_ARG_TYPE',
});
byobRequest.respond(bytesRead);
assert.throws(() => byobRequest.respond(bytesRead), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => byobRequest.respondWithNewView(view), {
code: 'ERR_INVALID_STATE',
});
}
get type() { return 'bytes'; }
get autoAllocateChunkSize() { return 1024; }
}
{
const stream = new ReadableStream(new Source());
if(typeof Bun === "undefined") {
assert(stream[kState].controller instanceof ReadableByteStreamController);
}
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
read(stream).then(common.mustCall((data) => {
const check = readFileSync(__filename);
assert.deepStrictEqual(check, data);
}));
}
{
const stream = new ReadableStream(new Source());
if(typeof Bun === "undefined") {
assert(stream[kState].controller instanceof ReadableByteStreamController);
}
async function read(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return Buffer.concat(chunks);
}
read(stream).then(common.mustCall((data) => {
const check = readFileSync(__filename);
assert.deepStrictEqual(check, data);
}));
}
{
const stream = new ReadableStream(new Source());
if(typeof Bun === "undefined") {
assert(stream[kState].controller instanceof ReadableByteStreamController);
}
async function read(stream) {
// eslint-disable-next-line no-unused-vars
for await (const _ of stream)
break;
}
read(stream).then(common.mustCall());
}
{
const stream = new ReadableStream(new Source());
if(typeof Bun === "undefined") {
assert(stream[kState].controller instanceof ReadableByteStreamController);
}
const error = new Error('boom');
async function read(stream) {
// eslint-disable-next-line no-unused-vars
for await (const _ of stream)
throw error;
}
assert.rejects(read(stream), error).then(common.mustCall());
}
{
assert.throws(() => {
Reflect.get(ReadableStreamBYOBRequest.prototype, 'view', {});
}, {
code: 'ERR_INVALID_THIS',
});
assert.throws(() => ReadableStreamBYOBRequest.prototype.respond.call({}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(() => {
ReadableStreamBYOBRequest.prototype.respondWithNewView.call({});
}, {
code: 'ERR_INVALID_THIS',
});
}
{
const readable = new ReadableStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });
reader.releaseLock();
reader.releaseLock();
assert.rejects(reader.read(new Uint8Array(10)), {
code: 'ERR_INVALID_STATE',
}).then(common.mustCall());
assert.rejects(reader.cancel(), {
code: 'ERR_INVALID_STATE',
}).then(common.mustCall());
}
{
let controller;
new ReadableStream({
type: 'bytes',
start(c) { controller = c; }
});
assert.throws(() => controller.enqueue(1), {
code: 'ERR_INVALID_ARG_TYPE',
});
controller.close();
assert.throws(() => controller.enqueue(new Uint8Array(10)), {
code: 'ERR_INVALID_STATE',
});
assert.throws(() => controller.close(), {
code: 'ERR_INVALID_STATE',
});
}
{
let controller;
new ReadableStream({
type: 'bytes',
start(c) { controller = c; }
});
controller.enqueue(new Uint8Array(10));
controller.close();
assert.throws(() => controller.enqueue(new Uint8Array(10)), {
code: 'ERR_INVALID_STATE',
});
}
{
const stream = new ReadableStream({
type: 'bytes',
pull(c) {
const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3);
v.set([20, 21, 22]);
c.byobRequest.respondWithNewView(v);
},
});
const buffer = new ArrayBuffer(10);
const view = new Uint8Array(buffer, 0, 3);
view.set([10, 11, 12]);
const reader = stream.getReader({ mode: 'byob' });
reader.read(view);
}
{
const stream = new ReadableStream({
type: 'bytes',
autoAllocateChunkSize: 10,
pull(c) {
const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3);
v.set([20, 21, 22]);
c.byobRequest.respondWithNewView(v);
},
});
const reader = stream.getReader();
reader.read();
}

View File

@@ -0,0 +1,57 @@
test("ReadableStream", async () => {
const { resolve, promise } = Promise.withResolvers();
let controller: ReadableStreamDefaultController;
let stream = () =>
new ReadableStream({
start(controller1) {
controller = controller1;
controller1.close();
process.nextTick(resolve);
},
});
stream();
await promise;
expect(() => controller!.close()).toThrowError(
expect.objectContaining({
name: "TypeError",
message: "Invalid state: Controller is already closed",
code: "ERR_INVALID_STATE",
}),
);
});
test("server version", async () => {
const { resolve, promise } = Promise.withResolvers();
let controller: ReadableStreamDefaultController;
let stream = () =>
new ReadableStream({
start(controller1) {
controller = controller1;
controller.close();
process.nextTick(resolve);
},
});
// will start the server on default port 3000
const server = Bun.serve({
fetch(req) {
return new Response(stream());
},
});
await fetch(server.url, {});
await promise;
expect(() => controller!.close()).toThrowError(
expect.objectContaining({
name: "TypeError",
message: "Invalid state: Controller is already closed",
code: "ERR_INVALID_STATE",
}),
);
});