mirror of
https://github.com/oven-sh/bun
synced 2026-02-15 05:12:29 +00:00
516 lines
17 KiB
TypeScript
516 lines
17 KiB
TypeScript
/*
|
|
* Copyright (C) 2015 Canon Inc.
|
|
* Copyright (C) 2015 Igalia.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
|
|
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
|
|
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
|
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
|
|
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
export function initializeReadableStream(
|
|
this: ReadableStream,
|
|
underlyingSource: UnderlyingSource,
|
|
strategy: QueuingStrategy,
|
|
) {
|
|
if (underlyingSource === undefined) underlyingSource = { $bunNativePtr: undefined, $lazy: false } as UnderlyingSource;
|
|
if (strategy === undefined) strategy = {};
|
|
|
|
if (!$isObject(underlyingSource)) throw new TypeError("ReadableStream constructor takes an object as first argument");
|
|
|
|
if (strategy !== undefined && !$isObject(strategy))
|
|
throw new TypeError("ReadableStream constructor takes an object as second argument, if any");
|
|
|
|
$putByIdDirectPrivate(this, "state", $streamReadable);
|
|
|
|
$putByIdDirectPrivate(this, "reader", undefined);
|
|
|
|
$putByIdDirectPrivate(this, "storedError", undefined);
|
|
|
|
this.$disturbed = false;
|
|
|
|
// Initialized with null value to enable distinction with undefined case.
|
|
$putByIdDirectPrivate(this, "readableStreamController", null);
|
|
this.$bunNativePtr = $getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? undefined;
|
|
|
|
$putByIdDirectPrivate(this, "asyncContext", $getInternalField($asyncContext, 0));
|
|
|
|
const isDirect = underlyingSource.type === "direct";
|
|
// direct streams are always lazy
|
|
const isUnderlyingSourceLazy = !!underlyingSource.$lazy;
|
|
const isLazy = isDirect || isUnderlyingSourceLazy;
|
|
let pullFn;
|
|
|
|
// FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream.
|
|
// For now, we emulate this with underlyingSource with private properties.
|
|
if (!isLazy && (pullFn = $getByIdDirectPrivate(underlyingSource, "pull")) !== undefined) {
|
|
const size = $getByIdDirectPrivate(strategy, "size");
|
|
const highWaterMark = $getByIdDirectPrivate(strategy, "highWaterMark");
|
|
$putByIdDirectPrivate(this, "highWaterMark", highWaterMark);
|
|
$putByIdDirectPrivate(this, "underlyingSource", undefined);
|
|
$setupReadableStreamDefaultController(
|
|
this,
|
|
underlyingSource,
|
|
size,
|
|
highWaterMark !== undefined ? highWaterMark : 1,
|
|
$getByIdDirectPrivate(underlyingSource, "start"),
|
|
pullFn,
|
|
$getByIdDirectPrivate(underlyingSource, "cancel"),
|
|
);
|
|
|
|
return this;
|
|
}
|
|
if (isDirect) {
|
|
$putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
|
|
$putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark"));
|
|
$putByIdDirectPrivate(this, "start", () => $createReadableStreamController(this, underlyingSource, strategy));
|
|
} else if (isLazy) {
|
|
const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
|
|
$putByIdDirectPrivate(this, "highWaterMark", undefined);
|
|
$putByIdDirectPrivate(this, "underlyingSource", undefined);
|
|
$putByIdDirectPrivate(
|
|
this,
|
|
"highWaterMark",
|
|
autoAllocateChunkSize || $getByIdDirectPrivate(strategy, "highWaterMark"),
|
|
);
|
|
|
|
$putByIdDirectPrivate(this, "start", () => {
|
|
const instance = $lazyLoadStream(this, autoAllocateChunkSize);
|
|
if (instance) {
|
|
$createReadableStreamController(this, instance, strategy);
|
|
}
|
|
});
|
|
} else {
|
|
$putByIdDirectPrivate(this, "underlyingSource", undefined);
|
|
$putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark"));
|
|
$putByIdDirectPrivate(this, "start", undefined);
|
|
$createReadableStreamController(this, underlyingSource, strategy);
|
|
}
|
|
|
|
return this;
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]> {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
// this is a direct stream
|
|
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
|
|
if (underlyingSource !== undefined) {
|
|
return $readableStreamToArrayDirect(stream, underlyingSource);
|
|
}
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
return $readableStreamIntoArray(stream);
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToText(stream: ReadableStream): Promise<string> {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
// this is a direct stream
|
|
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
|
|
if (underlyingSource !== undefined) {
|
|
return $readableStreamToTextDirect(stream, underlyingSource);
|
|
}
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
const result = $tryUseReadableStreamBufferedFastPath(stream, "text");
|
|
|
|
if (result) {
|
|
return result;
|
|
}
|
|
|
|
return $readableStreamIntoText(stream);
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>): Promise<ArrayBuffer> | ArrayBuffer {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
// this is a direct stream
|
|
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
|
|
if (underlyingSource !== undefined) {
|
|
return $readableStreamToArrayBufferDirect(stream, underlyingSource, false);
|
|
}
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
let result = $tryUseReadableStreamBufferedFastPath(stream, "arrayBuffer");
|
|
|
|
if (result) {
|
|
return result;
|
|
}
|
|
|
|
result = Bun.readableStreamToArray(stream);
|
|
|
|
function toArrayBuffer(result: unknown[]) {
|
|
switch (result.length) {
|
|
case 0: {
|
|
return new ArrayBuffer(0);
|
|
}
|
|
case 1: {
|
|
const view = result[0];
|
|
if (view instanceof ArrayBuffer || view instanceof SharedArrayBuffer) {
|
|
return view;
|
|
}
|
|
|
|
if (ArrayBuffer.isView(view)) {
|
|
const buffer = view.buffer;
|
|
const byteOffset = view.byteOffset;
|
|
const byteLength = view.byteLength;
|
|
if (byteOffset === 0 && byteLength === buffer.byteLength) {
|
|
return buffer;
|
|
}
|
|
|
|
return buffer.slice(byteOffset, byteOffset + byteLength);
|
|
}
|
|
|
|
if (typeof view === "string") {
|
|
return new TextEncoder().encode(view);
|
|
}
|
|
}
|
|
default: {
|
|
let anyStrings = false;
|
|
for (const chunk of result) {
|
|
if (typeof chunk === "string") {
|
|
anyStrings = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!anyStrings) {
|
|
return Bun.concatArrayBuffers(result, false);
|
|
}
|
|
|
|
const sink = new Bun.ArrayBufferSink();
|
|
sink.start();
|
|
|
|
for (const chunk of result) {
|
|
sink.write(chunk);
|
|
}
|
|
|
|
return sink.end() as Uint8Array;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ($isPromise(result)) {
|
|
const completedResult = Bun.peek(result);
|
|
if (completedResult !== result) {
|
|
result = completedResult;
|
|
} else {
|
|
return result.then(toArrayBuffer);
|
|
}
|
|
}
|
|
return $createFulfilledPromise(toArrayBuffer(result));
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToBytes(stream: ReadableStream<ArrayBuffer>): Promise<Uint8Array> | Uint8Array {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
// this is a direct stream
|
|
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
|
|
|
|
if (underlyingSource !== undefined) {
|
|
return $readableStreamToArrayBufferDirect(stream, underlyingSource, true);
|
|
}
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
let result = $tryUseReadableStreamBufferedFastPath(stream, "bytes");
|
|
|
|
if (result) {
|
|
return result;
|
|
}
|
|
|
|
result = Bun.readableStreamToArray(stream);
|
|
|
|
function toBytes(result: unknown[]) {
|
|
switch (result.length) {
|
|
case 0: {
|
|
return new Uint8Array(0);
|
|
}
|
|
case 1: {
|
|
const view = result[0];
|
|
if (view instanceof Uint8Array) {
|
|
return view;
|
|
}
|
|
|
|
if (ArrayBuffer.isView(view)) {
|
|
return new Uint8Array(view.buffer, view.byteOffset, view.byteLength);
|
|
}
|
|
|
|
if (view instanceof ArrayBuffer || view instanceof SharedArrayBuffer) {
|
|
return new Uint8Array(view);
|
|
}
|
|
|
|
if (typeof view === "string") {
|
|
return new TextEncoder().encode(view);
|
|
}
|
|
}
|
|
default: {
|
|
let anyStrings = false;
|
|
for (const chunk of result) {
|
|
if (typeof chunk === "string") {
|
|
anyStrings = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!anyStrings) {
|
|
return Bun.concatArrayBuffers(result, true);
|
|
}
|
|
|
|
const sink = new Bun.ArrayBufferSink();
|
|
sink.start({ asUint8Array: true });
|
|
|
|
for (const chunk of result) {
|
|
sink.write(chunk);
|
|
}
|
|
|
|
return sink.end() as Uint8Array;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ($isPromise(result)) {
|
|
const completedResult = Bun.peek(result);
|
|
if (completedResult !== result) {
|
|
result = completedResult;
|
|
} else {
|
|
return result.then(toBytes);
|
|
}
|
|
}
|
|
|
|
return $createFulfilledPromise(toBytes(result));
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToFormData(
|
|
stream: ReadableStream<ArrayBuffer>,
|
|
contentType: string | ArrayBuffer | ArrayBufferView,
|
|
): Promise<FormData> {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
return Bun.readableStreamToBlob(stream).then(blob => {
|
|
return FormData.from(blob, contentType);
|
|
});
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToJSON(stream: ReadableStream): unknown {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
let result = $tryUseReadableStreamBufferedFastPath(stream, "json");
|
|
if (result) {
|
|
return result;
|
|
}
|
|
|
|
let text = Bun.readableStreamToText(stream);
|
|
const peeked = Bun.peek(text);
|
|
if (peeked !== text) {
|
|
try {
|
|
return $createFulfilledPromise(globalThis.JSON.parse(peeked));
|
|
} catch (e) {
|
|
return Promise.reject(e);
|
|
}
|
|
}
|
|
|
|
return text.then(globalThis.JSON.parse);
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
|
|
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
|
if ($isReadableStreamLocked(stream)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
return (
|
|
$tryUseReadableStreamBufferedFastPath(stream, "blob") ||
|
|
Promise.resolve(Bun.readableStreamToArray(stream)).then(array => new Blob(array))
|
|
);
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function createEmptyReadableStream() {
|
|
var stream = new ReadableStream({
|
|
pull() {},
|
|
} as any);
|
|
$readableStreamClose(stream);
|
|
return stream;
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function createUsedReadableStream() {
|
|
var stream = new ReadableStream({
|
|
pull() {},
|
|
} as any);
|
|
stream.getReader();
|
|
return stream;
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function createNativeReadableStream(nativePtr, autoAllocateChunkSize) {
|
|
$assert(nativePtr, "nativePtr must be a valid pointer");
|
|
return new ReadableStream({
|
|
$lazy: true,
|
|
$bunNativePtr: nativePtr,
|
|
autoAllocateChunkSize: autoAllocateChunkSize,
|
|
});
|
|
}
|
|
|
|
export function cancel(this, reason) {
|
|
if (!$isReadableStream(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStream"));
|
|
|
|
if ($isReadableStreamLocked(this)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
return $readableStreamCancel(this, reason);
|
|
}
|
|
|
|
export function getReader(this, options) {
|
|
if (!$isReadableStream(this)) throw $ERR_INVALID_THIS("ReadableStream");
|
|
|
|
const mode = $toDictionary(options, {}, "ReadableStream.getReader takes an object as first argument").mode;
|
|
if (mode === undefined) {
|
|
var start_ = $getByIdDirectPrivate(this, "start");
|
|
if (start_) {
|
|
$putByIdDirectPrivate(this, "start", undefined);
|
|
start_();
|
|
}
|
|
|
|
return new ReadableStreamDefaultReader(this);
|
|
}
|
|
// String conversion is required by spec, hence double equals.
|
|
if (mode == "byob") {
|
|
return new ReadableStreamBYOBReader(this);
|
|
}
|
|
|
|
throw $ERR_INVALID_ARG_VALUE("mode", mode, "byob");
|
|
}
|
|
|
|
export function pipeThrough(this, streams, options) {
|
|
const transforms = streams;
|
|
|
|
const readable = transforms["readable"];
|
|
if (!$isReadableStream(readable)) throw $makeTypeError("readable should be ReadableStream");
|
|
|
|
const writable = transforms["writable"];
|
|
const internalWritable = $getInternalWritableStream(writable);
|
|
if (!$isWritableStream(internalWritable)) throw $makeTypeError("writable should be WritableStream");
|
|
|
|
let preventClose = false;
|
|
let preventAbort = false;
|
|
let preventCancel = false;
|
|
let signal;
|
|
if (!$isUndefinedOrNull(options)) {
|
|
if (!$isObject(options)) throw $makeTypeError("options must be an object");
|
|
|
|
preventAbort = !!options["preventAbort"];
|
|
preventCancel = !!options["preventCancel"];
|
|
preventClose = !!options["preventClose"];
|
|
|
|
signal = options["signal"];
|
|
if (signal !== undefined && !$isAbortSignal(signal)) throw $makeTypeError("options.signal must be AbortSignal");
|
|
}
|
|
|
|
if (!$isReadableStream(this)) throw $ERR_INVALID_THIS("ReadableStream");
|
|
|
|
if ($isReadableStreamLocked(this)) throw $ERR_INVALID_STATE_TypeError("ReadableStream is locked");
|
|
|
|
if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");
|
|
|
|
const promise = $readableStreamPipeToWritableStream(
|
|
this,
|
|
internalWritable,
|
|
preventClose,
|
|
preventAbort,
|
|
preventCancel,
|
|
signal,
|
|
);
|
|
$markPromiseAsHandled(promise);
|
|
|
|
return readable;
|
|
}
|
|
|
|
export function pipeTo(this, destination) {
|
|
if (!$isReadableStream(this)) return Promise.$reject($ERR_INVALID_THIS("ReadableStream"));
|
|
|
|
if ($isReadableStreamLocked(this)) return Promise.$reject($ERR_INVALID_STATE_TypeError("ReadableStream is locked"));
|
|
|
|
// FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869.
|
|
// Built-in generator should be able to parse function signature to compute the function length correctly.
|
|
let options = $argument(1);
|
|
|
|
let preventClose = false;
|
|
let preventAbort = false;
|
|
let preventCancel = false;
|
|
let signal;
|
|
if (!$isUndefinedOrNull(options)) {
|
|
if (!$isObject(options)) return Promise.$reject($makeTypeError("options must be an object"));
|
|
|
|
try {
|
|
preventAbort = !!options["preventAbort"];
|
|
preventCancel = !!options["preventCancel"];
|
|
preventClose = !!options["preventClose"];
|
|
|
|
signal = options["signal"];
|
|
} catch (e) {
|
|
return Promise.$reject(e);
|
|
}
|
|
|
|
if (signal !== undefined && !$isAbortSignal(signal))
|
|
return Promise.$reject(new TypeError("options.signal must be AbortSignal"));
|
|
}
|
|
|
|
const internalDestination = $getInternalWritableStream(destination);
|
|
if (!$isWritableStream(internalDestination))
|
|
return Promise.$reject(new TypeError("ReadableStream pipeTo requires a WritableStream"));
|
|
|
|
if ($isWritableStreamLocked(internalDestination)) return Promise.$reject(new TypeError("WritableStream is locked"));
|
|
|
|
return $readableStreamPipeToWritableStream(
|
|
this,
|
|
internalDestination,
|
|
preventClose,
|
|
preventAbort,
|
|
preventCancel,
|
|
signal,
|
|
);
|
|
}
|
|
|
|
export function tee(this) {
|
|
if (!$isReadableStream(this)) throw $ERR_INVALID_THIS("ReadableStream");
|
|
|
|
return $readableStreamTee(this, false);
|
|
}
|
|
|
|
$getter;
|
|
export function locked(this) {
|
|
if (!$isReadableStream(this)) throw $makeGetterTypeError("ReadableStream", "locked");
|
|
|
|
return $isReadableStreamLocked(this);
|
|
}
|
|
|
|
export function values(this, options) {
|
|
var prototype = ReadableStream.prototype;
|
|
$readableStreamDefineLazyIterators(prototype);
|
|
return prototype.values.$call(this, options);
|
|
}
|
|
|
|
$linkTimeConstant;
|
|
export function lazyAsyncIterator(this) {
|
|
var prototype = ReadableStream.prototype;
|
|
$readableStreamDefineLazyIterators(prototype);
|
|
return prototype[globalThis.Symbol.asyncIterator].$call(this);
|
|
}
|