mirror of
https://github.com/oven-sh/bun
synced 2026-02-11 11:29:02 +00:00
Rewrite ReadableStream.from() using WebKit reference implementation
This commit rewrites the ReadableStream.from() implementation to closely follow the WebKit reference implementation, improving spec compliance and simplifying the code structure. Key changes: - Use WebKit's algorithm structure for iterator handling - Simplify async/sync iterator detection and setup - Use regular ReadableStream constructor with highWaterMark: 0 - Properly handle promise values from sync iterators - Maintain proper error handling and validation - All existing tests continue to pass 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -520,204 +520,83 @@ export function from(asyncIterable) {
|
||||
throw new TypeError("ReadableStream.from() takes a non-null value");
|
||||
}
|
||||
|
||||
// Check if it's already a ReadableStream
|
||||
// Check if it's already a ReadableStream - return it directly
|
||||
if ($isReadableStream(asyncIterable)) {
|
||||
return asyncIterable;
|
||||
}
|
||||
|
||||
// Handle iterables (sync and async) - follow WHATWG spec algorithm
|
||||
let asyncIteratorMethod = asyncIterable[globalThis.Symbol.asyncIterator];
|
||||
let iteratorMethod = asyncIterable[globalThis.Symbol.iterator];
|
||||
|
||||
// Step 1: Prefer async iterator if available and not null
|
||||
if (asyncIteratorMethod != null && asyncIteratorMethod !== null) {
|
||||
if (typeof asyncIteratorMethod !== "function") {
|
||||
throw new TypeError("ReadableStream.from() argument's @@asyncIterator method must be a function");
|
||||
let iterator;
|
||||
let isAsync = false;
|
||||
|
||||
// Handle async iterables first (following WebKit reference implementation)
|
||||
const asyncIteratorMethod = asyncIterable[globalThis.Symbol.asyncIterator];
|
||||
if (!$isUndefinedOrNull(asyncIteratorMethod)) {
|
||||
if (!$isCallable(asyncIteratorMethod)) {
|
||||
throw new TypeError("ReadableStream.from requires that the property of the first argument, iterable[Symbol.asyncIterator], when exists, be a function");
|
||||
}
|
||||
|
||||
// Check if asyncIterator method returns the same object (self-iterator pattern)
|
||||
let testIterator;
|
||||
try {
|
||||
testIterator = asyncIteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(testIterator)) {
|
||||
throw new TypeError("ReadableStream.from() argument's @@asyncIterator method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
// Re-throw validation errors immediately
|
||||
throw error;
|
||||
iterator = asyncIteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(iterator)) {
|
||||
throw new TypeError("The return value of asyncIterable[Symbol.asyncIterator] must be an object.");
|
||||
}
|
||||
|
||||
let returnsSelf = testIterator === asyncIterable;
|
||||
|
||||
if (returnsSelf) {
|
||||
// For self-iterator pattern, defer everything to first pull to avoid timing issues
|
||||
let iterator;
|
||||
let iteratorCreated = false;
|
||||
|
||||
return new ReadableStream({
|
||||
async pull(controller) {
|
||||
if (!iteratorCreated) {
|
||||
try {
|
||||
iterator = asyncIteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(iterator)) {
|
||||
throw new TypeError("ReadableStream.from() argument's @@asyncIterator method must return an object");
|
||||
}
|
||||
iteratorCreated = true;
|
||||
} catch (error) {
|
||||
controller.error(error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await iterator.next();
|
||||
if (!$isObject(result)) {
|
||||
throw new TypeError("Iterator result must be an object");
|
||||
}
|
||||
|
||||
if (result.done) {
|
||||
controller.close();
|
||||
} else {
|
||||
controller.enqueue(result.value);
|
||||
}
|
||||
} catch (error) {
|
||||
controller.error(error);
|
||||
}
|
||||
},
|
||||
|
||||
async cancel(reason) {
|
||||
if (iteratorCreated && iterator && iterator.return) {
|
||||
if (typeof iterator.return !== "function") {
|
||||
throw new TypeError("Iterator return() method must be a function");
|
||||
}
|
||||
try {
|
||||
const result = await iterator.return(reason);
|
||||
if (!$isObject(result)) {
|
||||
throw new TypeError("Iterator return() method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Normal validation during construction for non-self-iterator cases
|
||||
let iterator;
|
||||
try {
|
||||
iterator = asyncIteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(iterator)) {
|
||||
throw new TypeError("ReadableStream.from() argument's @@asyncIterator method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
// All errors from calling iterator method should be re-thrown synchronously
|
||||
throw error;
|
||||
}
|
||||
|
||||
return new ReadableStream({
|
||||
async pull(controller) {
|
||||
try {
|
||||
const result = await iterator.next();
|
||||
if (!$isObject(result)) {
|
||||
throw new TypeError("Iterator result must be an object");
|
||||
}
|
||||
|
||||
if (result.done) {
|
||||
controller.close();
|
||||
} else {
|
||||
controller.enqueue(result.value);
|
||||
}
|
||||
} catch (error) {
|
||||
controller.error(error);
|
||||
}
|
||||
},
|
||||
|
||||
async cancel(reason) {
|
||||
if (iterator && iterator.return) {
|
||||
if (typeof iterator.return !== "function") {
|
||||
throw new TypeError("Iterator return() method must be a function");
|
||||
}
|
||||
try {
|
||||
const result = await iterator.return(reason);
|
||||
if (!$isObject(result)) {
|
||||
throw new TypeError("Iterator return() method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else if (iteratorMethod != null) {
|
||||
// Step 2: Use sync iterator
|
||||
if (typeof iteratorMethod !== "function") {
|
||||
throw new TypeError("ReadableStream.from() argument's @@iterator method must be a function");
|
||||
}
|
||||
|
||||
// Validate iterator method returns an object during construction
|
||||
// ALL errors from iterator method should be re-thrown synchronously
|
||||
let iterator;
|
||||
try {
|
||||
iterator = iteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(iterator)) {
|
||||
throw new TypeError("ReadableStream.from() argument's @@iterator method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
// All errors from calling iterator method should be re-thrown synchronously
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Store the iterator we created during validation for reuse
|
||||
let iteratorCreated = true;
|
||||
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
// Store iterator for use in pull
|
||||
this.iterator = iterator;
|
||||
},
|
||||
|
||||
async pull(controller) {
|
||||
try {
|
||||
const result = this.iterator.next();
|
||||
if (!$isObject(result)) {
|
||||
throw new TypeError("Iterator result must be an object");
|
||||
}
|
||||
|
||||
if (result.done) {
|
||||
controller.close();
|
||||
} else {
|
||||
// Await the value if it's a promise
|
||||
const value = result.value && typeof result.value.then === "function"
|
||||
? await result.value
|
||||
: result.value;
|
||||
controller.enqueue(value);
|
||||
}
|
||||
} catch (error) {
|
||||
controller.error(error);
|
||||
}
|
||||
},
|
||||
|
||||
async cancel(reason) {
|
||||
// Use the iterator stored in start
|
||||
if (this.iterator && this.iterator.return) {
|
||||
if (typeof this.iterator.return !== "function") {
|
||||
throw new TypeError("Iterator return() method must be a function");
|
||||
}
|
||||
try {
|
||||
const result = this.iterator.return(reason);
|
||||
// Handle both sync and async return methods
|
||||
const actualResult = result && typeof result.then === "function" ? await result : result;
|
||||
if (!$isObject(actualResult)) {
|
||||
throw new TypeError("Iterator return() method must return an object");
|
||||
}
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
isAsync = true;
|
||||
} else {
|
||||
throw new TypeError("ReadableStream.from() argument must be an iterable or async iterable");
|
||||
// Fall back to sync iterator
|
||||
const iteratorMethod = asyncIterable[globalThis.Symbol.iterator];
|
||||
if (!$isCallable(iteratorMethod)) {
|
||||
throw new TypeError("ReadableStream.from requires that the property of the first argument, iterable[Symbol.iterator], when exists, be a function");
|
||||
}
|
||||
const syncIterator = iteratorMethod.$call(asyncIterable);
|
||||
if (!$isObject(syncIterator)) {
|
||||
throw new TypeError("The return value of asyncIterable[Symbol.iterator] must be an object.");
|
||||
}
|
||||
iterator = syncIterator;
|
||||
isAsync = false;
|
||||
}
|
||||
|
||||
const nextMethod = iterator.next;
|
||||
|
||||
// Use regular ReadableStream constructor with high water mark 0
|
||||
return new ReadableStream({
|
||||
async pull(controller) {
|
||||
let result;
|
||||
try {
|
||||
result = nextMethod.$call(iterator);
|
||||
} catch (e) {
|
||||
return Promise.reject(e);
|
||||
}
|
||||
|
||||
const iterResult = await Promise.resolve(result);
|
||||
if (!$isObject(iterResult)) {
|
||||
throw new TypeError("The result of calling next on an iterator was not an object.");
|
||||
}
|
||||
if (iterResult.done) {
|
||||
controller.close();
|
||||
} else {
|
||||
// For sync iterators, we need to potentially await the value if it's a promise
|
||||
let value = iterResult.value;
|
||||
if (!isAsync && value && typeof value.then === "function") {
|
||||
value = await value;
|
||||
}
|
||||
controller.enqueue(value);
|
||||
}
|
||||
},
|
||||
|
||||
cancel(reason) {
|
||||
try {
|
||||
const returnMethod = iterator.return;
|
||||
if ($isUndefinedOrNull(returnMethod))
|
||||
return Promise.resolve(undefined);
|
||||
if (!$isCallable(returnMethod))
|
||||
throw new TypeError("iterator.return was present but not callable");
|
||||
const returnResult = returnMethod.$call(iterator, reason);
|
||||
return Promise.resolve(returnResult).then((iterResult) => {
|
||||
if (!$isObject(iterResult)) {
|
||||
throw new TypeError("The result of calling return on an iterator was not an object.");
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
}, { highWaterMark: 0 });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user