This commit is contained in:
Jarred Sumner
2024-12-19 05:36:15 -08:00
parent 7bff270e87
commit 33db8447b0
6 changed files with 278 additions and 29 deletions

View File

@@ -3,6 +3,8 @@
#include "BunWritableStream.h"
#include "BunWritableStreamDefaultController.h"
#include "BunWritableStreamDefaultWriter.h"
#include "BunStreamStructures.h"
#include "BunStreamInlines.h"
namespace Bun {
@@ -499,8 +501,49 @@ JSValue JSWritableStream::abort(JSGlobalObject* globalObject, JSValue reason)
if (isLocked())
return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot abort a locked WritableStream"_s));
// 2. Return ! WritableStreamAbort(this, reason).
return Operations::WritableStreamAbort(globalObject, this, reason);
// 2. Let state be this.[[state]].
const auto state = m_state;
// 3. If state is "closed" or state is "errored", return a promise resolved with undefined.
if (state == State::Closed || state == State::Errored)
return JSPromise::resolvedPromise(globalObject, jsUndefined());
// 4. If this.[[pendingAbortRequest]] is not undefined, return this.[[pendingAbortRequest]].[[promise]].
if (auto promise = m_pendingAbortRequestPromise.get())
return promise;
// 5. Assert: state is "writable" or state is "erroring".
ASSERT(state == State::Writable || state == State::Erroring);
// 6. Let wasAlreadyErroring be false.
bool wasAlreadyErroring = false;
// 7. If state is "erroring",
if (state == State::Erroring) {
// a. Set wasAlreadyErroring to true.
wasAlreadyErroring = true;
// b. Set reason to undefined.
reason = jsUndefined();
}
// 8. Let promise be a new promise.
JSPromise* promise = JSPromise::create(vm, globalObject->promiseStructure());
// 9. Set this.[[pendingAbortRequest]] to record {[[promise]]: promise, [[reason]]: reason, [[wasAlreadyErroring]]: wasAlreadyErroring}.
m_pendingAbortRequestPromise.set(vm, this, promise);
m_pendingAbortRequestReason.set(vm, this, reason);
m_wasAlreadyErroring = wasAlreadyErroring;
// 10. If wasAlreadyErroring is false, perform ! WritableStreamStartErroring(this, reason).
if (!wasAlreadyErroring)
Operations::WritableStreamStartErroring(this, reason);
// 11. If this.[[state]] is "errored", perform ! WritableStreamFinishErroring(this).
if (m_state == State::Errored)
Operations::WritableStreamFinishErroring(this);
// 12. Return promise.
return promise;
}
JSValue JSWritableStream::close(JSGlobalObject* globalObject)
@@ -508,22 +551,98 @@ JSValue JSWritableStream::close(JSGlobalObject* globalObject)
VM& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// Cannot close locked stream
if (isLocked() || m_state == State::Errored)
return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close a locked or errored WritableStream"_s));
// 1. If ! IsWritableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
if (isLocked())
return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close a locked WritableStream"_s));
// Cannot close if already closing
// 2. If ! WritableStreamCloseQueuedOrInFlight(this) is true, return a promise rejected with a TypeError exception.
if (m_closeRequest || m_inFlightCloseRequest)
return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close an already closing stream"_s));
// Create close promise
JSPromise* promise = JSPromise::create(vm, globalObject->promiseStructure());
m_closeRequest.set(vm, this, promise);
// 3. Let state be this.[[state]].
const auto state = m_state;
// Note: The controller just queues up the close operation
// 4. If state is "closed", return a promise rejected with a TypeError exception.
if (state == State::Closed)
return JSPromise::rejectedPromise(globalObject, createTypeError(globalObject, "Cannot close an already closed stream"_s));
// 5. If state is "errored", return a promise rejected with this.[[storedError]].
if (state == State::Errored)
return JSPromise::rejectedPromise(globalObject, m_storedError.get());
// 6. If state is "erroring", return a promise rejected with this.[[storedError]].
if (state == State::Erroring)
return JSPromise::rejectedPromise(globalObject, m_storedError.get());
// 7. Assert: state is "writable".
ASSERT(state == State::Writable);
// 8. Let closeRequest be ! WritableStreamCreateCloseRequest(this).
JSPromise* closeRequest = JSPromise::create(vm, globalObject->promiseStructure());
m_closeRequest.set(vm, this, closeRequest);
// 9. Perform ! WritableStreamDefaultControllerClose(this.[[controller]]).
m_controller->close(globalObject);
RELEASE_AND_RETURN(scope, promise);
// 10. Return closeRequest.[[promise]].
return closeRequest;
}
void JSWritableStream::finishInFlightClose()
{
VM& vm = m_controller->vm();
JSGlobalObject* globalObject = m_controller->globalObject();
// 1. Assert: this.[[inFlightCloseRequest]] is not undefined.
ASSERT(m_inFlightCloseRequest);
// 2. Resolve this.[[inFlightCloseRequest]] with undefined.
m_inFlightCloseRequest->resolve(globalObject, jsUndefined());
// 3. Set this.[[inFlightCloseRequest]] to undefined.
m_inFlightCloseRequest.clear();
// 4. Set this.[[state]] to "closed".
m_state = State::Closed;
// 5. Let writer be this.[[writer]].
auto* writer = m_writer.get();
// 6. If writer is not undefined,
if (writer) {
// a. Resolve writer.[[closedPromise]] with undefined.
writer->resolveClosedPromise(globalObject, jsUndefined());
}
}
void JSWritableStream::finishInFlightCloseWithError(JSValue error)
{
VM& vm = m_controller->vm();
JSGlobalObject* globalObject = m_controller->globalObject();
// 1. Assert: this.[[inFlightCloseRequest]] is not undefined.
ASSERT(m_inFlightCloseRequest);
// 2. Reject this.[[inFlightCloseRequest]] with error.
m_inFlightCloseRequest->reject(globalObject, error);
// 3. Set this.[[inFlightCloseRequest]] to undefined.
m_inFlightCloseRequest.clear();
// 4. Set this.[[state]] to "errored".
m_state = State::Errored;
// 5. Set this.[[storedError]] to error.
m_storedError.set(vm, this, error);
// 6. Let writer be this.[[writer]].
auto* writer = m_writer.get();
// 7. If writer is not undefined,
if (writer) {
// a. Reject writer.[[closedPromise]] with error.
writer->rejectClosedPromise(globalObject, error);
}
}
}

View File

@@ -120,6 +120,9 @@ public:
bool hasOperationMarkedInFlight() const { return m_inFlightWriteRequest || m_inFlightCloseRequest; }
void finishInFlightClose();
void finishInFlightCloseWithError(JSValue error);
private:
JSWritableStream(VM&, Structure*);
void finishCreation(VM&);

View File

@@ -1,5 +1,6 @@
#include "root.h"
#include "ZigGlobalObject.h"
#include <JavaScriptCore/JSGlobalObject.h>
#include <JavaScriptCore/JSArray.h>
#include <JavaScriptCore/JSPromise.h>
@@ -10,6 +11,11 @@
#include "JSAbortSignal.h"
#include "IDLTypes.h"
#include "JSDOMBinding.h"
#include "BunStreamStructures.h"
#include <JavaScriptCore/LazyPropertyInlines.h>
#include "BunStreamInlines.h"
#include "JSAbortSignal.h"
#include "DOMJITIDLType.h"
namespace Bun {
@@ -106,7 +112,7 @@ JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerErrorFunction, (JSGlob
return {};
}
return JSValue::encode(controller->error(callFrame->argument(0)));
return JSValue::encode(controller->error(globalObject, callFrame->argument(0)));
}
JSC_DEFINE_CUSTOM_GETTER(jsWritableStreamDefaultControllerGetSignal, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName))
@@ -144,6 +150,32 @@ JSC_DEFINE_CUSTOM_GETTER(jsWritableStreamDefaultControllerGetDesiredSize, (JSGlo
}
}
JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerCloseFulfill, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
VM& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
JSWritableStream* stream = jsDynamicCast<JSWritableStream*>(callFrame->argument(1));
if (UNLIKELY(!stream))
return throwVMTypeError(globalObject, scope, "WritableStreamDefaultController.close called with invalid stream"_s);
stream->finishInFlightClose();
return JSValue::encode(jsUndefined());
}
JSC_DEFINE_HOST_FUNCTION(jsWritableStreamDefaultControllerCloseReject, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
VM& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
JSWritableStream* stream = jsDynamicCast<JSWritableStream*>(callFrame->argument(1));
if (UNLIKELY(!stream))
return throwVMTypeError(globalObject, scope, "WritableStreamDefaultController.close called with invalid stream"_s);
stream->finishInFlightCloseWithError(callFrame->argument(0));
return JSValue::encode(jsUndefined());
}
static const HashTableValue JSWritableStreamDefaultControllerPrototypeTableValues[] = {
{ "error"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic,
{ HashTableValue::NativeFunctionType, jsWritableStreamDefaultControllerErrorFunction, 1 } },
@@ -183,29 +215,50 @@ JSWritableStreamDefaultController* JSWritableStreamDefaultController::create(
void JSWritableStreamDefaultController::finishCreation(JSC::VM& vm)
{
Base::finishCreation(vm);
m_queue.set(vm, JSC::constructEmptyArray(vm, nullptr));
m_abortController.set(vm, WebCore::JSAbortController::create(vm, nullptr, nullptr));
m_queue.set(vm, this, JSC::constructEmptyArray(globalObject(), nullptr, 0));
m_abortController.initLater([](const JSC::LazyProperty<JSObject, WebCore::JSAbortController>::Initializer& init) {
Zig::GlobalObject* globalObject = defaultGlobalObject(init.owner->globalObject());
auto& scriptExecutionContext = *globalObject->scriptExecutionContext();
Ref<WebCore::AbortController> abortController = WebCore::AbortController::create(scriptExecutionContext);
JSAbortController* abortControllerValue = jsCast<JSAbortController*>(WebCore::toJSNewlyCreated<IDLInterface<WebCore::AbortController>>(*init.owner->globalObject(), *globalObject, WTFMove(abortController)));
init.set(abortControllerValue);
});
}
JSC::JSValue JSWritableStreamDefaultController::abortSignal() const
{
auto& vm = this->globalObject()->vm();
auto throwScope = DECLARE_THROW_SCOPE(vm);
return WebCore::toJS<WebCore::IDLInterface<WebCore::AbortSignal>>(this->globalObject(), defaultGlobalObject(this->globalObject()), throwScope, m_abortController->wrapped().signal());
return WebCore::toJS<WebCore::IDLInterface<WebCore::AbortSignal>>(*this->globalObject(), throwScope, m_abortController.get(this)->wrapped().signal());
}
JSC::JSValue JSWritableStreamDefaultController::error(JSC::JSValue reason)
JSC::JSValue JSWritableStreamDefaultController::error(JSGlobalObject* globalObject, JSValue reason)
{
auto* globalObject = JSC::jsCast<JSC::JSGlobalObject*>(m_stream->globalObject());
JSC::VM& vm = globalObject->vm();
VM& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
if (m_stream->state() != JSWritableStream::State::Writable)
return JSC::jsUndefined();
// 1. Let stream be this.[[stream]].
JSWritableStream* stream = m_stream.get();
performWritableStreamDefaultControllerError(this, reason);
// 2. Assert: stream is not undefined.
ASSERT(stream);
RELEASE_AND_RETURN(scope, JSC::jsUndefined());
// 3. Let state be stream.[[state]].
auto state = stream->state();
// 4. Assert: state is "writable".
if (state != JSWritableStream::State::Writable)
return throwTypeError(globalObject, scope, "WritableStreamDefaultController.error called on non-writable stream"_s);
// 5. Perform ! WritableStreamDefaultControllerError(this, error).
m_writeAlgorithm.clear();
m_closeAlgorithm.clear();
m_abortAlgorithm.clear();
m_strategySizeAlgorithm.clear();
stream->error(globalObject, reason);
return jsUndefined();
}
bool JSWritableStreamDefaultController::shouldCallWrite() const
@@ -248,7 +301,7 @@ void JSWritableStreamDefaultController::visitAdditionalChildren(Visitor& visitor
visitor.append(m_writeAlgorithm);
visitor.append(m_strategySizeAlgorithm);
visitor.append(m_queue);
visitor.append(m_abortController);
m_abortController.visit(visitor);
}
DEFINE_VISIT_CHILDREN(JSWritableStreamDefaultController);
@@ -261,4 +314,61 @@ const JSC::ClassInfo JSWritableStreamDefaultController::s_info = {
nullptr,
CREATE_METHOD_TABLE(JSWritableStreamDefaultController)
};
JSValue JSWritableStreamDefaultController::close(JSGlobalObject* globalObject)
{
VM& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// 1. Let stream be this.[[stream]].
JSWritableStream* stream = m_stream.get();
// 2. Assert: stream is not undefined.
ASSERT(stream);
// 3. Let state be stream.[[state]].
auto state = stream->state();
// 4. Assert: state is "writable".
ASSERT(state == JSWritableStream::State::Writable);
// 5. Let closeRequest be stream.[[closeRequest]].
// 6. Assert: closeRequest is not undefined.
ASSERT(stream->closeRequest());
// 7. Perform ! WritableStreamDefaultControllerClearAlgorithms(this).
m_writeAlgorithm.clear();
m_closeAlgorithm.clear();
m_abortAlgorithm.clear();
m_strategySizeAlgorithm.clear();
// 8. Let sinkClosePromise be the result of performing this.[[closeAlgorithm]].
JSValue sinkClosePromise;
if (m_closeAlgorithm) {
JSObject* closeFunction = m_closeAlgorithm.get();
if (closeFunction) {
MarkedArgumentBuffer args;
ASSERT(!args.hasOverflowed());
sinkClosePromise = JSC::profiledCall(globalObject, JSC::ProfilingReason::Microtask, closeFunction, JSC::getCallData(closeFunction), jsUndefined(), args);
RETURN_IF_EXCEPTION(scope, {});
} else {
sinkClosePromise = jsUndefined();
}
} else {
sinkClosePromise = jsUndefined();
}
// 9. Upon fulfillment of sinkClosePromise:
// a. Perform ! WritableStreamFinishInFlightClose(stream).
// 10. Upon rejection of sinkClosePromise with reason r:
// a. Perform ! WritableStreamFinishInFlightCloseWithError(stream, r).
if (JSPromise* promise = jsDynamicCast<JSPromise*>(sinkClosePromise)) {
Bun::then(globalObject, promise, jsWritableStreamDefaultControllerCloseFulfill, jsWritableStreamDefaultControllerCloseReject, stream);
} else {
// If not a promise, treat as fulfilled
stream->finishInFlightClose();
}
return jsUndefined();
}
}

View File

@@ -35,7 +35,8 @@ public:
}
// JavaScript-facing methods
JSC::JSValue error(JSC::JSValue reason);
JSC::JSValue error(JSC::JSGlobalObject* globalObject, JSC::JSValue reason);
JSC::JSValue close(JSC::JSGlobalObject* globalObject);
// C++-facing methods
bool shouldCallWrite() const;
@@ -76,9 +77,11 @@ private:
// Internal slots per spec
JSC::WriteBarrier<JSWritableStream> m_stream;
JSC::WriteBarrier<JSC::JSPromise> m_abortAlgorithm;
JSC::WriteBarrier<JSC::JSPromise> m_closeAlgorithm;
JSC::WriteBarrier<JSC::JSPromise> m_writeAlgorithm;
// Functions for us to call.
JSC::WriteBarrier<JSC::JSObject> m_abortAlgorithm;
JSC::WriteBarrier<JSC::JSObject> m_closeAlgorithm;
JSC::WriteBarrier<JSC::JSObject> m_writeAlgorithm;
double m_strategyHWM { 1.0 };
JSC::WriteBarrier<JSC::JSObject> m_strategySizeAlgorithm;
@@ -88,7 +91,7 @@ private:
bool m_writing { false };
bool m_inFlightWriteRequest { false };
bool m_closeRequested { false };
JSC::WriteBarrier<WebCore::JSAbortController> m_abortController;
JSC::LazyProperty<JSObject, WebCore::JSAbortController> m_abortController;
};
}

View File

@@ -424,4 +424,16 @@ void JSWritableStreamDefaultWriter::release()
m_readyPromise->reject(vm(), jsUndefined());
}
void JSWritableStreamDefaultWriter::resolveClosedPromise(JSGlobalObject* globalObject, JSValue value)
{
if (m_closedPromise)
m_closedPromise->resolve(globalObject, value);
}
void JSWritableStreamDefaultWriter::rejectClosedPromise(JSGlobalObject* globalObject, JSValue error)
{
if (m_closedPromise)
m_closedPromise->reject(globalObject, error);
}
} // namespace Bun

View File

@@ -1,4 +1,3 @@
#pragma once
#include "root.h"
@@ -34,6 +33,9 @@ public:
JSC::JSPromise* ready() { return m_readyPromise.get(); }
double desiredSize();
void resolveClosedPromise(JSC::JSGlobalObject* globalObject, JSC::JSValue value);
void rejectClosedPromise(JSC::JSGlobalObject* globalObject, JSC::JSValue error);
// Internal APIs for C++ use
JSWritableStream* stream() { return m_stream.get(); }
void release(); // For releaseLock()