Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
2e120d5e8c feat(streams): implement ReadableStream.from() static method
Implements the WHATWG Streams spec ReadableStream.from() static method
that creates a ReadableStream from any iterable or async iterable.

Changes:
- Add readableStreamFrom() function in ReadableStream.ts
- Register the static method on the ReadableStream constructor in
  JSReadableStream.cpp
- Add type definitions in globals.d.ts and builtins.d.ts
- Add comprehensive tests for the new API

The implementation:
- Returns the same ReadableStream when passed a ReadableStream
- Supports sync iterables (arrays, Sets, Maps, generators, strings)
- Supports async iterables (async generators, custom async iterables)
- Properly handles iterator.return() on cancellation
- Correctly propagates errors from iterators

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 08:02:36 +00:00
5 changed files with 483 additions and 0 deletions

View File

@@ -71,6 +71,20 @@ declare var ReadableStream: Bun.__internal.UseLibDomIfAvailable<
prototype: ReadableStream;
new <R = any>(underlyingSource?: Bun.UnderlyingSource<R>, strategy?: QueuingStrategy<R>): ReadableStream<R>;
new <R = any>(underlyingSource?: Bun.DirectUnderlyingSource<R>, strategy?: QueuingStrategy<R>): ReadableStream<R>;
// TODO: Remove when TypeScript's lib.dom.d.ts includes ReadableStream.from()
// Tracking: https://github.com/microsoft/TypeScript-DOM-lib-generator/issues/1691
/**
* Creates a ReadableStream wrapping the provided iterable or async iterable.
* @param asyncIterable An iterable or async iterable to wrap in a ReadableStream.
* @returns A ReadableStream that reads from the provided iterable.
*/
from<R>(asyncIterable: AsyncIterable<R> | Iterable<R | Promise<R>>): ReadableStream<R>;
/**
* Creates a ReadableStream wrapping the provided iterable or async iterable.
* @param asyncIterable A ReadableStream to return as-is.
* @returns The same ReadableStream that was passed in.
*/
from<R>(asyncIterable: ReadableStream<R>): ReadableStream<R>;
}
>;

View File

@@ -153,6 +153,8 @@ template<> void JSReadableStreamDOMConstructor::initializeProperties(VM& vm, JSD
m_originalName.set(vm, this, nameString);
putDirect(vm, vm.propertyNames->name, nameString, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum);
putDirect(vm, vm.propertyNames->prototype, JSReadableStream::prototype(vm, globalObject), JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::DontDelete);
// Add static ReadableStream.from() method per WHATWG Streams spec
putDirectBuiltinFunction(vm, &globalObject, vm.propertyNames->builtinNames().fromPublicName(), readableStreamReadableStreamFromCodeGenerator(vm), JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::Function);
}
template<> FunctionExecutable* JSReadableStreamDOMConstructor::initializeExecutable(VM& vm)

View File

@@ -103,6 +103,7 @@ interface ReadableStream<R = any> extends _ReadableStream<R> {
declare var ReadableStream: {
prototype: ReadableStream;
new (): ReadableStream;
from<R>(asyncIterable: AsyncIterable<R> | Iterable<R | Promise<R>> | ReadableStream<R>): ReadableStream<R>;
};
interface Console {

View File

@@ -513,3 +513,117 @@ export function lazyAsyncIterator(this) {
$readableStreamDefineLazyIterators(prototype);
return prototype[globalThis.Symbol.asyncIterator].$call(this);
}
// ReadableStream.from() static method per WHATWG Streams spec
// https://streams.spec.whatwg.org/#rs-from
export function readableStreamFrom(asyncIterable) {
// 1. If asyncIterable is a ReadableStream, return asyncIterable
if ($isReadableStream(asyncIterable)) {
return asyncIterable;
}
let iteratorRecord;
let stream;
// 2. Let iteratorRecord be ? GetIterator(asyncIterable, async).
// This handles both async iterables and sync iterables (converted to async)
const asyncIteratorMethod = asyncIterable?.[globalThis.Symbol.asyncIterator];
if (asyncIteratorMethod !== undefined) {
// Async iterable path
if (!$isCallable(asyncIteratorMethod)) {
throw new TypeError("asyncIterable[Symbol.asyncIterator] is not a function");
}
iteratorRecord = asyncIteratorMethod.$call(asyncIterable);
if (!$isObject(iteratorRecord)) {
throw new TypeError("asyncIterable[Symbol.asyncIterator]() did not return an object");
}
} else {
// Try sync iterable path - the spec says GetIterator will convert sync to async
const syncIteratorMethod = asyncIterable?.[globalThis.Symbol.iterator];
if (syncIteratorMethod === undefined) {
throw new TypeError("The object is not iterable");
}
if (!$isCallable(syncIteratorMethod)) {
throw new TypeError("asyncIterable[Symbol.iterator] is not a function");
}
const syncIterator = syncIteratorMethod.$call(asyncIterable);
if (!$isObject(syncIterator)) {
throw new TypeError("asyncIterable[Symbol.iterator]() did not return an object");
}
// Wrap sync iterator as async iterator
iteratorRecord = {
next() {
try {
const result = syncIterator.next();
return Promise.$resolve(result);
} catch (e) {
return Promise.$reject(e);
}
},
return(value) {
const returnMethod = syncIterator.return;
if (returnMethod === undefined) {
return Promise.$resolve({ value, done: true });
}
try {
const result = returnMethod.$call(syncIterator, value);
return Promise.$resolve(result);
} catch (e) {
return Promise.$reject(e);
}
},
};
}
// 3. Let stream be a new ReadableStream.
// 4. Let startAlgorithm be an algorithm that returns undefined.
// 5. Let pullAlgorithm be the following steps...
// 6. Let cancelAlgorithm be the following steps...
// 7. Perform ! SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, {}, {}, pullAlgorithm, cancelAlgorithm, 0).
const iterator = iteratorRecord;
stream = new ReadableStream({
async pull(controller) {
let nextResult;
try {
nextResult = await iterator.next();
} catch (e) {
controller.error(e);
return;
}
if (!$isObject(nextResult)) {
controller.error(new TypeError("Iterator result is not an object"));
return;
}
if (nextResult.done) {
controller.close();
return;
}
controller.enqueue(nextResult.value);
},
cancel(reason) {
const returnMethod = iterator.return;
if (returnMethod === undefined) {
return Promise.$resolve();
}
return (async () => {
let returnResult;
try {
returnResult = await returnMethod.$call(iterator, reason);
} catch (e) {
throw e;
}
if (!$isObject(returnResult)) {
throw new TypeError("Iterator result is not an object");
}
})();
},
});
return stream;
}

View File

@@ -0,0 +1,352 @@
import { describe, expect, test } from "bun:test";
describe("ReadableStream.from", () => {
describe("basic functionality", () => {
test("exists as a static method", () => {
expect(typeof ReadableStream.from).toBe("function");
});
test("has correct function length (1 parameter)", () => {
expect(ReadableStream.from.length).toBe(1);
});
});
describe("sync iterables", () => {
test("works with arrays", async () => {
const stream = ReadableStream.from([1, 2, 3]);
const reader = stream.getReader();
const values: number[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual([1, 2, 3]);
});
test("works with Set", async () => {
const stream = ReadableStream.from(new Set(["a", "b", "c"]));
const reader = stream.getReader();
const values: string[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual(["a", "b", "c"]);
});
test("works with Map", async () => {
const stream = ReadableStream.from(
new Map([
["a", 1],
["b", 2],
]),
);
const reader = stream.getReader();
const values: [string, number][] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual([
["a", 1],
["b", 2],
]);
});
test("works with generator functions", async () => {
function* gen() {
yield 1;
yield 2;
yield 3;
}
const stream = ReadableStream.from(gen());
const reader = stream.getReader();
const values: number[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual([1, 2, 3]);
});
test("works with string (iterating characters)", async () => {
const stream = ReadableStream.from("abc");
const reader = stream.getReader();
const values: string[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual(["a", "b", "c"]);
});
test("works with empty array", async () => {
const stream = ReadableStream.from([]);
const reader = stream.getReader();
const result = await reader.read();
expect(result.done).toBe(true);
});
});
describe("async iterables", () => {
test("works with async generators", async () => {
async function* asyncGen() {
yield "a";
yield "b";
yield "c";
}
const stream = ReadableStream.from(asyncGen());
const reader = stream.getReader();
const values: string[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual(["a", "b", "c"]);
});
test("works with async generators that yield promises", async () => {
async function* asyncGen() {
yield Promise.resolve(1);
yield Promise.resolve(2);
yield Promise.resolve(3);
}
const stream = ReadableStream.from(asyncGen());
const reader = stream.getReader();
const values: number[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
// Promises should be yielded as-is, not awaited
expect(values).toEqual([1, 2, 3]);
});
test("works with custom async iterable", async () => {
const customAsyncIterable = {
[Symbol.asyncIterator]() {
let i = 0;
return {
async next() {
if (i < 3) {
return { value: i++, done: false };
}
return { done: true };
},
};
},
};
const stream = ReadableStream.from(customAsyncIterable);
const reader = stream.getReader();
const values: number[] = [];
let result;
while (!(result = await reader.read()).done) {
values.push(result.value);
}
expect(values).toEqual([0, 1, 2]);
});
});
describe("ReadableStream passthrough", () => {
test("returns the same ReadableStream when passed a ReadableStream", async () => {
const original = new ReadableStream({
start(controller) {
controller.enqueue("test");
controller.close();
},
});
const result = ReadableStream.from(original);
expect(result).toBe(original);
});
});
describe("error handling", () => {
test("throws TypeError for non-iterable values", () => {
expect(() => ReadableStream.from(123 as any)).toThrow(TypeError);
expect(() => ReadableStream.from(null as any)).toThrow(TypeError);
expect(() => ReadableStream.from(undefined as any)).toThrow(TypeError);
expect(() => ReadableStream.from({} as any)).toThrow(TypeError);
});
test("throws TypeError when Symbol.asyncIterator is not a function", () => {
const badAsyncIterable = {
[Symbol.asyncIterator]: "not a function",
};
expect(() => ReadableStream.from(badAsyncIterable as any)).toThrow(TypeError);
});
test("throws TypeError when Symbol.iterator is not a function", () => {
const badIterable = {
[Symbol.iterator]: "not a function",
};
expect(() => ReadableStream.from(badIterable as any)).toThrow(TypeError);
});
test("propagates errors from iterator.next()", async () => {
const errorIterable = {
[Symbol.iterator]() {
return {
next() {
throw new Error("iterator error");
},
};
},
};
const stream = ReadableStream.from(errorIterable);
const reader = stream.getReader();
await expect(reader.read()).rejects.toThrow("iterator error");
});
test("propagates errors from async iterator.next()", async () => {
const errorAsyncIterable = {
[Symbol.asyncIterator]() {
return {
async next() {
throw new Error("async iterator error");
},
};
},
};
const stream = ReadableStream.from(errorAsyncIterable);
const reader = stream.getReader();
await expect(reader.read()).rejects.toThrow("async iterator error");
});
});
describe("cancellation", () => {
test("calls iterator.return() on cancel for sync iterables", async () => {
let returnCalled = false;
const customIterable = {
[Symbol.iterator]() {
let i = 0;
return {
next() {
if (i++ < 10) return { value: i, done: false };
return { done: true, value: undefined };
},
return() {
returnCalled = true;
return { done: true, value: undefined };
},
};
},
};
const stream = ReadableStream.from(customIterable);
const reader = stream.getReader();
await reader.read();
await reader.cancel();
expect(returnCalled).toBe(true);
});
test("calls iterator.return() on cancel for async iterables", async () => {
let returnCalled = false;
const customAsyncIterable = {
[Symbol.asyncIterator]() {
let i = 0;
return {
async next() {
if (i++ < 10) return { value: i, done: false };
return { done: true, value: undefined };
},
async return() {
returnCalled = true;
return { done: true, value: undefined };
},
};
},
};
const stream = ReadableStream.from(customAsyncIterable);
const reader = stream.getReader();
await reader.read();
await reader.cancel();
expect(returnCalled).toBe(true);
});
test("does not error when iterator has no return method", async () => {
const customIterable = {
[Symbol.iterator]() {
let i = 0;
return {
next() {
if (i++ < 10) return { value: i, done: false };
return { done: true, value: undefined };
},
// No return method
};
},
};
const stream = ReadableStream.from(customIterable);
const reader = stream.getReader();
await reader.read();
// Should not throw
await reader.cancel();
});
});
describe("for await...of consumption", () => {
test("can be consumed with for await...of", async () => {
const stream = ReadableStream.from([1, 2, 3]);
const values: number[] = [];
for await (const value of stream) {
values.push(value);
}
expect(values).toEqual([1, 2, 3]);
});
});
describe("Node.js compatibility", () => {
test("works with array of promises (sync iterable with async values)", async () => {
const stream = ReadableStream.from([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)]);
const reader = stream.getReader();
const values: number[] = [];
let result;
while (!(result = await reader.read()).done) {
// Values should be promises that need to be awaited
values.push(await result.value);
}
expect(values).toEqual([1, 2, 3]);
});
});
});