mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
350 lines
13 KiB
TypeScript
350 lines
13 KiB
TypeScript
// @ts-nocheck
|
|
/*
|
|
* Copyright (C) 2020 Apple Inc. All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
|
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
|
|
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
|
* THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
// @internal
|
|
|
|
export function isTransformStream(stream) {
|
|
return $isObject(stream) && !!$getByIdDirectPrivate(stream, "readable");
|
|
}
|
|
|
|
export function isTransformStreamDefaultController(controller) {
|
|
return $isObject(controller) && !!$getByIdDirectPrivate(controller, "transformAlgorithm");
|
|
}
|
|
|
|
export function createTransformStream(
|
|
startAlgorithm,
|
|
transformAlgorithm,
|
|
flushAlgorithm,
|
|
writableHighWaterMark,
|
|
writableSizeAlgorithm,
|
|
readableHighWaterMark,
|
|
readableSizeAlgorithm,
|
|
) {
|
|
if (writableHighWaterMark === undefined) writableHighWaterMark = 1;
|
|
if (writableSizeAlgorithm === undefined) writableSizeAlgorithm = () => 1;
|
|
if (readableHighWaterMark === undefined) readableHighWaterMark = 0;
|
|
if (readableSizeAlgorithm === undefined) readableSizeAlgorithm = () => 1;
|
|
$assert(writableHighWaterMark >= 0);
|
|
$assert(readableHighWaterMark >= 0);
|
|
|
|
const transform = {};
|
|
$putByIdDirectPrivate(transform, "TransformStream", true);
|
|
|
|
const stream = new TransformStream(transform);
|
|
const startPromiseCapability = $newPromiseCapability(Promise);
|
|
$initializeTransformStream(
|
|
stream,
|
|
startPromiseCapability.promise,
|
|
writableHighWaterMark,
|
|
writableSizeAlgorithm,
|
|
readableHighWaterMark,
|
|
readableSizeAlgorithm,
|
|
);
|
|
|
|
const controller = new TransformStreamDefaultController();
|
|
$setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
|
|
|
|
startAlgorithm().$then(
|
|
() => {
|
|
startPromiseCapability.resolve.$call();
|
|
},
|
|
error => {
|
|
startPromiseCapability.reject.$call(undefined, error);
|
|
},
|
|
);
|
|
|
|
return stream;
|
|
}
|
|
|
|
export function initializeTransformStream(
|
|
stream,
|
|
startPromise,
|
|
writableHighWaterMark,
|
|
writableSizeAlgorithm,
|
|
readableHighWaterMark,
|
|
readableSizeAlgorithm,
|
|
) {
|
|
const startAlgorithm = () => {
|
|
return startPromise;
|
|
};
|
|
const writeAlgorithm = chunk => {
|
|
return $transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
|
|
};
|
|
const abortAlgorithm = reason => {
|
|
return $transformStreamDefaultSinkAbortAlgorithm(stream, reason);
|
|
};
|
|
const closeAlgorithm = () => {
|
|
return $transformStreamDefaultSinkCloseAlgorithm(stream);
|
|
};
|
|
const writable = $createWritableStream(
|
|
startAlgorithm,
|
|
writeAlgorithm,
|
|
closeAlgorithm,
|
|
abortAlgorithm,
|
|
writableHighWaterMark,
|
|
writableSizeAlgorithm,
|
|
);
|
|
|
|
const pullAlgorithm = () => {
|
|
return $transformStreamDefaultSourcePullAlgorithm(stream);
|
|
};
|
|
const cancelAlgorithm = reason => {
|
|
$transformStreamErrorWritableAndUnblockWrite(stream, reason);
|
|
return Promise.$resolve();
|
|
};
|
|
const underlyingSource = {};
|
|
$putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
|
|
$putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
|
|
$putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
|
|
const options = {};
|
|
$putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
|
|
$putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
|
|
const readable = new ReadableStream(underlyingSource, options);
|
|
|
|
// The writable to expose to JS through writable getter.
|
|
$putByIdDirectPrivate(stream, "writable", writable);
|
|
// The writable to use for the actual transform algorithms.
|
|
$putByIdDirectPrivate(stream, "internalWritable", $getInternalWritableStream(writable));
|
|
|
|
$putByIdDirectPrivate(stream, "readable", readable);
|
|
$putByIdDirectPrivate(stream, "backpressure", undefined);
|
|
$putByIdDirectPrivate(stream, "backpressureChangePromise", undefined);
|
|
|
|
$transformStreamSetBackpressure(stream, true);
|
|
$putByIdDirectPrivate(stream, "controller", undefined);
|
|
}
|
|
|
|
export function transformStreamError(stream, e) {
|
|
const readable = $getByIdDirectPrivate(stream, "readable");
|
|
const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
|
|
$readableStreamDefaultControllerError(readableController, e);
|
|
|
|
$transformStreamErrorWritableAndUnblockWrite(stream, e);
|
|
}
|
|
|
|
export function transformStreamErrorWritableAndUnblockWrite(stream, e) {
|
|
$transformStreamDefaultControllerClearAlgorithms($getByIdDirectPrivate(stream, "controller"));
|
|
|
|
const writable = $getByIdDirectPrivate(stream, "internalWritable");
|
|
$writableStreamDefaultControllerErrorIfNeeded($getByIdDirectPrivate(writable, "controller"), e);
|
|
|
|
if ($getByIdDirectPrivate(stream, "backpressure")) $transformStreamSetBackpressure(stream, false);
|
|
}
|
|
|
|
export function transformStreamSetBackpressure(stream, backpressure) {
|
|
$assert($getByIdDirectPrivate(stream, "backpressure") !== backpressure);
|
|
|
|
const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
|
|
if (backpressureChangePromise !== undefined) backpressureChangePromise.resolve.$call();
|
|
|
|
$putByIdDirectPrivate(stream, "backpressureChangePromise", $newPromiseCapability(Promise));
|
|
$putByIdDirectPrivate(stream, "backpressure", backpressure);
|
|
}
|
|
|
|
export function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
|
|
$assert($isTransformStream(stream));
|
|
$assert($getByIdDirectPrivate(stream, "controller") === undefined);
|
|
|
|
$putByIdDirectPrivate(controller, "stream", stream);
|
|
$putByIdDirectPrivate(stream, "controller", controller);
|
|
$putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
|
|
$putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
|
|
}
|
|
|
|
export function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
|
|
const controller = new TransformStreamDefaultController();
|
|
let transformAlgorithm = chunk => {
|
|
try {
|
|
$transformStreamDefaultControllerEnqueue(controller, chunk);
|
|
} catch (e) {
|
|
return Promise.$reject(e);
|
|
}
|
|
return Promise.$resolve();
|
|
};
|
|
let flushAlgorithm = () => {
|
|
return Promise.$resolve();
|
|
};
|
|
|
|
if ("transform" in transformerDict)
|
|
transformAlgorithm = chunk => {
|
|
return $promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
|
|
};
|
|
|
|
if ("flush" in transformerDict) {
|
|
flushAlgorithm = () => {
|
|
return $promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
|
|
};
|
|
}
|
|
|
|
$setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
|
|
}
|
|
|
|
export function transformStreamDefaultControllerClearAlgorithms(controller) {
|
|
// We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
|
|
$putByIdDirectPrivate(controller, "transformAlgorithm", true);
|
|
$putByIdDirectPrivate(controller, "flushAlgorithm", undefined);
|
|
}
|
|
|
|
export function transformStreamDefaultControllerEnqueue(controller, chunk) {
|
|
const stream = $getByIdDirectPrivate(controller, "stream");
|
|
const readable = $getByIdDirectPrivate(stream, "readable");
|
|
const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
|
|
|
|
$assert(readableController !== undefined);
|
|
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
|
|
$throwTypeError("TransformStream.readable cannot close or enqueue");
|
|
|
|
try {
|
|
$readableStreamDefaultControllerEnqueue(readableController, chunk);
|
|
} catch (e) {
|
|
$transformStreamErrorWritableAndUnblockWrite(stream, e);
|
|
throw $getByIdDirectPrivate(readable, "storedError");
|
|
}
|
|
|
|
const backpressure = !$readableStreamDefaultControllerShouldCallPull(readableController);
|
|
if (backpressure !== $getByIdDirectPrivate(stream, "backpressure")) {
|
|
$assert(backpressure);
|
|
$transformStreamSetBackpressure(stream, true);
|
|
}
|
|
}
|
|
|
|
export function transformStreamDefaultControllerError(controller, e) {
|
|
$transformStreamError($getByIdDirectPrivate(controller, "stream"), e);
|
|
}
|
|
|
|
export function transformStreamDefaultControllerPerformTransform(controller, chunk) {
|
|
const promiseCapability = $newPromiseCapability(Promise);
|
|
|
|
const transformPromise = $getByIdDirectPrivate(controller, "transformAlgorithm").$call(undefined, chunk);
|
|
transformPromise.$then(
|
|
() => {
|
|
promiseCapability.resolve();
|
|
},
|
|
r => {
|
|
$transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
|
|
promiseCapability.reject.$call(undefined, r);
|
|
},
|
|
);
|
|
return promiseCapability.promise;
|
|
}
|
|
|
|
export function transformStreamDefaultControllerTerminate(controller) {
|
|
const stream = $getByIdDirectPrivate(controller, "stream");
|
|
const readable = $getByIdDirectPrivate(stream, "readable");
|
|
const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
|
|
|
|
// FIXME: Update readableStreamDefaultControllerClose to make this check.
|
|
if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
|
|
$readableStreamDefaultControllerClose(readableController);
|
|
const error = $makeTypeError("the stream has been terminated");
|
|
$transformStreamErrorWritableAndUnblockWrite(stream, error);
|
|
}
|
|
|
|
export function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
|
|
const writable = $getByIdDirectPrivate(stream, "internalWritable");
|
|
|
|
$assert($getByIdDirectPrivate(writable, "state") === "writable");
|
|
|
|
const controller = $getByIdDirectPrivate(stream, "controller");
|
|
|
|
if ($getByIdDirectPrivate(stream, "backpressure")) {
|
|
const promiseCapability = $newPromiseCapability(Promise);
|
|
|
|
const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
|
|
$assert(backpressureChangePromise !== undefined);
|
|
backpressureChangePromise.promise.$then(
|
|
() => {
|
|
const state = $getByIdDirectPrivate(writable, "state");
|
|
if (state === "erroring") {
|
|
promiseCapability.reject.$call(undefined, $getByIdDirectPrivate(writable, "storedError"));
|
|
return;
|
|
}
|
|
|
|
$assert(state === "writable");
|
|
$transformStreamDefaultControllerPerformTransform(controller, chunk).$then(
|
|
() => {
|
|
promiseCapability.resolve();
|
|
},
|
|
e => {
|
|
promiseCapability.reject.$call(undefined, e);
|
|
},
|
|
);
|
|
},
|
|
e => {
|
|
promiseCapability.reject.$call(undefined, e);
|
|
},
|
|
);
|
|
|
|
return promiseCapability.promise;
|
|
}
|
|
return $transformStreamDefaultControllerPerformTransform(controller, chunk);
|
|
}
|
|
|
|
export function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
|
|
$transformStreamError(stream, reason);
|
|
return Promise.$resolve();
|
|
}
|
|
|
|
export function transformStreamDefaultSinkCloseAlgorithm(stream) {
|
|
const readable = $getByIdDirectPrivate(stream, "readable");
|
|
const controller = $getByIdDirectPrivate(stream, "controller");
|
|
const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
|
|
|
|
const flushAlgorithm = $getByIdDirectPrivate(controller, "flushAlgorithm");
|
|
$assert(flushAlgorithm !== undefined);
|
|
const flushPromise = $getByIdDirectPrivate(controller, "flushAlgorithm").$call();
|
|
$transformStreamDefaultControllerClearAlgorithms(controller);
|
|
|
|
const promiseCapability = $newPromiseCapability(Promise);
|
|
flushPromise.$then(
|
|
() => {
|
|
if ($getByIdDirectPrivate(readable, "state") === $streamErrored) {
|
|
promiseCapability.reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
|
|
return;
|
|
}
|
|
|
|
// FIXME: Update readableStreamDefaultControllerClose to make this check.
|
|
if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
|
|
$readableStreamDefaultControllerClose(readableController);
|
|
promiseCapability.resolve();
|
|
},
|
|
r => {
|
|
$transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
|
|
promiseCapability.reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
|
|
},
|
|
);
|
|
return promiseCapability.promise;
|
|
}
|
|
|
|
export function transformStreamDefaultSourcePullAlgorithm(stream) {
|
|
$assert($getByIdDirectPrivate(stream, "backpressure"));
|
|
$assert($getByIdDirectPrivate(stream, "backpressureChangePromise") !== undefined);
|
|
|
|
$transformStreamSetBackpressure(stream, false);
|
|
|
|
return $getByIdDirectPrivate(stream, "backpressureChangePromise").promise;
|
|
}
|