diff --git a/src/bunfig.zig b/src/bunfig.zig index b0c763498a..4fd3b7a555 100644 --- a/src/bunfig.zig +++ b/src/bunfig.zig @@ -126,7 +126,6 @@ pub const Bunfig = struct { if (prop.value.?.data != .e_string) continue; valid_count += 1; } - var buffer = allocator.alloc([]const u8, valid_count * 2) catch unreachable; var keys = buffer[0..valid_count]; var values = buffer[valid_count..]; @@ -442,19 +441,19 @@ pub const Bunfig = struct { if (json.get("jsxImportSource")) |expr| { if (expr.asString(allocator)) |value| { - jsx_import_source = value; + jsx_import_source = try allocator.dupe(u8, value); } } if (json.get("jsxFragment")) |expr| { if (expr.asString(allocator)) |value| { - jsx_fragment = value; + jsx_fragment = try allocator.dupe(u8, value); } } if (json.get("jsxFactory")) |expr| { if (expr.asString(allocator)) |value| { - jsx_factory = value; + jsx_factory = try allocator.dupe(u8, value); } } diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index 17cf9cca6f..1de5d494fb 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -541,7 +541,7 @@ pub fn run_for_ns(self: *IO, nanoseconds: u63) !void { fn flush(self: *IO, wait_for_completions: bool) !void { var io_pending = self.io_pending.peek(); - var events: [256]os.Kevent = undefined; + var events: [512]os.Kevent = undefined; // Check timeouts and fill events with completions in io_pending // (they will be submitted through kevent). @@ -711,6 +711,7 @@ const Operation = union(enum) { buf: [*]u8, len: u32, offset: u64, + positional: bool = true, }, recv: struct { socket: os.socket_t, @@ -774,9 +775,13 @@ fn submitWithIncrementPending( self.pending_count += 1; const Context = @TypeOf(context); const onCompleteFn = struct { - fn onComplete(io: *IO, _completion: *Completion) void { + fn onComplete( + io: *IO, + _completion: *Completion, + ) void { // Perform the actual operaton const op_data = &@field(_completion.operation, @tagName(operation_tag)); + const result = OperationImpl.doOperation(op_data); // Requeue onto io_pending if error.WouldBlock @@ -1161,8 +1166,9 @@ pub fn read( completion: *Completion, fd: os.fd_t, buffer: []u8, - offset: u64, + offset: ?u64, ) void { + const offset_ = offset orelse @as(u64, 0); self.submit( context, callback, @@ -1172,16 +1178,21 @@ pub fn read( .fd = fd, .buf = buffer.ptr, .len = @intCast(u32, buffer_limit(buffer.len)), - .offset = offset, + .offset = offset_, + .positional = offset != null, }, struct { fn doOperation(op: anytype) ReadError!usize { while (true) { - const rc = os.system.pread( + const rc = if (op.positional) os.system.pread( op.fd, op.buf, op.len, @bitCast(isize, op.offset), + ) else os.system.read( + op.fd, + op.buf, + op.len, ); return switch (@enumToInt(os.errno(rc))) { 0 => @intCast(usize, rc), diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index 374ba9d789..b4f21dee59 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -1231,7 +1231,7 @@ pub fn read( completion: *Completion, fd: os.fd_t, buffer: []u8, - offset: u64, + offset: ?u64, ) void { completion.* = .{ .io = self, @@ -1250,7 +1250,8 @@ pub fn read( .read = .{ .fd = fd, .buffer = buffer, - .offset = offset, + // pread is irrelevant here + .offset = offset orelse 0, }, }, }; diff --git a/src/javascript/jsc/bindings/BlobReadableStreamSource.cpp b/src/javascript/jsc/bindings/BlobReadableStreamSource.cpp index 979135d39a..4e85624acc 100644 --- a/src/javascript/jsc/bindings/BlobReadableStreamSource.cpp +++ b/src/javascript/jsc/bindings/BlobReadableStreamSource.cpp @@ -4,27 +4,49 @@ extern "C" void BlobStore__ref(void*); extern "C" void BlobStore__deref(void*); -extern "C" bool BlobStore__requestRead(void* store, WeakPtr ctx, size_t offset, size_t size); -extern "C" bool BlobStore__requestStart(void* store, WeakPtr ctx, size_t offset, size_t size); -extern "C" void BlobStore__onClose(WeakPtr source) + +extern "C" bool BlobStore__requestRead(void* store, void* streamer, WeakPtr ctx, size_t offset, size_t size); +extern "C" bool BlobStore__requestStart(void* store, void** streamer, WeakPtr ctx, size_t offset, size_t size); +extern "C" bool BlobReadableStreamSource_isCancelled(WeakPtr source) +{ + if (source) + return source->isCancelled(); + + return true; +} +extern "C" void BlobStore__onClose(RefPtr source) { if (!source) return; source->close(); } -extern "C" void BlobStore__onError(WeakPtr source, const SystemError* error, Zig::GlobalObject* globalObject) +extern "C" void BlobStore__onError(RefPtr source, const SystemError* error, Zig::GlobalObject* globalObject) { - if (!source) + if (!source || source->isCancelled()) return; - source->cancel(JSC::JSValue::decode(SystemError__toErrorInstance(error, globalObject))); + + source->error(JSC::JSValue::decode(SystemError__toErrorInstance(error, globalObject))); } -extern "C" bool BlobStore__onRead(WeakPtr source, const uint8_t* ptr, size_t read) +extern "C" bool BlobStore__onRead(RefPtr source, const uint8_t* ptr, size_t read) { if (!source) return false; - bool couldHaveMore = source->enqueue(ptr, read); - return couldHaveMore; + auto result = source->enqueue(ptr, read); + source->deref(); + return result; +} + +extern "C" bool BlobStore__onReadExternal(RefPtr source, uint8_t* ptr, size_t read, void* ctx, JSTypedArrayBytesDeallocator bytesDeallocator) +{ + if (!source) { + bytesDeallocator(ctx, ptr); + return false; + } + + auto result = source->enqueue(ptr, read, ctx, bytesDeallocator); + source->deref(); + return result; } extern "C" JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject) @@ -43,6 +65,7 @@ extern "C" JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject) extern "C" JSC__JSValue ReadableStream__fromBlob(Zig::GlobalObject* globalObject, void* store, size_t offset, size_t size) { auto source = WebCore::BlobReadableStreamSource::create(store, offset, size); + auto result = WebCore::ReadableStream::create(*globalObject, WTFMove(source)); if (UNLIKELY(result.hasException())) { auto scope = DECLARE_THROW_SCOPE(globalObject->vm()); @@ -62,19 +85,29 @@ Ref BlobReadableStreamSource::create(void* store, size void BlobReadableStreamSource::doStart() { - if (!BlobStore__requestStart(m_store, WeakPtr { *this }, m_offset, m_size > m_offset ? m_size - m_offset : 0)) { - close(); + RefPtr weakThis = this; + weakThis->ref(); + + if (!BlobStore__requestStart(m_store, &m_streamer, weakThis, m_offset, m_size > m_offset ? m_size - m_offset : 0)) { + if (m_promise) { + close(); + } return; } + + JSC::gcProtect(&this->controller().jsController()); } void BlobReadableStreamSource::doPull() { - - if (!BlobStore__requestRead(m_store, WeakPtr { *this }, m_offset, m_size > m_offset ? m_size - m_offset : 0)) { + RefPtr weakThis = this; + weakThis->ref(); + if (!BlobStore__requestRead(m_store, m_streamer, weakThis, m_offset, m_size > m_offset ? m_size - m_offset : 0)) { close(); return; } + + JSC::gcProtect(&this->controller().jsController()); } void BlobReadableStreamSource::doCancel() @@ -86,19 +119,25 @@ void BlobReadableStreamSource::close() { if (!m_isCancelled) controller().close(); + + JSC::gcUnprotect(&this->controller().jsController()); } void BlobReadableStreamSource::enqueue(JSC::JSValue value) { if (!m_isCancelled) controller().enqueue(value); + + JSC::gcUnprotect(&this->controller().jsController()); } bool BlobReadableStreamSource::enqueue(const uint8_t* ptr, size_t size) { + if (m_isCancelled) return false; + JSC::gcUnprotect(&this->controller().jsController()); auto arrayBuffer = JSC::ArrayBuffer::tryCreate(ptr, size); if (!arrayBuffer) return false; @@ -107,9 +146,30 @@ bool BlobReadableStreamSource::enqueue(const uint8_t* ptr, size_t size) return true; } +bool BlobReadableStreamSource::enqueue(uint8_t* ptr, size_t read, void* ctx, JSTypedArrayBytesDeallocator bytesDeallocator) +{ + + if (m_isCancelled) { + bytesDeallocator(ctx, ptr); + return false; + } + + JSC::gcUnprotect(&this->controller().jsController()); + + auto buffer = ArrayBuffer::createFromBytes(ptr, read, createSharedTask([bytesDeallocator, ctx](void* p) { + if (bytesDeallocator) { + bytesDeallocator(p, ctx); + } + })); + + controller().enqueue(WTFMove(buffer)); + this->m_offset += read; + return true; +} + BlobReadableStreamSource::~BlobReadableStreamSource() { - BlobStore__deref(m_store); - ReadableStreamSource::~ReadableStreamSource(); + if (m_store) + BlobStore__deref(m_store); } } diff --git a/src/javascript/jsc/bindings/BlobReadableStreamSource.h b/src/javascript/jsc/bindings/BlobReadableStreamSource.h index c053d2b055..a0ce35b591 100644 --- a/src/javascript/jsc/bindings/BlobReadableStreamSource.h +++ b/src/javascript/jsc/bindings/BlobReadableStreamSource.h @@ -15,6 +15,8 @@ public: void close(); void enqueue(JSC::JSValue); bool enqueue(const uint8_t* ptr, size_t length); + bool enqueue(uint8_t* ptr, size_t read, void* deallocator, JSTypedArrayBytesDeallocator bytesDeallocator); + bool isCancelled() const { return m_isCancelled; } void* streamer() const { return m_streamer; } diff --git a/src/javascript/jsc/bindings/BunBuiltinNames.h b/src/javascript/jsc/bindings/BunBuiltinNames.h index d0a60611d5..5c90729f26 100644 --- a/src/javascript/jsc/bindings/BunBuiltinNames.h +++ b/src/javascript/jsc/bindings/BunBuiltinNames.h @@ -43,6 +43,7 @@ using namespace JSC; macro(associatedReadableByteStreamController) \ macro(autoAllocateChunkSize) \ macro(backingMap) \ + macro(bunNativeTag) \ macro(backingSet) \ macro(backpressure) \ macro(backpressureChangePromise) \ @@ -58,10 +59,12 @@ using namespace JSC; macro(closeRequest) \ macro(closeRequested) \ macro(closed) \ + macro(closedPromise) \ macro(closedPromiseCapability) \ macro(code) \ macro(connect) \ macro(controlledReadableStream) \ + macro(controller) \ macro(cork) \ macro(createReadableStream) \ macro(createWritableStreamFromInternal) \ @@ -158,6 +161,7 @@ using namespace JSC; macro(readyPromise) \ macro(readyPromiseCapability) \ macro(relative) \ + macro(releaseLock) \ macro(removeEventListener) \ macro(require) \ macro(resolve) \ diff --git a/src/javascript/jsc/bindings/bindings.cpp b/src/javascript/jsc/bindings/bindings.cpp index 7d46becce9..dcb48e2919 100644 --- a/src/javascript/jsc/bindings/bindings.cpp +++ b/src/javascript/jsc/bindings/bindings.cpp @@ -306,10 +306,8 @@ JSC__JSValue SystemError__toErrorInstance(const SystemError* arg0, JSC::JSValue options = JSC::jsUndefined(); - Structure* errorStructure = JSC_GET_DERIVED_STRUCTURE(vm, errorStructure, globalObject->errorPrototype(), globalObject->errorPrototype()); - JSC::JSObject* result - = JSC::ErrorInstance::create(globalObject, errorStructure, message, options); + = JSC::ErrorInstance::create(globalObject, JSC::ErrorInstance::createStructure(vm, globalObject, globalObject->errorPrototype()), message, options); auto clientData = WebCore::clientData(vm); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js index 38f6b6f171..b2a16d3eac 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js @@ -316,30 +316,28 @@ function readableByteStreamControllerEnqueue(controller, chunk) const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); @assert(!@getByIdDirectPrivate(controller, "closeRequested")); @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); - const buffer = chunk.buffer; const byteOffset = chunk.byteOffset; const byteLength = chunk.byteLength; - const transferredBuffer = @transferBufferToCurrentRealm(buffer); if (@readableStreamHasDefaultReader(stream)) { if (!@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) - @readableByteStreamControllerEnqueueChunk(controller, transferredBuffer, byteOffset, byteLength); + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), byteOffset, byteLength); else { @assert(!@getByIdDirectPrivate(controller, "queue").content.length); - let transferredView = new @Uint8Array(transferredBuffer, byteOffset, byteLength); + const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, byteOffset, byteLength); @readableStreamFulfillReadRequest(stream, transferredView, false); } return; } if (@readableStreamHasBYOBReader(stream)) { - @readableByteStreamControllerEnqueueChunk(controller, transferredBuffer, byteOffset, byteLength); + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), byteOffset, byteLength); @readableByteStreamControllerProcessPullDescriptors(controller); return; } @assert(!@isReadableStreamLocked(stream)); - @readableByteStreamControllerEnqueueChunk(controller, transferredBuffer, byteOffset, byteLength); + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), byteOffset, byteLength); } // Spec name: readableByteStreamControllerEnqueueChunkToQueue. diff --git a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js index bf1200ec95..9c2103293e 100644 --- a/src/javascript/jsc/bindings/builtins/js/StreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/StreamInternals.js @@ -106,15 +106,12 @@ function validateAndNormalizeQueuingStrategy(size, highWaterMark) if (size !== @undefined && typeof size !== "function") @throwTypeError("size parameter must be a function"); - const normalizedStrategy = { - size: size, - highWaterMark: @toNumber(highWaterMark) - }; + const newHighWaterMark = @toNumber(highWaterMark); - if (@isNaN(normalizedStrategy.highWaterMark) || normalizedStrategy.highWaterMark < 0) + if (@isNaN(newHighWaterMark) || newHighWaterMark < 0) @throwRangeError("highWaterMark value is negative or not a number"); - return normalizedStrategy; + return { size: size, highWaterMark: newHighWaterMark }; } function newQueue() diff --git a/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.cpp b/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.cpp index da1f9bdd2b..0c9f83ef7e 100644 --- a/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.cpp +++ b/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.cpp @@ -115,6 +115,17 @@ void DeferredPromise::reject(RejectAsHandled rejectAsHandled) reject(lexicalGlobalObject, JSC::jsUndefined(), rejectAsHandled); } +void DeferredPromise::reject(JSC::JSValue value, RejectAsHandled rejectAsHandled) +{ + if (shouldIgnoreRequestToFulfill()) + return; + ASSERT(deferred()); + ASSERT(m_globalObject); + auto& lexicalGlobalObject = *m_globalObject; + JSC::JSLockHolder locker(&lexicalGlobalObject); + reject(lexicalGlobalObject, value, rejectAsHandled); +} + void DeferredPromise::reject(std::nullptr_t, RejectAsHandled rejectAsHandled) { if (shouldIgnoreRequestToFulfill()) diff --git a/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.h b/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.h index ba65dfa1c4..7016086779 100644 --- a/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.h +++ b/src/javascript/jsc/bindings/webcore/JSDOMPromiseDeferred.h @@ -137,6 +137,7 @@ public: void reject(RejectAsHandled = RejectAsHandled::No); void reject(std::nullptr_t, RejectAsHandled = RejectAsHandled::No); WEBCORE_EXPORT void reject(Exception, RejectAsHandled = RejectAsHandled::No); + WEBCORE_EXPORT void reject(JSC::JSValue, RejectAsHandled = RejectAsHandled::No); WEBCORE_EXPORT void reject(ExceptionCode, const String& = {}, RejectAsHandled = RejectAsHandled::No); void reject(const JSC::PrivateName&, RejectAsHandled = RejectAsHandled::No); diff --git a/src/javascript/jsc/bindings/webcore/JSReadableStreamSource.cpp b/src/javascript/jsc/bindings/webcore/JSReadableStreamSource.cpp index eb231277e6..e9a39758e5 100644 --- a/src/javascript/jsc/bindings/webcore/JSReadableStreamSource.cpp +++ b/src/javascript/jsc/bindings/webcore/JSReadableStreamSource.cpp @@ -103,6 +103,11 @@ const ClassInfo JSReadableStreamSourcePrototype::s_info = { "ReadableStreamSourc void JSReadableStreamSourcePrototype::finishCreation(VM& vm) { Base::finishCreation(vm); + // -- BUN ADDITION -- + auto clientData = WebCore::clientData(vm); + this->putDirect(vm, clientData->builtinNames().bunNativeTagPrivateName(), JSC::jsUndefined(), JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::DontDelete | 0); + // -- BUN ADDITION -- + reifyStaticProperties(vm, JSReadableStreamSource::info(), JSReadableStreamSourcePrototypeTableValues, *this); JSC_TO_STRING_TAG_WITHOUT_TRANSITION(); } diff --git a/src/javascript/jsc/bindings/webcore/ReadableStream.cpp b/src/javascript/jsc/bindings/webcore/ReadableStream.cpp index 1e8281a8b3..976d3e8009 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStream.cpp +++ b/src/javascript/jsc/bindings/webcore/ReadableStream.cpp @@ -33,7 +33,6 @@ #include "JSReadableStreamSource.h" #include "WebCoreJSClientData.h" - namespace WebCore { using namespace JSC; @@ -78,6 +77,24 @@ ExceptionOr> ReadableStream::create(JSC::JSGlobalObject& lex return create(*JSC::jsCast(&lexicalGlobalObject), *jsCast(objectOrException.releaseReturnValue())); } +ExceptionOr> ReadableStream::create(JSC::JSGlobalObject& lexicalGlobalObject, RefPtr&& source, JSC::JSValue nativeTag) +{ + auto& builtinNames = WebCore::builtinNames(lexicalGlobalObject.vm()); + RELEASE_ASSERT(source != nullptr); + + auto objectOrException = invokeConstructor(lexicalGlobalObject, builtinNames.ReadableStreamPrivateName(), [&source, nativeTag](auto& args, auto& lexicalGlobalObject, auto& globalObject) { + auto sourceStream = toJSNewlyCreated(&lexicalGlobalObject, &globalObject, source.releaseNonNull()); + auto tag = WebCore::clientData(lexicalGlobalObject.vm())->builtinNames().bunNativeTagPrivateName(); + sourceStream.getObject()->putDirect(lexicalGlobalObject.vm(), tag, nativeTag, JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::DontEnum); + args.append(sourceStream); + }); + + if (objectOrException.hasException()) + return objectOrException.releaseException(); + + return create(*JSC::jsCast(&lexicalGlobalObject), *jsCast(objectOrException.releaseReturnValue())); +} + static inline std::optional invokeReadableStreamFunction(JSC::JSGlobalObject& lexicalGlobalObject, const JSC::Identifier& identifier, JSC::JSValue thisValue, const JSC::MarkedArgumentBuffer& arguments) { JSC::VM& vm = lexicalGlobalObject.vm(); @@ -91,7 +108,7 @@ static inline std::optional invokeReadableStreamFunction(JSC::JSGl auto result = call(&lexicalGlobalObject, function, callData, thisValue, arguments); EXCEPTION_ASSERT(!scope.exception() || vm.hasPendingTerminationException()); if (scope.exception()) - return { }; + return {}; return result; } @@ -120,7 +137,7 @@ std::optional, Ref>> ReadableStrea ASSERT(!arguments.hasOverflowed()); auto returnedValue = invokeReadableStreamFunction(lexicalGlobalObject, privateName, JSC::jsUndefined(), arguments); if (!returnedValue) - return { }; + return {}; auto results = Detail::SequenceConverter>::convert(lexicalGlobalObject, *returnedValue); diff --git a/src/javascript/jsc/bindings/webcore/ReadableStream.h b/src/javascript/jsc/bindings/webcore/ReadableStream.h index 9c122934ff..f332eb5151 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStream.h +++ b/src/javascript/jsc/bindings/webcore/ReadableStream.h @@ -41,6 +41,7 @@ public: static Ref create(JSDOMGlobalObject& globalObject, JSReadableStream& readableStream) { return adoptRef(*new ReadableStream(globalObject, readableStream)); } static ExceptionOr> create(JSC::JSGlobalObject&, RefPtr&&); + static ExceptionOr> create(JSC::JSGlobalObject& lexicalGlobalObject, RefPtr&& source, JSC::JSValue nativeTag); WEBCORE_EXPORT static bool isDisturbed(JSC::JSGlobalObject&, JSC::JSValue); @@ -55,7 +56,10 @@ public: JSReadableStream* readableStream() const { return guarded(); } private: - ReadableStream(JSDOMGlobalObject& globalObject, JSReadableStream& readableStream) : DOMGuarded(globalObject, readableStream) { } + ReadableStream(JSDOMGlobalObject& globalObject, JSReadableStream& readableStream) + : DOMGuarded(globalObject, readableStream) + { + } }; struct JSReadableStreamWrapperConverter { diff --git a/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.cpp b/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.cpp index 2e927199a5..229a02d383 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.cpp +++ b/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.cpp @@ -92,6 +92,29 @@ void ReadableStreamDefaultController::error(const Exception& exception) invokeReadableStreamDefaultControllerFunction(globalObject(), privateName, arguments); } +void ReadableStreamDefaultController::error(JSC::JSValue error) +{ + JSC::JSGlobalObject& lexicalGlobalObject = this->globalObject(); + auto& vm = lexicalGlobalObject.vm(); + JSC::JSLockHolder lock(vm); + auto scope = DECLARE_THROW_SCOPE(vm); + auto value = JSC::Exception::create(vm, error); + + if (UNLIKELY(scope.exception())) { + ASSERT(vm.hasPendingTerminationException()); + return; + } + + JSC::MarkedArgumentBuffer arguments; + arguments.append(&jsController()); + arguments.append(value); + + auto* clientData = static_cast(vm.clientData); + auto& privateName = clientData->builtinFunctions().readableStreamInternalsBuiltins().readableStreamDefaultControllerErrorPrivateName(); + + invokeReadableStreamDefaultControllerFunction(globalObject(), privateName, arguments); +} + bool ReadableStreamDefaultController::enqueue(JSC::JSValue value) { JSC::JSGlobalObject& lexicalGlobalObject = this->globalObject(); diff --git a/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.h b/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.h index b92f5ceaf9..bb3bc9409a 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.h +++ b/src/javascript/jsc/bindings/webcore/ReadableStreamDefaultController.h @@ -49,15 +49,14 @@ public: bool enqueue(RefPtr&&); bool enqueue(JSC::JSValue); void error(const Exception&); + void error(JSC::JSValue error); void close(); - -private: - JSReadableStreamDefaultController& jsController() const; - JSDOMGlobalObject& globalObject() const; - + JSReadableStreamDefaultController& jsController() const; // The owner of ReadableStreamDefaultController is responsible to keep uncollected the JSReadableStreamDefaultController. JSReadableStreamDefaultController* m_jsController { nullptr }; + +private: }; inline JSReadableStreamDefaultController& ReadableStreamDefaultController::jsController() const diff --git a/src/javascript/jsc/bindings/webcore/ReadableStreamSource.cpp b/src/javascript/jsc/bindings/webcore/ReadableStreamSource.cpp index 2504b1319f..f969c84b17 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStreamSource.cpp +++ b/src/javascript/jsc/bindings/webcore/ReadableStreamSource.cpp @@ -81,6 +81,17 @@ void ReadableStreamSource::clean() } } +void ReadableStreamSource::error(JSC::JSValue value) +{ + if (m_promise) { + m_promise->reject(value, RejectAsHandled::Yes); + m_promise = nullptr; + setInactive(); + } else { + controller().error(value); + } +} + void SimpleReadableStreamSource::doCancel() { m_isCancelled = true; diff --git a/src/javascript/jsc/bindings/webcore/ReadableStreamSource.h b/src/javascript/jsc/bindings/webcore/ReadableStreamSource.h index 3540490492..0886ab4234 100644 --- a/src/javascript/jsc/bindings/webcore/ReadableStreamSource.h +++ b/src/javascript/jsc/bindings/webcore/ReadableStreamSource.h @@ -41,6 +41,9 @@ public: void start(ReadableStreamDefaultController&&, DOMPromiseDeferred&&); void pull(DOMPromiseDeferred&&); void cancel(JSC::JSValue); + void error(JSC::JSValue error); + + bool hasController() const { return !!m_controller; } bool isPulling() const { return !!m_promise; } @@ -60,8 +63,9 @@ protected: virtual void doPull() = 0; virtual void doCancel() = 0; -private: std::unique_ptr> m_promise; + +private: std::optional m_controller; }; diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig index 2be4512594..7f0a572aaf 100644 --- a/src/javascript/jsc/javascript.zig +++ b/src/javascript/jsc/javascript.zig @@ -575,7 +575,7 @@ pub const VirtualMachine = struct { response_objects_pool: ?*Response.Pool = null, rare_data: ?*JSC.RareData = null, - io_: ?IO = null, + poller: JSC.WebCore.Poller = JSC.WebCore.Poller{}, pub fn io(this: *VirtualMachine) *IO { if (this.io_ == null) { @@ -721,32 +721,18 @@ pub const VirtualMachine = struct { // TODO: fix this technical debt pub fn tick(this: *EventLoop) void { - if (this.virtual_machine.io_ == null) { - while (true) { - this.tickConcurrent(); + var poller = &this.virtual_machine.poller; + while (true) { + this.tickConcurrent(); - // this.global.vm().doWork(); + // this.global.vm().doWork(); - while (this.tickWithCount() > 0) {} + while (this.tickWithCount() > 0) {} + poller.tick(); - this.tickConcurrent(); + this.tickConcurrent(); - if (this.tickWithCount() == 0) break; - } else { - while (true) { - this.tickConcurrent(); - this.virtual_machine.io().tick() catch unreachable; - - // this.global.vm().doWork(); - - while (this.tickWithCount() > 0) {} - this.virtual_machine.io().tick() catch unreachable; - - this.tickConcurrent(); - - if (this.tickWithCount() == 0) break; - } - } + if (this.tickWithCount() == 0) break; } } diff --git a/src/javascript/jsc/node/syscall.zig b/src/javascript/jsc/node/syscall.zig index 7d82336c4a..561196791f 100644 --- a/src/javascript/jsc/node/syscall.zig +++ b/src/javascript/jsc/node/syscall.zig @@ -93,6 +93,8 @@ pub const Tag = enum(u8) { sendfile, splice, + kevent, + kqueue, pub var strings = std.EnumMap(Tag, JSC.C.JSStringRef).initFull(null); }; const PathString = @import("../../../global.zig").PathString; @@ -279,14 +281,14 @@ pub fn read(fd: os.fd_t, buf: []u8) Maybe(usize) { pub fn recv(fd: os.fd_t, buf: []u8, flag: u32) Maybe(usize) { if (comptime Environment.isMac) { - const rc = system.@"recvfrom$NOCANCEL"(fd, buf.ptr, bun.len, flag, null, null); + const rc = system.@"recvfrom$NOCANCEL"(fd, buf.ptr, buf.len, flag, null, null); if (Maybe(usize).errnoSys(rc, .recv)) |err| { return err; } return Maybe(usize){ .result = @intCast(usize, rc) }; } else { while (true) { - const rc = linux.recvfrom(fd, buf.ptr, bun.len, flag | os.SOCK.CLOEXEC | os.MSG.NOSIGNAL, null, null); + const rc = linux.recvfrom(fd, buf.ptr, buf.len, flag | os.SOCK.CLOEXEC | linux.MSG.CMSG_CLOEXEC, null, null); if (Maybe(usize).errnoSys(rc, .recv)) |err| { if (err.getErrno() == .INTR) continue; return err; diff --git a/src/javascript/jsc/webcore/encoding.zig b/src/javascript/jsc/webcore/encoding.zig index 3fab481272..d7f0da6c6e 100644 --- a/src/javascript/jsc/webcore/encoding.zig +++ b/src/javascript/jsc/webcore/encoding.zig @@ -63,7 +63,8 @@ pub const TextEncoder = struct { // latin1 always has the same length as utf-8 // so we can use the Gigacage to allocate the buffer var array = JSC.JSValue.createUninitializedUint8Array(ctx.ptr(), zig_str.len); - var buffer = array.asArrayBuffer(ctx.ptr()) orelse return JSC.toInvalidArguments("Out of memory", .{}, ctx); + var buffer = array.asArrayBuffer(ctx.ptr()) orelse + return JSC.toInvalidArguments("Out of memory", .{}, ctx); const result = strings.copyLatin1IntoUTF8(buffer.slice(), []const u8, zig_str.slice()); std.debug.assert(result.written == zig_str.len); return array; diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig index 676f694ab0..6ea33259c3 100644 --- a/src/javascript/jsc/webcore/response.zig +++ b/src/javascript/jsc/webcore/response.zig @@ -1042,6 +1042,29 @@ pub const ReadableStream = opaque { offset: usize, length: usize, ) JSC.JSValue; + const Base = @import("../../../ast/base.zig"); + pub const StreamTag = enum(usize) { + invalid = 0, + _, + + pub fn init(filedes: JSC.Node.FileDescriptor) StreamTag { + var bytes = [8]u8{ 1, 0, 0, 0, 0, 0, 0, 0 }; + const filedes_ = @bitCast([8]u8, @as(usize, @truncate(u56, @intCast(usize, filedes)))); + bytes[1..8].* = filedes_[0..7].*; + + return @intToEnum(StreamTag, @bitCast(u64, bytes)); + } + + pub fn fd(this: StreamTag) JSC.Node.FileDescriptor { + var bytes = @bitCast([8]u8, @enumToInt(this)); + if (bytes[0] != 1) { + return std.math.maxInt(JSC.Node.FileDescriptor); + } + var out: u64 = 0; + @bitCast([8]u8, out)[0..7].* = bytes[1..8].*; + return @intCast(JSC.Node.FileDescriptor, out); + } + }; }; // https://developer.mozilla.org/en-US/docs/Web/API/Headers @@ -1104,6 +1127,164 @@ const PathOrBlob = union(enum) { } }; +pub const Poller = struct { + /// kqueue() or epoll() + /// 0 == unset + watch_fd: i32 = 0, + + pub const PlatformSpecificFlags = struct {}; + + const Completion = fn (ctx: ?*anyopaque, sizeOrOffset: i64, flags: u16) void; + const kevent64 = std.os.system.kevent64_s; + pub fn dispatchKQueueEvent(kqueue_event: *const kevent64) void { + if (comptime !Environment.isMac) { + unreachable; + } + + const ptr = @intToPtr(?*anyopaque, kqueue_event.udata); + const callback: Completion = @intToPtr(Completion, kqueue_event.ext[0]); + callback(ptr, @bitCast(i64, kqueue_event.data), kqueue_event.flags); + } + const timeout = std.mem.zeroes(std.os.timespec); + + pub fn watch(this: *Poller, fd: JSC.Node.FileDescriptor, flag: Flag, ctx: ?*anyopaque, completion: Completion) JSC.Node.Maybe(void) { + if (comptime Environment.isLinux) { + std.debug.assert(this.watch_fd != 0); + } else if (comptime Environment.isMac) { + if (this.watch_fd == 0) { + this.watch_fd = std.c.kqueue(); + if (this.watch_fd == -1) { + defer this.watch_fd = 0; + return JSC.Node.Maybe(void).errnoSys(this.watch_fd, .kqueue).?; + } + } + std.debug.assert(this.watch_fd != 0); + var events_list = std.mem.zeroes([2]kevent64); + events_list[0] = switch (flag) { + .read => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_READ, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(ctx), + .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT, + .ext = .{ @ptrToInt(completion), 0 }, + }, + .write => .{ + .ident = @intCast(u64, fd), + .filter = std.os.system.EVFILT_WRITE, + .data = 0, + .fflags = 0, + .udata = @ptrToInt(ctx), + .flags = std.c.EV_ADD | std.c.EV_ENABLE | std.c.EV_ONESHOT, + .ext = .{ @ptrToInt(completion), 0 }, + }, + }; + + // The kevent() system call returns the number of events placed in + // the eventlist, up to the value given by nevents. If the time + // limit expires, then kevent() returns 0. + const rc = std.os.system.kevent64( + this.watch_fd, + &events_list, + 1, + // The same array may be used for the changelist and eventlist. + &events_list, + 1, + 0, + &timeout, + ); + + // If an error occurs while + // processing an element of the changelist and there is enough room + // in the eventlist, then the event will be placed in the eventlist + // with EV_ERROR set in flags and the system error in data. + if (events_list[0].flags == std.c.EV_ERROR) { + return JSC.Node.Maybe(void).errnoSys(events_list[0].data, .kevent).?; + // Otherwise, -1 will be returned, and errno will be set to + // indicate the error condition. + } + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => return JSC.Node.Maybe(void).errnoSys(@enumToInt(std.c.getErrno(rc)), .kevent).?, + 0 => return JSC.Node.Maybe(void).success, + 1 => { + dispatchKQueueEvent(&events_list[0]); + return JSC.Node.Maybe(void).success; + }, + 2 => { + dispatchKQueueEvent(&events_list[0]); + dispatchKQueueEvent(&events_list[1]); + return JSC.Node.Maybe(void).success; + }, + else => unreachable, + } + } else { + @compileError("TODO: Poller"); + } + } + + const kqueue_events_ = std.mem.zeroes([4]kevent64); + pub fn tick(this: *Poller) void { + if (comptime Environment.isMac) { + if (this.watch_fd == 0) return; + + var events_list = kqueue_events_; + // ub extern "c" fn kevent64( + // kq: c_int, + // changelist: [*]const kevent64_s, + // nchanges: c_int, + // eventlist: [*]kevent64_s, + // nevents: c_int, + // flags: c_uint, + // timeout: ?*const timespec, + // ) c_int; + const rc = std.os.system.kevent64( + this.watch_fd, + &events_list, + 0, + // The same array may be used for the changelist and eventlist. + &events_list, + 4, + 0, + &timeout, + ); + + switch (rc) { + std.math.minInt(@TypeOf(rc))...-1 => { + // EINTR is fine + switch (std.c.getErrno(rc)) { + .INTR => return, + else => |errno| std.debug.panic("kevent64() failed: {d}", .{errno}), + } + }, + 0 => {}, + 1 => { + dispatchKQueueEvent(&events_list[0]); + }, + 2 => { + dispatchKQueueEvent(&events_list[0]); + dispatchKQueueEvent(&events_list[1]); + }, + 3 => { + dispatchKQueueEvent(&events_list[0]); + dispatchKQueueEvent(&events_list[1]); + dispatchKQueueEvent(&events_list[2]); + }, + 4 => { + dispatchKQueueEvent(&events_list[0]); + dispatchKQueueEvent(&events_list[1]); + dispatchKQueueEvent(&events_list[2]); + dispatchKQueueEvent(&events_list[3]); + }, + else => unreachable, + } + } + } + + pub const Flag = enum { read, write }; +}; + pub const Blob = struct { size: SizeType = 0, offset: SizeType = 0, @@ -1501,15 +1682,29 @@ pub const Blob = struct { this.deref(); } - extern fn BlobStore__onRead(source: ?*anyopaque, ptr: [*]const u8, len: usize) bool; - extern fn BlobStore__onError(source: ?*anyopaque, ptr: [*]const u8, len: usize) bool; + // pub const BlobStoreStreamer = Pointer + extern fn BlobStore__onRead(source: ?*anyopaque, ptr: ?[*]const u8, len: usize) bool; + extern fn BlobStore__onError(source: ?*anyopaque, system_error: *const JSC.SystemError, global: *JSGlobalObject) void; + extern fn BlobReadableStreamSource_isCancelled(source: ?*anyopaque) bool; + extern fn BlobStore__onReadExternal(source: ?*anyopaque, ptr: [*]u8, read: usize, deallocator: ?*anyopaque, deallocator: JSC.C.JSTypedArrayBytesDeallocator) bool; + + pub export fn BlobStore__onFinish(store: *Store, stream: ?*anyopaque) void { + _ = store; + _ = stream; + const fd = @intCast(JSC.Node.FileDescriptor, @ptrToInt(stream)); + _ = fd; + } pub export fn BlobStore__requestRead( store: *Blob.Store, + stream: ReadableStream.StreamTag, ctx: ?*anyopaque, offset: usize, length: usize, ) bool { + _ = stream; + store.ref(); + defer store.deref(); if (comptime JSC.is_bindgen) unreachable; switch (store.data) { .bytes => |*bytes| { @@ -1527,25 +1722,312 @@ pub const Blob = struct { return BlobStore__onRead(ctx, slice.ptr, slice.len) and has_more; }, - else => return false, + .file => |*file| { + _ = file; + const fd = stream.fd(); + + // invalid fd + if (fd == std.math.maxInt(@TypeOf(fd))) { + return false; + } + const auto_close = file.pathlike == .path; + + const chunk_size = if (length > 0) @minimum(length, max_chunk_size) else max_chunk_size; + + const this_chunk_size = if (file.max_size > 0) + @minimum(chunk_size, file.max_size) + else if (std.os.S.ISFIFO(file.mode)) + fifo_chunk_size + else + chunk_size; + + var buf = bun.default_allocator.alloc( + u8, + this_chunk_size, + ) catch { + const err = JSC.SystemError{ + .code = ZigString.init("ENOMEM"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Out of memory"), + .syscall = ZigString.init("malloc"), + }; + BlobStore__onError(ctx, &err, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + }; + + const rc = // read() for files + JSC.Node.Syscall.read(fd, buf); + + switch (rc) { + .err => |err| { + const retry = comptime if (Environment.isLinux) + std.os.E.WOULDBLOCK + else + std.os.E.AGAIN; + + switch (err.getErrno()) { + retry => { + outer: { + NetworkThread.init() catch break :outer; + + _ = AsyncStreamRequest.init( + bun.default_allocator, + ctx, + buf, + fd, + file.mode, + if (!std.os.S.ISREG(file.mode)) @as(?u64, null) else offset, + ) catch { + const sys = JSC.SystemError{ + .code = ZigString.init("ENOMEM"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Out of memory"), + .syscall = ZigString.init("malloc"), + }; + BlobStore__onError(ctx, &sys, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + }; + return true; + } + }, + else => {}, + } + const sys = err.toSystemError(); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + + BlobStore__onError(ctx, &sys, JSC.VirtualMachine.vm.global); + bun.default_allocator.free(buf); + return false; + }, + .result => |result| { + // this handles: + // - empty file + // - stream closed for some reason + if ((result == 0 and (file.seekable orelse false)) or + BlobReadableStreamSource_isCancelled(ctx)) + { + bun.default_allocator.free(buf); + return false; + } + + const will_continue = BlobStore__onReadExternal( + ctx, + buf.ptr, + result, + buf.ptr, + JSC.MarkedArrayBuffer_deallocator, + ) and + // if it's not a regular file, we don't know how much to read so we should continue + (!(file.seekable orelse false) or + // if it is a regular file, we stop reading when we've read all the data + !((file.seekable orelse false) and @intCast(SizeType, result + offset) >= file.max_size)); + + return will_continue; + }, + } + }, } + + return false; } + const StreamingRead = struct { + buf_ptr: [*]u8, + buf_len: SizeType = 0, + buf_tail: SizeType = 0, + ref_count: u32 = 0, + store: *Blob.Store, + ctx: ?*anyopaque = null, + + has_pending_read: bool = false, + + pub fn owns(this: *const StreamingRead, ptr: ?*const anyopaque) bool { + const addr = @ptrToInt(ptr); + return addr >= @ptrToInt(this.buf_ptr) and addr < @ptrToInt(this.buf_ptr) + this.buf_len; + } + + pub fn deref(this: *StreamingRead) void { + if (this.ref_count == 0) + unreachable; + + this.ref_count -= 1; + if (this.ref_count == 0 and !this.has_pending_read) { + bun.default_allocator.destroy(this); + return; + } + } + + pub fn ref(this: *StreamingRead) void { + this.ref_count +|= 1; + } + }; + + const AsyncStreamRequest = struct { + buf: []u8, + fd: JSC.Node.FileDescriptor, + // task: NetworkThread.Task = .{ .callback = callback }, + // completion: AsyncIO.Completion = undefined, + offset: ?u64 = null, + read_len: u64 = 0, + loop: *JSC.VirtualMachine.EventLoop = undefined, + allocator: std.mem.Allocator, + source: ?*anyopaque, + mode: JSC.Node.Mode, + + pub fn init(allocator: std.mem.Allocator, source: ?*anyopaque, buf: []u8, fd: JSC.Node.FileDescriptor, mode: JSC.Node.Mode, offset: ?u64) !*AsyncStreamRequest { + var req = try allocator.create(AsyncStreamRequest); + req.* = .{ + .buf = buf, + .fd = fd, + .source = source, + .offset = offset, + .loop = JSC.VirtualMachine.vm.eventLoop(), + .allocator = allocator, + .mode = mode, + }; + _ = JSC.VirtualMachine.vm.poller.watch(fd, .read, req, callback); + return req; + } + + pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void { + var this: *AsyncStreamRequest = bun.cast(*AsyncStreamRequest, task.?); + var buf = this.buf; + if (comptime Environment.isMac) { + if (std.os.S.ISREG(this.mode)) { + + // Returns when the file pointer is not at the end of + // file. data contains the offset from current position + // to end of file, and may be negative. + buf = buf[0..@minimum(@intCast(usize, sizeOrOffset), this.buf.len)]; + } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) { + buf = buf[0..@minimum(@intCast(usize, sizeOrOffset), this.buf.len)]; + } + } + + const rc = JSC.Node.Syscall.read(this.fd, buf); + + switch (rc) { + .err => |err| { + // const retry = comptime if (Environment.isLinux) + // std.os.E.WOULDBLOCK + // else + // std.os.E.AGAIN; + + // switch (err.getErrno()) { + // retry => { + // outer: { + // NetworkThread.init() catch break :outer; + + // _ = AsyncStreamRequest.init( + // bun.default_allocator, + // this.source, + // buf, + // fd, + // if (!std.os.S.ISREG(file.mode)) null else offset, + // ) catch { + // const sys = JSC.SystemError{ + // .code = ZigString.init("ENOMEM"), + // .path = if (file.pathlike == .path) + // ZigString.init(file.pathlike.path.slice()) + // else + // ZigString.Empty, + // .message = ZigString.init("Out of memory"), + // .syscall = ZigString.init("malloc"), + // }; + // BlobStore__onError(ctx, &sys, JSC.VirtualMachine.vm.global); + + // if (auto_close) { + // _ = JSC.Node.Syscall.close(fd); + // } + // return false; + // }; + // return true; + // } + // }, + // else => {}, + // } + const sys = err.toSystemError(); + + BlobStore__onError(this.source, &sys, JSC.VirtualMachine.vm.global); + const allocator = this.allocator; + allocator.free(this.buf); + allocator.destroy(this); + return; + }, + .result => |result| { + // this handles: + // - empty file + // - stream closed for some reason + if (result == 0) { + _ = BlobStore__onRead(this.source, null, 0); + return; + } + + _ = BlobStore__onReadExternal( + this.source, + buf.ptr, + result, + buf.ptr, + JSC.MarkedArrayBuffer_deallocator, + ); + }, + } + // } + + } + + pub fn runFromJS(task: *anyopaque) void { + var this: *AsyncStreamRequest = bun.cast(*AsyncStreamRequest, task); + var allocator = this.allocator; + defer allocator.destroy(this); + if (this.errno) |err| { + const system_error = JSC.SystemError{ + .code = ZigString.init(std.mem.span(@errorName(err))), + .path = ZigString.Empty, // TODO + .syscall = ZigString.init("read"), + }; + BlobStore__onError(this.source, &system_error, this.loop.global); + return; + } + + _ = BlobStore__onReadExternal(this.source, this.buf.ptr, this.read_len, this.buf.ptr, JSC.MarkedArrayBuffer_deallocator); + } + }; + + const fifo_chunk_size = 512; pub export fn BlobStore__requestStart( store: *Blob.Store, + streamer: *ReadableStream.StreamTag, ctx: ?*anyopaque, offset: usize, length: usize, ) bool { + store.ref(); + defer store.deref(); if (comptime JSC.is_bindgen) unreachable; + const chunk_size = if (length > 0) @minimum(length, max_chunk_size) else max_chunk_size; switch (store.data) { .bytes => |*bytes| { var slice = bytes.slice(); var base = slice[@minimum(offset, slice.len)..]; slice = base; - slice = if (length > 0) - slice[0..@minimum(@minimum(slice.len, length), max_chunk_size)] - else - slice[0..@minimum(slice.len, max_chunk_size)]; + slice = slice[0..@minimum(slice.len, chunk_size)]; if (slice.len == 0) return false; @@ -1553,7 +2035,229 @@ pub const Blob = struct { return BlobStore__onRead(ctx, slice.ptr, slice.len) and has_more; }, - else => return false, + .file => |*file| { + var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + var auto_close = file.pathlike != .fd; + var fd = if (!auto_close) + file.pathlike.fd + else switch (JSC.Node.Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) { + .result => |_fd| _fd, + .err => |err| { + BlobStore__onError(ctx, &err.withPath(file.pathlike.path.slice()).toSystemError(), JSC.VirtualMachine.vm.global); + return false; + }, + }; + + if (!auto_close) { + // ensure we have non-blocking IO set + const flags = (std.os.fcntl(fd, std.os.F.GETFL, 0) catch return false); + + // if we do not, clone the descriptor and set non-blocking + // it is important for us to clone it so we don't cause Weird Things to happen + if ((flags & std.os.O.NONBLOCK) == 0) { + auto_close = false; + fd = @intCast(@TypeOf(fd), std.os.fcntl(fd, std.os.F.DUPFD, 0) catch return false); + _ = std.os.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK) catch return false; + } + } + + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + BlobStore__onError(ctx, &err.withPath(file.pathlike.path.slice()).toSystemError(), JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + }, + }; + + if (std.os.S.ISDIR(stat.mode)) { + const err = JSC.SystemError{ + .code = ZigString.init("EISDIR"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Directories cannot be read like files"), + .syscall = ZigString.init("read"), + }; + + BlobStore__onError(ctx, &err, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + } + + if (std.os.S.ISSOCK(stat.mode)) { + const err = JSC.SystemError{ + .code = ZigString.init("ENOTSUP"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Sockets cannot be read like files with this API"), + .syscall = ZigString.init("read"), + }; + + BlobStore__onError(ctx, &err, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + } + + // if (comptime Environment.isMac) { + // if (std.os.S.ISSOCK(stat.mode)) { + // // darwin doesn't support os.MSG.NOSIGNAL, + // // but instead a socket option to avoid SIGPIPE. + // const _bytes = &std.mem.toBytes(@as(c_int, 1)); + // _ = std.os.darwin.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, _bytes, @intCast(std.os.socklen_t, _bytes.len)); + // } + // } + + file.seekable = std.os.S.ISREG(stat.mode); + file.mode = @intCast(JSC.Node.Mode, stat.mode); + + if (file.seekable orelse false) + file.max_size = @intCast(SizeType, stat.size); + + if ((file.seekable orelse false) and file.max_size == 0) { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + } + + const this_chunk_size = if (file.max_size > 0) + @minimum(chunk_size, file.max_size) + else if (std.os.S.ISFIFO(stat.mode)) + fifo_chunk_size + else + chunk_size; + + var buf = bun.default_allocator.alloc( + u8, + this_chunk_size, + ) catch { + const err = JSC.SystemError{ + .code = ZigString.init("ENOMEM"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Out of memory"), + .syscall = ZigString.init("malloc"), + }; + BlobStore__onError(ctx, &err, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + }; + + const rc = // read() for files + JSC.Node.Syscall.read(fd, buf); + + switch (rc) { + .err => |err| { + const retry = comptime if (Environment.isLinux) + std.os.E.WOULDBLOCK + else + std.os.E.AGAIN; + + switch (err.getErrno()) { + retry => { + outer: { + NetworkThread.init() catch break :outer; + + _ = AsyncStreamRequest.init( + bun.default_allocator, + ctx, + buf, + fd, + file.mode, + if (!std.os.S.ISREG(stat.mode)) @as(?u64, null) else offset, + ) catch { + const sys = JSC.SystemError{ + .code = ZigString.init("ENOMEM"), + .path = if (file.pathlike == .path) + ZigString.init(file.pathlike.path.slice()) + else + ZigString.Empty, + .message = ZigString.init("Out of memory"), + .syscall = ZigString.init("malloc"), + }; + BlobStore__onError(ctx, &sys, JSC.VirtualMachine.vm.global); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return false; + }; + streamer.* = ReadableStream.StreamTag.init(fd); + return true; + } + }, + else => {}, + } + const sys = err.toSystemError(); + + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + + BlobStore__onError(ctx, &sys, JSC.VirtualMachine.vm.global); + bun.default_allocator.free(buf); + return false; + }, + .result => |result| { + // this handles: + // - empty file + // - stream closed for some reason + if ((result == 0 and (file.seekable orelse false)) or + BlobReadableStreamSource_isCancelled(ctx)) + { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + + bun.default_allocator.free(buf); + return false; + } + + const will_continue = BlobStore__onReadExternal( + ctx, + buf.ptr, + result, + buf.ptr, + JSC.MarkedArrayBuffer_deallocator, + ) and + // if it's not a regular file, we don't know how much to read so we should continue + (!(file.seekable orelse false) or + // if it is a regular file, we stop reading when we've read all the data + !((file.seekable orelse false) and @intCast(SizeType, result) >= file.max_size)); + + // did the first read cover the whole file? + // if yes, then we are done! + // we can safely close the file descriptor now + if (auto_close and !will_continue) { + _ = JSC.Node.Syscall.close(fd); + } + + if (will_continue) { + streamer.* = ReadableStream.StreamTag.init(fd); + } + + return will_continue; + }, + } + }, } } @@ -2754,6 +3458,8 @@ pub const Blob = struct { mime_type: HTTPClient.MimeType = HTTPClient.MimeType.other, is_atty: ?bool = null, mode: JSC.Node.Mode = 0, + seekable: ?bool = null, + max_size: SizeType = 0, pub fn init(pathlike: JSC.Node.PathOrFileDescriptor, mime_type: ?HTTPClient.MimeType) FileStore { return .{ .pathlike = pathlike, .mime_type = mime_type orelse HTTPClient.MimeType.other }; diff --git a/src/js_parser.zig b/src/js_parser.zig index 4ddeba8964..d0d8cc5129 100644 --- a/src/js_parser.zig +++ b/src/js_parser.zig @@ -13437,7 +13437,7 @@ fn NewParser_( // jsxDEV(type, arguments, key, isStaticChildren, source, self) // jsx(type, arguments, key) const include_filename = FeatureFlags.include_filename_in_jsx and p.options.jsx.development; - const args = p.allocator.alloc(Expr, if (p.options.jsx.development) @as(usize, 6) else @as(usize, 4)) catch unreachable; + const args = p.allocator.alloc(Expr, if (p.options.jsx.development) @as(usize, 6) else @as(usize, 2) + @as(usize, @boolToInt(e_.key != null))) catch unreachable; args[0] = tag; const allocator = p.allocator; var props = e_.properties.list(); @@ -13500,7 +13500,7 @@ fn NewParser_( if (e_.key) |key| { args[2] = key; - } else { + } else if (p.options.jsx.development) { // if (maybeKey !== undefined) args[2] = Expr{ .loc = expr.loc, diff --git a/src/runtime.js b/src/runtime.js index 4ac76bff09..485cdc6f6f 100644 --- a/src/runtime.js +++ b/src/runtime.js @@ -9,16 +9,16 @@ var __getOwnPropDesc = Object.getOwnPropertyDescriptor; // We're disabling Object.freeze because it breaks CJS => ESM and can cause // issues with Suspense and other things that expect the CJS module namespace // to be mutable when the ESM module namespace is NOT mutable -var __objectFreezePolyfill = new WeakSet(); +// var __objectFreezePolyfill = new WeakSet(); -globalThis.Object.freeze = function freeze(obj) { - __objectFreezePolyfill.add(obj); - return obj; -}; +// globalThis.Object.freeze = function freeze(obj) { +// __objectFreezePolyfill.add(obj); +// return obj; +// }; -globalThis.Object.isFrozen = function isFrozen(obj) { - return __objectFreezePolyfill.has(obj); -}; +// globalThis.Object.isFrozen = function isFrozen(obj) { +// return __objectFreezePolyfill.has(obj); +// }; export var __markAsModule = (target) => __defProp(target, "__esModule", { value: true, configurable: true });