diff --git a/src/bun.js/bindings/BunWritableStream.cpp b/src/bun.js/bindings/BunWritableStream.cpp index b051be5013..0fb114e7db 100644 --- a/src/bun.js/bindings/BunWritableStream.cpp +++ b/src/bun.js/bindings/BunWritableStream.cpp @@ -3,6 +3,8 @@ #include "BunWritableStream.h" #include "BunWritableStreamDefaultController.h" #include "BunWritableStreamDefaultWriter.h" +#include "BunStreamStructures.h" +#include "BunStreamInlines.h" namespace Bun { @@ -499,8 +501,49 @@ JSValue JSWritableStream::abort(JSGlobalObject* globalObject, JSValue reason) if (isLocked()) return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot abort a locked WritableStream"_s)); - // 2. Return ! WritableStreamAbort(this, reason). - return Operations::WritableStreamAbort(globalObject, this, reason); + // 2. Let state be this.[[state]]. + const auto state = m_state; + + // 3. If state is "closed" or state is "errored", return a promise resolved with undefined. + if (state == State::Closed || state == State::Errored) + return JSPromise::resolvedPromise(globalObject, jsUndefined()); + + // 4. If this.[[pendingAbortRequest]] is not undefined, return this.[[pendingAbortRequest]].[[promise]]. + if (auto promise = m_pendingAbortRequestPromise.get()) + return promise; + + // 5. Assert: state is "writable" or state is "erroring". + ASSERT(state == State::Writable || state == State::Erroring); + + // 6. Let wasAlreadyErroring be false. + bool wasAlreadyErroring = false; + + // 7. If state is "erroring", + if (state == State::Erroring) { + // a. Set wasAlreadyErroring to true. + wasAlreadyErroring = true; + // b. Set reason to undefined. + reason = jsUndefined(); + } + + // 8. Let promise be a new promise. + JSPromise* promise = JSPromise::create(vm, globalObject->promiseStructure()); + + // 9. Set this.[[pendingAbortRequest]] to record {[[promise]]: promise, [[reason]]: reason, [[wasAlreadyErroring]]: wasAlreadyErroring}. + m_pendingAbortRequestPromise.set(vm, this, promise); + m_pendingAbortRequestReason.set(vm, this, reason); + m_wasAlreadyErroring = wasAlreadyErroring; + + // 10. If wasAlreadyErroring is false, perform ! WritableStreamStartErroring(this, reason). + if (!wasAlreadyErroring) + Operations::WritableStreamStartErroring(this, reason); + + // 11. If this.[[state]] is "errored", perform ! WritableStreamFinishErroring(this). + if (m_state == State::Errored) + Operations::WritableStreamFinishErroring(this); + + // 12. Return promise. + return promise; } JSValue JSWritableStream::close(JSGlobalObject* globalObject) @@ -508,22 +551,98 @@ JSValue JSWritableStream::close(JSGlobalObject* globalObject) VM& vm = globalObject->vm(); auto scope = DECLARE_THROW_SCOPE(vm); - // Cannot close locked stream - if (isLocked() || m_state == State::Errored) - return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close a locked or errored WritableStream"_s)); + // 1. If ! IsWritableStreamLocked(this) is true, return a promise rejected with a TypeError exception. + if (isLocked()) + return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close a locked WritableStream"_s)); - // Cannot close if already closing + // 2. If ! WritableStreamCloseQueuedOrInFlight(this) is true, return a promise rejected with a TypeError exception. if (m_closeRequest || m_inFlightCloseRequest) return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close an already closing stream"_s)); - // Create close promise - JSPromise* promise = JSPromise::create(vm, globalObject->promiseStructure()); - m_closeRequest.set(vm, this, promise); + // 3. Let state be this.[[state]]. + const auto state = m_state; - // Note: The controller just queues up the close operation + // 4. If state is "closed", return a promise rejected with a TypeError exception. + if (state == State::Closed) + return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close an already closed stream"_s)); + + // 5. If state is "errored", return a promise rejected with this.[[storedError]]. + if (state == State::Errored) + return JSPromise::rejectedPromise(globalObject, m_storedError.get()); + + // 6. If state is "erroring", return a promise rejected with this.[[storedError]]. + if (state == State::Erroring) + return JSPromise::rejectedPromise(globalObject, m_storedError.get()); + + // 7. Assert: state is "writable". + ASSERT(state == State::Writable); + + // 8. Let closeRequest be ! WritableStreamCreateCloseRequest(this). + JSPromise* closeRequest = JSPromise::create(vm, globalObject->promiseStructure()); + m_closeRequest.set(vm, this, closeRequest); + + // 9. Perform ! WritableStreamDefaultControllerClose(this.[[controller]]). m_controller->close(globalObject); - RELEASE_AND_RETURN(scope, promise); + // 10. Return closeRequest.[[promise]]. + return closeRequest; +} + +void JSWritableStream::finishInFlightClose() +{ + VM& vm = m_controller->vm(); + JSGlobalObject* globalObject = m_controller->globalObject(); + + // 1. Assert: this.[[inFlightCloseRequest]] is not undefined. + ASSERT(m_inFlightCloseRequest); + + // 2. Resolve this.[[inFlightCloseRequest]] with undefined. + m_inFlightCloseRequest->resolve(globalObject, jsUndefined()); + + // 3. Set this.[[inFlightCloseRequest]] to undefined. + m_inFlightCloseRequest.clear(); + + // 4. Set this.[[state]] to "closed". + m_state = State::Closed; + + // 5. Let writer be this.[[writer]]. + auto* writer = m_writer.get(); + + // 6. If writer is not undefined, + if (writer) { + // a. Resolve writer.[[closedPromise]] with undefined. + writer->resolveClosedPromise(globalObject, jsUndefined()); + } +} + +void JSWritableStream::finishInFlightCloseWithError(JSValue error) +{ + VM& vm = m_controller->vm(); + JSGlobalObject* globalObject = m_controller->globalObject(); + + // 1. Assert: this.[[inFlightCloseRequest]] is not undefined. + ASSERT(m_inFlightCloseRequest); + + // 2. Reject this.[[inFlightCloseRequest]] with error. + m_inFlightCloseRequest->reject(globalObject, error); + + // 3. Set this.[[inFlightCloseRequest]] to undefined. + m_inFlightCloseRequest.clear(); + + // 4. Set this.[[state]] to "errored". + m_state = State::Errored; + + // 5. Set this.[[storedError]] to error. + m_storedError.set(vm, this, error); + + // 6. Let writer be this.[[writer]]. + auto* writer = m_writer.get(); + + // 7. If writer is not undefined, + if (writer) { + // a. Reject writer.[[closedPromise]] with error. + writer->rejectClosedPromise(globalObject, error); + } } } diff --git a/src/bun.js/bindings/BunWritableStream.h b/src/bun.js/bindings/BunWritableStream.h index b00fee376e..3c6be7721f 100644 --- a/src/bun.js/bindings/BunWritableStream.h +++ b/src/bun.js/bindings/BunWritableStream.h @@ -120,6 +120,9 @@ public: bool hasOperationMarkedInFlight() const { return m_inFlightWriteRequest || m_inFlightCloseRequest; } + void finishInFlightClose(); + void finishInFlightCloseWithError(JSValue error); + private: JSWritableStream(VM&, Structure*); void finishCreation(VM&); diff --git a/src/bun.js/bindings/BunWritableStreamDefaultController.cpp b/src/bun.js/bindings/BunWritableStreamDefaultController.cpp index 8ab85c670f..6ecaacf5d1 100644 --- a/src/bun.js/bindings/BunWritableStreamDefaultController.cpp +++ b/src/bun.js/bindings/BunWritableStreamDefaultController.cpp @@ -1,5 +1,6 @@ #include "root.h" +#include "ZigGlobalObject.h" #include #include #include @@ -10,6 +11,11 @@ #include "JSAbortSignal.h" #include "IDLTypes.h" #include "JSDOMBinding.h" +#include "BunStreamStructures.h" +#include +#include "BunStreamInlines.h" +#include "JSAbortSignal.h" +#include "DOMJITIDLType.h" namespace Bun { @@ -106,7 +112,7 @@ JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerErrorFunction, (JSGlob return {}; } - return JSValue::encode(controller->error(callFrame->argument(0))); + return JSValue::encode(controller->error(globalObject, callFrame->argument(0))); } JSC_DEFINE_CUSTOM_GETTER(jsWritableStreamDefaultControllerGetSignal, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName)) @@ -144,6 +150,32 @@ JSC_DEFINE_CUSTOM_GETTER(jsWritableStreamDefaultControllerGetDesiredSize, (JSGlo } } +JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerCloseFulfill, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + VM& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + JSWritableStream* stream = jsDynamicCast(callFrame->argument(1)); + if (UNLIKELY(!stream)) + return throwVMTypeError(globalObject, scope, "WritableStreamDefaultController.close called with invalid stream"_s); + + stream->finishInFlightClose(); + return JSValue::encode(jsUndefined()); +} + +JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerCloseReject, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + VM& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + JSWritableStream* stream = jsDynamicCast(callFrame->argument(1)); + if (UNLIKELY(!stream)) + return throwVMTypeError(globalObject, scope, "WritableStreamDefaultController.close called with invalid stream"_s); + + stream->finishInFlightCloseWithError(callFrame->argument(0)); + return JSValue::encode(jsUndefined()); +} + static const HashTableValue JSWritableStreamDefaultControllerPrototypeTableValues[] = { { "error"_s, static_cast(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsWritableStreamDefaultControllerErrorFunction, 1 } }, @@ -183,29 +215,50 @@ JSWritableStreamDefaultController* JSWritableStreamDefaultController::create( void JSWritableStreamDefaultController::finishCreation(JSC::VM& vm) { Base::finishCreation(vm); - m_queue.set(vm, JSC::constructEmptyArray(vm, nullptr)); - m_abortController.set(vm, WebCore::JSAbortController::create(vm, nullptr, nullptr)); + m_queue.set(vm, this, JSC::constructEmptyArray(globalObject(), nullptr, 0)); + m_abortController.initLater([](const JSC::LazyProperty::Initializer& init) { + Zig::GlobalObject* globalObject = defaultGlobalObject(init.owner->globalObject()); + auto& scriptExecutionContext = *globalObject->scriptExecutionContext(); + Ref abortController = WebCore::AbortController::create(scriptExecutionContext); + JSAbortController* abortControllerValue = jsCast(WebCore::toJSNewlyCreated>(*init.owner->globalObject(), *globalObject, WTFMove(abortController))); + init.set(abortControllerValue); + }); } JSC::JSValue JSWritableStreamDefaultController::abortSignal() const { auto& vm = this->globalObject()->vm(); auto throwScope = DECLARE_THROW_SCOPE(vm); - return WebCore::toJS>(this->globalObject(), defaultGlobalObject(this->globalObject()), throwScope, m_abortController->wrapped().signal()); + return WebCore::toJS>(*this->globalObject(), throwScope, m_abortController.get(this)->wrapped().signal()); } -JSC::JSValue JSWritableStreamDefaultController::error(JSC::JSValue reason) +JSC::JSValue JSWritableStreamDefaultController::error(JSGlobalObject* globalObject, JSValue reason) { - auto* globalObject = JSC::jsCast(m_stream->globalObject()); - JSC::VM& vm = globalObject->vm(); + VM& vm = globalObject->vm(); auto scope = DECLARE_THROW_SCOPE(vm); - if (m_stream->state() != JSWritableStream::State::Writable) - return JSC::jsUndefined(); + // 1. Let stream be this.[[stream]]. + JSWritableStream* stream = m_stream.get(); - performWritableStreamDefaultControllerError(this, reason); + // 2. Assert: stream is not undefined. + ASSERT(stream); - RELEASE_AND_RETURN(scope, JSC::jsUndefined()); + // 3. Let state be stream.[[state]]. + auto state = stream->state(); + + // 4. Assert: state is "writable". + if (state != JSWritableStream::State::Writable) + return throwTypeError(globalObject, scope, "WritableStreamDefaultController.error called on non-writable stream"_s); + + // 5. Perform ! WritableStreamDefaultControllerError(this, error). + m_writeAlgorithm.clear(); + m_closeAlgorithm.clear(); + m_abortAlgorithm.clear(); + m_strategySizeAlgorithm.clear(); + + stream->error(globalObject, reason); + + return jsUndefined(); } bool JSWritableStreamDefaultController::shouldCallWrite() const @@ -248,7 +301,7 @@ void JSWritableStreamDefaultController::visitAdditionalChildren(Visitor& visitor visitor.append(m_writeAlgorithm); visitor.append(m_strategySizeAlgorithm); visitor.append(m_queue); - visitor.append(m_abortController); + m_abortController.visit(visitor); } DEFINE_VISIT_CHILDREN(JSWritableStreamDefaultController); @@ -261,4 +314,61 @@ const JSC::ClassInfo JSWritableStreamDefaultController::s_info = { nullptr, CREATE_METHOD_TABLE(JSWritableStreamDefaultController) }; + +JSValue JSWritableStreamDefaultController::close(JSGlobalObject* globalObject) +{ + VM& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + + // 1. Let stream be this.[[stream]]. + JSWritableStream* stream = m_stream.get(); + + // 2. Assert: stream is not undefined. + ASSERT(stream); + + // 3. Let state be stream.[[state]]. + auto state = stream->state(); + + // 4. Assert: state is "writable". + ASSERT(state == JSWritableStream::State::Writable); + + // 5. Let closeRequest be stream.[[closeRequest]]. + // 6. Assert: closeRequest is not undefined. + ASSERT(stream->closeRequest()); + + // 7. Perform ! WritableStreamDefaultControllerClearAlgorithms(this). + m_writeAlgorithm.clear(); + m_closeAlgorithm.clear(); + m_abortAlgorithm.clear(); + m_strategySizeAlgorithm.clear(); + + // 8. Let sinkClosePromise be the result of performing this.[[closeAlgorithm]]. + JSValue sinkClosePromise; + if (m_closeAlgorithm) { + JSObject* closeFunction = m_closeAlgorithm.get(); + if (closeFunction) { + MarkedArgumentBuffer args; + ASSERT(!args.hasOverflowed()); + sinkClosePromise = JSC::profiledCall(globalObject, JSC::ProfilingReason::Microtask, closeFunction, JSC::getCallData(closeFunction), jsUndefined(), args); + RETURN_IF_EXCEPTION(scope, {}); + } else { + sinkClosePromise = jsUndefined(); + } + } else { + sinkClosePromise = jsUndefined(); + } + + // 9. Upon fulfillment of sinkClosePromise: + // a. Perform ! WritableStreamFinishInFlightClose(stream). + // 10. Upon rejection of sinkClosePromise with reason r: + // a. Perform ! WritableStreamFinishInFlightCloseWithError(stream, r). + if (JSPromise* promise = jsDynamicCast(sinkClosePromise)) { + Bun::then(globalObject, promise, jsWritableStreamDefaultControllerCloseFulfill, jsWritableStreamDefaultControllerCloseReject, stream); + } else { + // If not a promise, treat as fulfilled + stream->finishInFlightClose(); + } + + return jsUndefined(); +} } diff --git a/src/bun.js/bindings/BunWritableStreamDefaultController.h b/src/bun.js/bindings/BunWritableStreamDefaultController.h index bcd8039b8f..281076f180 100644 --- a/src/bun.js/bindings/BunWritableStreamDefaultController.h +++ b/src/bun.js/bindings/BunWritableStreamDefaultController.h @@ -35,7 +35,8 @@ public: } // JavaScript-facing methods - JSC::JSValue error(JSC::JSValue reason); + JSC::JSValue error(JSC::JSGlobalObject* globalObject, JSC::JSValue reason); + JSC::JSValue close(JSC::JSGlobalObject* globalObject); // C++-facing methods bool shouldCallWrite() const; @@ -76,9 +77,11 @@ private: // Internal slots per spec JSC::WriteBarrier m_stream; - JSC::WriteBarrier m_abortAlgorithm; - JSC::WriteBarrier m_closeAlgorithm; - JSC::WriteBarrier m_writeAlgorithm; + + // Functions for us to call. + JSC::WriteBarrier m_abortAlgorithm; + JSC::WriteBarrier m_closeAlgorithm; + JSC::WriteBarrier m_writeAlgorithm; double m_strategyHWM { 1.0 }; JSC::WriteBarrier m_strategySizeAlgorithm; @@ -88,7 +91,7 @@ private: bool m_writing { false }; bool m_inFlightWriteRequest { false }; bool m_closeRequested { false }; - JSC::WriteBarrier m_abortController; + JSC::LazyProperty m_abortController; }; } diff --git a/src/bun.js/bindings/BunWritableStreamDefaultWriter.cpp b/src/bun.js/bindings/BunWritableStreamDefaultWriter.cpp index e2f74f9cfe..debe8f2726 100644 --- a/src/bun.js/bindings/BunWritableStreamDefaultWriter.cpp +++ b/src/bun.js/bindings/BunWritableStreamDefaultWriter.cpp @@ -424,4 +424,16 @@ void JSWritableStreamDefaultWriter::release() m_readyPromise->reject(vm(), jsUndefined()); } +void JSWritableStreamDefaultWriter::resolveClosedPromise(JSGlobalObject* globalObject, JSValue value) +{ + if (m_closedPromise) + m_closedPromise->resolve(globalObject, value); +} + +void JSWritableStreamDefaultWriter::rejectClosedPromise(JSGlobalObject* globalObject, JSValue error) +{ + if (m_closedPromise) + m_closedPromise->reject(globalObject, error); +} + } // namespace Bun diff --git a/src/bun.js/bindings/BunWritableStreamDefaultWriter.h b/src/bun.js/bindings/BunWritableStreamDefaultWriter.h index a619397a4e..0b63305feb 100644 --- a/src/bun.js/bindings/BunWritableStreamDefaultWriter.h +++ b/src/bun.js/bindings/BunWritableStreamDefaultWriter.h @@ -1,4 +1,3 @@ - #pragma once #include "root.h" @@ -34,6 +33,9 @@ public: JSC::JSPromise* ready() { return m_readyPromise.get(); } double desiredSize(); + void resolveClosedPromise(JSC::JSGlobalObject* globalObject, JSC::JSValue value); + void rejectClosedPromise(JSC::JSGlobalObject* globalObject, JSC::JSValue error); + // Internal APIs for C++ use JSWritableStream* stream() { return m_stream.get(); } void release(); // For releaseLock()