diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 294b871402..a6f40779d2 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -23,5 +23,7 @@ jobs: name: Run format uses: ./.github/workflows/run-format.yml secrets: inherit + permissions: + contents: write with: zig-version: 0.13.0 diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index c715c62fda..d57fbee820 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1869,6 +1869,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp blob: JSC.WebCore.AnyBlob = JSC.WebCore.AnyBlob{ .Blob = .{} }, sendfile: SendfileContext = undefined, + + request_body_readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{}, request_body: ?*JSC.BodyValueRef = null, request_body_buf: std.ArrayListUnmanaged(u8) = .{}, request_body_content_len: usize = 0, @@ -2411,6 +2413,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.response_jsvalue = JSC.JSValue.zero; } + this.request_body_readable_stream_ref.deinit(); + if (this.request_weakref.get()) |request| { request.request_context = AnyRequestContext.Null; this.request_weakref.deinit(); @@ -3852,43 +3856,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const vm = this.server.?.vm; const globalThis = this.server.?.globalThis; + // After the user does request.body, + // if they then do .text(), .arrayBuffer(), etc + // we can no longer hold the strong reference from the body value ref. + if (this.request_body_readable_stream_ref.get()) |readable| { + assert(this.request_body_buf.items.len == 0); + vm.eventLoop().enter(); + defer vm.eventLoop().exit(); + + if (!last) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } else { + var strong = this.request_body_readable_stream_ref; + this.request_body_readable_stream_ref = .{}; + defer strong.deinit(); + if (this.request_body) |request_body| { + _ = request_body.unref(); + this.request_body = null; + } + + readable.value.ensureStillAlive(); + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + // This is the start of a task, so it's a good time to drain if (this.request_body != null) { var body = this.request_body.?; - if (body.value == .Locked) { - if (body.value.Locked.readable.get()) |readable| { - if (readable.ptr == .Bytes) { - assert(this.request_body_buf.items.len == 0); - vm.eventLoop().enter(); - defer vm.eventLoop().exit(); - - if (!last) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - } else { - var prev = body.value.Locked.readable; - body.value.Locked.readable = .{}; - readable.value.ensureStillAlive(); - defer prev.deinit(); - readable.value.ensureStillAlive(); - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - } - - return; - } - } - } - if (last) { var bytes = &this.request_body_buf; @@ -3989,6 +3996,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } + pub fn onRequestBodyReadableStreamAvailable(ptr: *anyopaque, globalThis: *JSC.JSGlobalObject, readable: JSC.WebCore.ReadableStream) void { + var this = bun.cast(*RequestContext, ptr); + bun.debugAssert(this.request_body_readable_stream_ref.held.ref == null); + this.request_body_readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis); + } + pub fn onStartBufferingCallback(this: *anyopaque) void { onStartBuffering(bun.cast(*RequestContext, this)); } @@ -6792,6 +6805,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp .global = this.globalThis, .onStartBuffering = RequestContext.onStartBufferingCallback, .onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback, + .onReadableStreamAvailable = RequestContext.onRequestBodyReadableStreamAvailable, }, }; ctx.flags.is_waiting_for_request_body = true; diff --git a/src/bun.js/api/streams.classes.ts b/src/bun.js/api/streams.classes.ts index e766278c26..f4419c5db8 100644 --- a/src/bun.js/api/streams.classes.ts +++ b/src/bun.js/api/streams.classes.ts @@ -39,6 +39,32 @@ function source(name) { isClosed: { getter: "getIsClosedFromJS", }, + ...(name !== "File" + ? // Buffered versions + // not implemented in File, yet. + { + text: { + fn: "textFromJS", + length: 0, + }, + json: { + fn: "jsonFromJS", + length: 0, + }, + arrayBuffer: { + fn: "arrayBufferFromJS", + length: 0, + }, + blob: { + fn: "blobFromJS", + length: 0, + }, + bytes: { + fn: "bytesFromJS", + length: 0, + }, + } + : {}), ...(name === "File" ? { setRawMode: { diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 504fe02cc1..c140be3e69 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -430,6 +430,19 @@ pub const ArrayBuffer = extern struct { return out; } + extern "C" fn JSArrayBuffer__fromDefaultAllocator(*JSC.JSGlobalObject, ptr: [*]u8, len: usize) JSC.JSValue; + pub fn toJSFromDefaultAllocator(globalThis: *JSC.JSGlobalObject, bytes: []u8) JSC.JSValue { + return JSArrayBuffer__fromDefaultAllocator(globalThis, bytes.ptr, bytes.len); + } + + pub fn fromDefaultAllocator(globalThis: *JSC.JSGlobalObject, bytes: []u8, comptime typed_array_type: JSC.JSValue.JSType) JSC.JSValue { + return switch (typed_array_type) { + .ArrayBuffer => JSArrayBuffer__fromDefaultAllocator(globalThis, bytes.ptr, bytes.len), + .Uint8Array => JSC.JSUint8Array.fromBytes(globalThis, bytes), + else => @compileError("Not implemented yet"), + }; + } + pub fn fromBytes(bytes: []u8, typed_array_type: JSC.JSValue.JSType) ArrayBuffer { return ArrayBuffer{ .offset = 0, .len = @as(u32, @intCast(bytes.len)), .byte_len = @as(u32, @intCast(bytes.len)), .typed_array_type = typed_array_type, .ptr = bytes.ptr }; } diff --git a/src/bun.js/bindings/BunObject.cpp b/src/bun.js/bindings/BunObject.cpp index 703525981d..caed3789fb 100644 --- a/src/bun.js/bindings/BunObject.cpp +++ b/src/bun.js/bindings/BunObject.cpp @@ -86,6 +86,14 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr size_t arrayLength = array->length(); if (arrayLength < 1) { + if (asUint8Array) { + return JSValue::encode( + JSC::JSUint8Array::create( + lexicalGlobalObject, + lexicalGlobalObject->m_typedArrayUint8.get(lexicalGlobalObject), + 0)); + } + RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast(0), 1)))); } diff --git a/src/bun.js/bindings/Uint8Array.cpp b/src/bun.js/bindings/Uint8Array.cpp index baa3c73003..697bf61a5a 100644 --- a/src/bun.js/bindings/Uint8Array.cpp +++ b/src/bun.js/bindings/Uint8Array.cpp @@ -1,17 +1,14 @@ #include "root.h" +#include "JavaScriptCore/JSArrayBuffer.h" #include "JavaScriptCore/TypedArrayType.h" -#include "JavaScriptCore/JSArrayBufferViewInlines.h" -#include "JavaScriptCore/JSArrayBufferView.h" -#include "JavaScriptCore/JSTypedArrayViewPrototype.h" #include "mimalloc.h" namespace Bun { extern "C" JSC::EncodedJSValue JSUint8Array__fromDefaultAllocator(JSC::JSGlobalObject* lexicalGlobalObject, uint8_t* ptr, size_t length) { - - JSC::JSUint8Array* uint8Array = nullptr; + JSC::JSUint8Array* uint8Array; if (LIKELY(length > 0)) { auto buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask([](void* p) { @@ -25,4 +22,23 @@ extern "C" JSC::EncodedJSValue JSUint8Array__fromDefaultAllocator(JSC::JSGlobalO return JSC::JSValue::encode(uint8Array); } + +extern "C" JSC::EncodedJSValue JSArrayBuffer__fromDefaultAllocator(JSC::JSGlobalObject* lexicalGlobalObject, uint8_t* ptr, size_t length) +{ + + JSC::JSArrayBuffer* arrayBuffer; + + if (LIKELY(length > 0)) { + RefPtr buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask([](void* p) { + mi_free(p); + })); + + arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), WTFMove(buffer)); + } else { + arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), nullptr); + } + + return JSC::JSValue::encode(arrayBuffer); +} + } \ No newline at end of file diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 0148703fc7..cf6592aaa3 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -1706,6 +1706,21 @@ pub const Blob = struct { assert(old > 0); } + pub fn hasOneRef(this: *const Store) bool { + return this.ref_count.load(.monotonic) == 1; + } + + /// Caller is responsible for derefing the Store. + pub fn toAnyBlob(this: *Store) ?AnyBlob { + if (this.hasOneRef()) { + if (this.data == .bytes) { + return .{ .InternalBlob = this.data.bytes.toInternalBlob() }; + } + } + + return null; + } + pub fn external(ptr: ?*anyopaque, _: ?*anyopaque, _: usize) callconv(.C) void { if (ptr == null) return; var this = bun.cast(*Store, ptr); @@ -3202,6 +3217,20 @@ pub const Blob = struct { return ByteStore.init(list.items, allocator); } + pub fn toInternalBlob(this: *ByteStore) InternalBlob { + const result = InternalBlob{ + .bytes = std.ArrayList(u8){ + .items = this.ptr[0..this.len], + .capacity = this.cap, + .allocator = this.allocator, + }, + }; + + this.allocator = bun.default_allocator; + this.len = 0; + this.cap = 0; + return result; + } pub fn slice(this: ByteStore) []u8 { return this.ptr[0..this.len]; } @@ -3298,11 +3327,18 @@ pub const Blob = struct { this: *Blob, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame, + ) JSC.JSValue { + return this.getTextClone(globalThis); + } + + pub fn getTextClone( + this: *Blob, + globalObject: *JSC.JSGlobalObject, ) JSC.JSValue { const store = this.store; if (store) |st| st.ref(); defer if (store) |st| st.deref(); - return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toString, .clone), .{ this, globalThis }); + return JSC.JSPromise.wrap(globalObject, lifetimeWrap(toString, .clone), .{ this, globalObject }); } pub fn getTextTransfer( @@ -3319,14 +3355,19 @@ pub const Blob = struct { this: *Blob, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame, + ) JSC.JSValue { + return this.getJSONShare(globalThis); + } + + pub fn getJSONShare( + this: *Blob, + globalObject: *JSC.JSGlobalObject, ) JSC.JSValue { const store = this.store; if (store) |st| st.ref(); defer if (store) |st| st.deref(); - - return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toJSON, .share), .{ this, globalThis }); + return JSC.JSPromise.wrap(globalObject, lifetimeWrap(toJSON, .share), .{ this, globalObject }); } - pub fn getArrayBufferTransfer( this: *Blob, globalThis: *JSC.JSGlobalObject, @@ -3338,26 +3379,50 @@ pub const Blob = struct { return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toArrayBuffer, .transfer), .{ this, globalThis }); } + pub fn getArrayBufferClone( + this: *Blob, + globalThis: *JSC.JSGlobalObject, + ) JSC.JSValue { + const store = this.store; + if (store) |st| st.ref(); + defer if (store) |st| st.deref(); + return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toArrayBuffer, .clone), .{ this, globalThis }); + } + pub fn getArrayBuffer( this: *Blob, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame, + ) JSValue { + return this.getArrayBufferClone(globalThis); + } + + pub fn getBytesClone( + this: *Blob, + globalThis: *JSC.JSGlobalObject, ) JSValue { const store = this.store; if (store) |st| st.ref(); defer if (store) |st| st.deref(); - return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toArrayBuffer, .clone), .{ this, globalThis }); + return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toUint8Array, .clone), .{ this, globalThis }); } pub fn getBytes( this: *Blob, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame, + ) JSValue { + return this.getBytesClone(globalThis); + } + + pub fn getBytesTransfer( + this: *Blob, + globalThis: *JSC.JSGlobalObject, ) JSValue { const store = this.store; if (store) |st| st.ref(); defer if (store) |st| st.deref(); - return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toUint8Array, .clone), .{ this, globalThis }); + return JSC.JSPromise.wrap(globalThis, lifetimeWrap(toUint8Array, .transfer), .{ this, globalThis }); } pub fn getFormData( @@ -4759,6 +4824,14 @@ pub const AnyBlob = union(enum) { InternalBlob: InternalBlob, WTFStringImpl: bun.WTF.StringImpl, + pub fn hasOneRef(this: *const AnyBlob) bool { + if (this.store()) |s| { + return s.hasOneRef(); + } + + return false; + } + pub fn getFileName(this: *const AnyBlob) ?[]const u8 { return switch (this.*) { .Blob => this.Blob.getFileName(), @@ -4783,6 +4856,65 @@ pub const AnyBlob = union(enum) { }; } + fn toInternalBlobIfPossible(this: *AnyBlob) void { + if (this.* == .Blob) { + if (this.Blob.store) |s| { + if (s.data == .bytes and s.hasOneRef()) { + this.* = .{ .InternalBlob = s.data.bytes.toInternalBlob() }; + s.deref(); + return; + } + } + } + } + + pub fn toActionValue(this: *AnyBlob, globalThis: *JSGlobalObject, action: JSC.WebCore.BufferedReadableStreamAction) JSC.JSValue { + if (action != .blob) { + this.toInternalBlobIfPossible(); + } + + switch (action) { + .text => { + if (this.* == .Blob) { + return this.toString(globalThis, .clone); + } + + return this.toStringTransfer(globalThis); + }, + .bytes => { + if (this.* == .Blob) { + return this.toArrayBufferView(globalThis, .clone, .Uint8Array); + } + + return this.toUint8ArrayTransfer(globalThis); + }, + .blob => { + const result = Blob.new(this.toBlob(globalThis)); + result.allocator = bun.default_allocator; + result.globalThis = globalThis; + return result.toJS(globalThis); + }, + .arrayBuffer => { + if (this.* == .Blob) { + return this.toArrayBufferView(globalThis, .clone, .ArrayBuffer); + } + + return this.toArrayBufferTransfer(globalThis); + }, + .json => { + return this.toJSON(globalThis, .share); + }, + } + } + + pub fn toPromise(this: *AnyBlob, globalThis: *JSGlobalObject, action: JSC.WebCore.BufferedReadableStreamAction) JSC.JSValue { + return JSC.JSPromise.wrap(globalThis, toActionValue, .{ this, globalThis, action }); + } + + pub fn wrap(this: *AnyBlob, promise: JSC.AnyPromise, globalThis: *JSGlobalObject, action: JSC.WebCore.BufferedReadableStreamAction) void { + promise.wrap(globalThis, toActionValue, .{ this, globalThis, action }); + } + pub fn toJSON(this: *AnyBlob, global: *JSGlobalObject, comptime lifetime: JSC.WebCore.Lifetime) JSValue { switch (this.*) { .Blob => return this.Blob.toJSON(global, lifetime), @@ -4839,6 +4971,26 @@ pub const AnyBlob = union(enum) { return this.toArrayBuffer(global, .transfer); } + pub fn toBlob(this: *AnyBlob, global: *JSGlobalObject) Blob { + if (this.size() == 0) { + return Blob.initEmpty(global); + } + + if (this.* == .Blob) { + return this.Blob.dupe(); + } + + if (this.* == .WTFStringImpl) { + const blob = Blob.create(this.slice(), bun.default_allocator, global, true); + this.* = .{ .Blob = .{} }; + return blob; + } + + const blob = Blob.init(this.InternalBlob.slice(), this.InternalBlob.bytes.allocator, global); + this.* = .{ .Blob = .{} }; + return blob; + } + pub fn toString(this: *AnyBlob, global: *JSGlobalObject, comptime lifetime: JSC.WebCore.Lifetime) JSValue { switch (this.*) { .Blob => return this.Blob.toString(global, lifetime), @@ -4900,11 +5052,12 @@ pub const AnyBlob = union(enum) { const bytes = this.InternalBlob.toOwnedSlice(); this.* = .{ .Blob = .{} }; - const value = JSC.ArrayBuffer.fromBytes( + + return JSC.ArrayBuffer.fromDefaultAllocator( + global, bytes, TypedArrayView, ); - return value.toJS(global, null); }, .WTFStringImpl => { const str = bun.String.init(this.WTFStringImpl); @@ -4913,11 +5066,11 @@ pub const AnyBlob = union(enum) { const out_bytes = str.toUTF8WithoutRef(bun.default_allocator); if (out_bytes.isAllocated()) { - const value = JSC.ArrayBuffer.fromBytes( + return JSC.ArrayBuffer.fromDefaultAllocator( + global, @constCast(out_bytes.slice()), TypedArrayView, ); - return value.toJS(global, null); } return JSC.ArrayBuffer.create(global, out_bytes.slice(), TypedArrayView); diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index eae591e11b..c1cd64639f 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -115,7 +115,7 @@ pub const Body = struct { /// used in HTTP server to ignore request bodies unless asked for it onStartBuffering: ?*const fn (ctx: *anyopaque) void = null, onStartStreaming: ?*const fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, - onReadableStreamAvailable: ?*const fn (ctx: *anyopaque, readable: JSC.WebCore.ReadableStream) void = null, + onReadableStreamAvailable: ?*const fn (ctx: *anyopaque, globalThis: *JSC.JSGlobalObject, readable: JSC.WebCore.ReadableStream) void = null, size_hint: Blob.SizeType = 0, deinit: bool = false, @@ -194,28 +194,16 @@ pub const Body = struct { pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue { value.action = action; - if (value.readable.get()) |readable| handle_stream: { + if (value.readable.get()) |readable| { switch (action) { .getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer, .getBytes => { - value.promise = switch (action) { + const promise = switch (action) { .getJSON => globalThis.readableStreamToJSON(readable.value), .getArrayBuffer => globalThis.readableStreamToArrayBuffer(readable.value), .getBytes => globalThis.readableStreamToBytes(readable.value), .getText => globalThis.readableStreamToText(readable.value), .getBlob => globalThis.readableStreamToBlob(readable.value), .getFormData => |form_data| brk: { - if (value.onStartBuffering != null) { - if (readable.isDisturbed(globalThis)) { - form_data.?.deinit(); - value.readable.deinit(); - value.action = .{ .none = {} }; - return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{})); - } else { - value.readable.deinit(); - } - - break :handle_stream; - } defer { form_data.?.deinit(); value.action.getFormData = null; @@ -228,13 +216,11 @@ pub const Body = struct { }, else => unreachable, }; - value.promise.?.ensureStillAlive(); - - readable.detachIfPossible(globalThis); value.readable.deinit(); - value.promise.?.protect(); - - return value.promise.?; + // The ReadableStream within is expected to keep this Promise alive. + // If you try to protect() this, it will leak memory because the other end of the ReadableStream won't call it. + // See https://github.com/oven-sh/bun/issues/13678 + return promise; }, .none => {}, @@ -534,7 +520,7 @@ pub const Body = struct { }, globalThis); if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| { - onReadableStreamAvailable(locked.task.?, locked.readable.get().?); + onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get().?); } return locked.readable.get().?.value; @@ -747,7 +733,7 @@ pub const Body = struct { defer async_form_data.deinit(); async_form_data.toJS(global, blob.slice(), promise); }, - else => { + .none, .getBlob => { var blob = Blob.new(new.use()); blob.allocator = bun.default_allocator; if (headers) |fetch_headers| { @@ -1057,7 +1043,7 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { return handleBodyAlreadyUsed(globalObject); } @@ -1089,6 +1075,10 @@ pub fn BodyMixin(comptime Type: type) type { switch (this.getBodyValue().*) { .Used => true, .Locked => |*pending| brk: { + if (pending.action != .none) { + break :brk true; + } + if (pending.readable.get()) |*stream| { break :brk stream.isDisturbed(globalObject); } @@ -1119,10 +1109,14 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { return handleBodyAlreadyUsed(globalObject); } - return value.Locked.setPromise(globalObject, .{ .getJSON = {} }); + + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getJSON = {} }); + } } var blob = value.useAsAnyBlobAllowNonUTF8String(); @@ -1146,10 +1140,14 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { return handleBodyAlreadyUsed(globalObject); } - return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }); + value.toBlobIfPossible(); + + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }); + } } // toArrayBuffer in AnyBlob checks for non-UTF8 strings @@ -1170,10 +1168,13 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { return handleBodyAlreadyUsed(globalObject); } - return value.Locked.setPromise(globalObject, .{ .getBytes = {} }); + value.toBlobIfPossible(); + if (value.* == .Locked) { + return value.Locked.setPromise(globalObject, .{ .getBytes = {} }); + } } // toArrayBuffer in AnyBlob checks for non-UTF8 strings @@ -1193,9 +1194,10 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { return handleBodyAlreadyUsed(globalObject); } + value.toBlobIfPossible(); } var encoder = this.getFormDataEncoding() orelse { @@ -1233,14 +1235,15 @@ pub fn BodyMixin(comptime Type: type) type { pub fn getBlob( this: *Type, globalObject: *JSC.JSGlobalObject, - _: *JSC.CallFrame, + callframe: *JSC.CallFrame, ) JSC.JSValue { - return this.getBlobWithoutCallFrame(globalObject); + return getBlobWithThisValue(this, globalObject, callframe.this()); } - pub fn getBlobWithoutCallFrame( + pub fn getBlobWithThisValue( this: *Type, globalObject: *JSC.JSGlobalObject, + this_value: JSValue, ) JSC.JSValue { var value: *Body.Value = this.getBodyValue(); @@ -1249,10 +1252,18 @@ pub fn BodyMixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.promise == null or value.Locked.promise.?.isEmptyOrUndefinedOrNull()) { + if (value.Locked.action != .none or + ((this_value != .zero and value.Locked.isDisturbed(Type, globalObject, this_value)) or + (this_value == .zero and value.Locked.readable.isDisturbed(globalObject)))) + { + return handleBodyAlreadyUsed(globalObject); + } + + value.toBlobIfPossible(); + + if (value.* == .Locked) { return value.Locked.setPromise(globalObject, .{ .getBlob = {} }); } - return handleBodyAlreadyUsed(globalObject); } var blob = Blob.new(value.use()); @@ -1281,6 +1292,13 @@ pub fn BodyMixin(comptime Type: type) type { } return JSC.JSPromise.resolvedPromiseValue(globalObject, blob.toJS(globalObject)); } + + pub fn getBlobWithoutCallFrame( + this: *Type, + globalObject: *JSC.JSGlobalObject, + ) JSC.JSValue { + return getBlobWithThisValue(this, globalObject, .zero); + } }; } diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index d767a4c4b1..f665dd8619 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -1033,9 +1033,18 @@ pub const Fetch = struct { var prev = this.readable_stream_ref; this.readable_stream_ref = .{}; defer prev.deinit(); + buffer_reset = false; + this.memory_reporter.discard(scheduled_response_buffer.allocatedSlice()); + this.scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; readable.ptr.Bytes.onData( .{ - .temporary_and_done = bun.ByteList.initConst(chunk), + .owned_and_done = bun.ByteList.initConst(chunk), }, bun.default_allocator, ); @@ -1430,9 +1439,9 @@ pub const Fetch = struct { return .{ .SystemError = fetch_error }; } - pub fn onReadableStreamAvailable(ctx: *anyopaque, readable: JSC.WebCore.ReadableStream) void { + pub fn onReadableStreamAvailable(ctx: *anyopaque, globalThis: *JSC.JSGlobalObject, readable: JSC.WebCore.ReadableStream) void { const this = bun.cast(*FetchTasklet, ctx); - this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, this.global_this); + this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis); } pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult { @@ -2572,7 +2581,7 @@ pub const Fetch = struct { } if (request) |req| { - if (req.body.value == .Used or (req.body.value == .Locked and req.body.value.Locked.isDisturbed(Request, globalThis, first_arg))) { + if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(Request, globalThis, first_arg)))) { globalThis.ERR_BODY_ALREADY_USED("Request body already used", .{}).throw(); is_error = true; return .zero; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 4e02f13ace..cfee795eff 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -57,6 +57,14 @@ pub const ReadableStream = struct { return this.held.globalThis; } + pub fn isDisturbed(this: *const Strong, global: *JSC.JSGlobalObject) bool { + if (this.get()) |stream| { + return stream.isDisturbed(global); + } + + return false; + } + pub fn init(this: ReadableStream, global: *JSGlobalObject) Strong { return .{ .held = JSC.Strong.create(this.value, global), @@ -102,13 +110,10 @@ pub const ReadableStream = struct { switch (stream.ptr) { .Blob => |blobby| { - var blob = JSC.WebCore.Blob.initWithStore(blobby.store orelse return null, globalThis); - blob.offset = blobby.offset; - blob.size = blobby.remain; - blob.store.?.ref(); - stream.done(globalThis); - - return AnyBlob{ .Blob = blob }; + if (blobby.toAnyBlob(globalThis)) |blob| { + stream.done(globalThis); + return blob; + } }, .File => |blobby| { if (blobby.lazy == .blob) { @@ -124,11 +129,7 @@ pub const ReadableStream = struct { // If we've received the complete body by the time this function is called // we can avoid streaming it and convert it to a Blob - if (bytes.has_received_last_chunk) { - var blob: JSC.WebCore.AnyBlob = undefined; - blob.from(bytes.buffer); - bytes.buffer.items = &.{}; - bytes.buffer.capacity = 0; + if (bytes.toAnyBlob()) |blob| { stream.done(globalThis); return blob; } @@ -2613,6 +2614,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub const HTTPSResponseSink = HTTPServerWritable(true); pub const HTTPResponseSink = HTTPServerWritable(false); +pub const BufferedReadableStreamAction = enum { + text, + arrayBuffer, + blob, + bytes, + json, +}; + pub fn ReadableStreamSource( comptime Context: type, comptime name_: []const u8, @@ -2622,6 +2631,7 @@ pub fn ReadableStreamSource( comptime deinit_fn: fn (this: *Context) void, comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void, comptime drainInternalBuffer: ?fn (this: *Context) bun.ByteList, + comptime toBufferedValue: ?fn (this: *Context, globalThis: *JSC.JSGlobalObject, action: BufferedReadableStreamAction) JSC.JSValue, ) type { return struct { context: Context, @@ -2634,7 +2644,6 @@ pub fn ReadableStreamSource( globalThis: *JSGlobalObject = undefined, this_jsvalue: JSC.JSValue = .zero, is_closed: bool = false, - const This = @This(); const ReadableStreamSourceType = @This(); @@ -2781,6 +2790,11 @@ pub fn ReadableStreamSource( pub const finalize = JSReadableStreamSource.finalize; pub const construct = JSReadableStreamSource.construct; pub const getIsClosedFromJS = JSReadableStreamSource.isClosed; + pub const textFromJS = JSReadableStreamSource.text; + pub const jsonFromJS = JSReadableStreamSource.json; + pub const arrayBufferFromJS = JSReadableStreamSource.arrayBuffer; + pub const blobFromJS = JSReadableStreamSource.blob; + pub const bytesFromJS = JSReadableStreamSource.bytes; pub const JSReadableStreamSource = struct { pub fn construct(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) ?*ReadableStreamSourceType { _ = callFrame; // autofix @@ -2952,6 +2966,66 @@ pub fn ReadableStreamSource( } return JSValue.jsUndefined(); } + + pub fn text(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSC.JSValue { + JSC.markBinding(@src()); + this.this_jsvalue = callFrame.this(); + + if (toBufferedValue) |to_buffered_value| { + return to_buffered_value(&this.context, globalThis, .text); + } + + globalThis.throwTODO("This is not implemented yet"); + return .zero; + } + + pub fn arrayBuffer(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSC.JSValue { + JSC.markBinding(@src()); + this.this_jsvalue = callFrame.this(); + + if (toBufferedValue) |to_buffered_value| { + return to_buffered_value(&this.context, globalThis, .arrayBuffer); + } + + globalThis.throwTODO("This is not implemented yet"); + return .zero; + } + + pub fn blob(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSC.JSValue { + JSC.markBinding(@src()); + this.this_jsvalue = callFrame.this(); + + if (toBufferedValue) |to_buffered_value| { + return to_buffered_value(&this.context, globalThis, .blob); + } + + globalThis.throwTODO("This is not implemented yet"); + return .zero; + } + + pub fn bytes(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSC.JSValue { + JSC.markBinding(@src()); + this.this_jsvalue = callFrame.this(); + + if (toBufferedValue) |to_buffered_value| { + return to_buffered_value(&this.context, globalThis, .bytes); + } + + globalThis.throwTODO("This is not implemented yet"); + return .zero; + } + + pub fn json(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSC.JSValue { + JSC.markBinding(@src()); + this.this_jsvalue = callFrame.this(); + + if (toBufferedValue) |to_buffered_value| { + return to_buffered_value(&this.context, globalThis, .json); + } + + globalThis.throwTODO("This is not implemented yet"); + return .zero; + } }; }; } @@ -4104,6 +4178,7 @@ pub const FileReader = struct { deinit, setRefOrUnref, drain, + null, ); }; @@ -4177,6 +4252,25 @@ pub const ByteBlobLoader = struct { return .{ .into_array = .{ .value = array, .len = copied } }; } + pub fn toAnyBlob(this: *ByteBlobLoader, globalThis: *JSC.JSGlobalObject) ?AnyBlob { + if (this.store) |store| { + _ = this.detachStore(); + if (this.offset == 0 and this.remain == store.size()) { + if (store.toAnyBlob()) |blob| { + defer store.deref(); + return blob; + } + } + + var blob = JSC.WebCore.Blob.initWithStore(store, globalThis); + blob.offset = this.offset; + blob.size = this.remain; + this.parent().is_closed = true; + return .{ .Blob = blob }; + } + return null; + } + pub fn detachStore(this: *ByteBlobLoader) ?*Blob.Store { if (this.store) |store| { this.store = null; @@ -4216,6 +4310,15 @@ pub const ByteBlobLoader = struct { return bun.ByteList.fromList(cloned); } + pub fn toBufferedValue(this: *ByteBlobLoader, globalThis: *JSC.JSGlobalObject, action: BufferedReadableStreamAction) JSC.JSValue { + if (this.toAnyBlob(globalThis)) |blob_| { + var blob = blob_; + return blob.toPromise(globalThis, action); + } + + return .zero; + } + pub const Source = ReadableStreamSource( @This(), "Blob", @@ -4225,6 +4328,7 @@ pub const ByteBlobLoader = struct { deinit, null, drain, + toBufferedValue, ); }; @@ -4276,6 +4380,56 @@ pub const ByteStream = struct { highWaterMark: Blob.SizeType = 0, pipe: Pipe = .{}, size_hint: Blob.SizeType = 0, + buffer_action: ?BufferAction = null, + + const BufferAction = union(BufferedReadableStreamAction) { + text: JSC.JSPromise.Strong, + arrayBuffer: JSC.JSPromise.Strong, + blob: JSC.JSPromise.Strong, + bytes: JSC.JSPromise.Strong, + json: JSC.JSPromise.Strong, + + pub fn fulfill(this: *BufferAction, blob: *AnyBlob) void { + blob.wrap(.{ .Normal = this.swap() }, this.globalThis().?, this.*); + } + pub fn reject(this: *BufferAction, err: StreamResult.StreamError) void { + this.swap().reject(this.globalThis().?, err.toJSWeak(this.globalThis().?)[0]); + } + + pub fn resolve(this: *BufferAction, value_: JSC.JSValue) void { + this.swap().resolve(this.globalThis().?, value_); + } + + pub fn globalThis(this: *BufferAction) ?*JSC.JSGlobalObject { + return switch (this.*) { + inline else => |promise| promise.strong.globalThis, + }; + } + + pub fn value(this: *BufferAction) JSC.JSValue { + return switch (this.*) { + inline else => |promise| promise.value(), + }; + } + + pub fn get(this: *BufferAction) *JSC.JSPromise { + return switch (this.*) { + inline else => |promise| promise.get(), + }; + } + + pub fn swap(this: *BufferAction) *JSC.JSPromise { + return switch (this.*) { + inline else => |*promise| promise.swap(), + }; + } + + pub fn deinit(this: *BufferAction) void { + switch (this.*) { + inline else => |*promise| promise.deinit(), + } + } + }; pub const tag = ReadableStream.Tag.Bytes; @@ -4289,14 +4443,18 @@ pub const ByteStream = struct { } if (this.has_received_last_chunk) { - return .{ .chunk_size = @min(1024 * 1024 * 2, this.buffer.items.len) }; + return .{ .owned_and_done = bun.ByteList.fromList(this.buffer.moveToUnmanaged()) }; } if (this.highWaterMark == 0) { return .{ .ready = {} }; } - return .{ .chunk_size = @max(this.highWaterMark, std.mem.page_size) }; + // For HTTP, the maximum streaming response body size will be 512 KB. + // #define LIBUS_RECV_BUFFER_LENGTH 524288 + // For HTTPS, the size is probably quite a bit lower like 64 KB due to TLS transmission. + // We add 1 extra page size so that if there's a little bit of excess buffered data, we avoid extra allocations. + return .{ .chunk_size = @min(512 * 1024 + std.mem.page_size, @max(this.highWaterMark, std.mem.page_size)) }; } pub fn value(this: *@This()) JSValue { @@ -4341,6 +4499,51 @@ pub const ByteStream = struct { const chunk = stream.slice(); + if (this.buffer_action) |*action| { + if (stream == .err) { + defer { + this.buffer.clearAndFree(); + this.pending.result.deinit(); + this.pending.result = .{ .done = {} }; + this.buffer_action = null; + } + + action.reject(stream.err); + return; + } + + if (this.has_received_last_chunk) { + defer { + this.buffer_action = null; + } + + if (this.buffer.capacity == 0 and stream == .owned_and_done) { + this.buffer = std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(chunk)); + var blob = this.toAnyBlob().?; + action.fulfill(&blob); + return; + } + defer { + if (stream == .owned_and_done or stream == .owned) { + allocator.free(stream.slice()); + } + } + + this.buffer.appendSlice(chunk) catch bun.outOfMemory(); + var blob = this.toAnyBlob().?; + action.fulfill(&blob); + return; + } else { + this.buffer.appendSlice(chunk) catch bun.outOfMemory(); + + if (stream == .owned_and_done or stream == .owned) { + allocator.free(stream.slice()); + } + } + + return; + } + if (this.pending.state == .pending) { bun.assert(this.buffer.items.len == 0); const to_copy = this.pending_buffer[0..@min(chunk.len, this.pending_buffer.len)]; @@ -4383,19 +4586,20 @@ pub const ByteStream = struct { const remaining = chunk[to_copy.len..]; if (remaining.len > 0) - this.append(stream, to_copy.len, allocator) catch @panic("Out of memory while copying request body"); + this.append(stream, to_copy.len, chunk, allocator) catch @panic("Out of memory while copying request body"); this.pending.run(); return; } - this.append(stream, 0, allocator) catch @panic("Out of memory while copying request body"); + this.append(stream, 0, chunk, allocator) catch @panic("Out of memory while copying request body"); } pub fn append( this: *@This(), stream: StreamResult, offset: usize, + base_address: []const u8, allocator: std.mem.Allocator, ) !void { const chunk = stream.slice()[offset..]; @@ -4426,12 +4630,22 @@ pub const ByteStream = struct { .temporary_and_done, .temporary => { try this.buffer.appendSlice(chunk); }, + .owned_and_done, .owned => { + try this.buffer.appendSlice(chunk); + allocator.free(@constCast(base_address)); + }, .err => { + if (this.buffer_action != null) { + @panic("Expected buffer action to be null"); + } + this.pending.result = .{ .err = stream.err }; }, // We don't support the rest of these yet else => unreachable, } + + return; } pub fn setValue(this: *@This(), view: JSC.JSValue) void { @@ -4446,6 +4660,7 @@ pub const ByteStream = struct { pub fn onPull(this: *@This(), buffer: []u8, view: JSC.JSValue) StreamResult { JSC.markBinding(@src()); bun.assert(buffer.len > 0); + bun.debugAssert(this.buffer_action == null); if (this.buffer.items.len > 0) { bun.assert(this.value() == .zero); @@ -4511,6 +4726,11 @@ pub const ByteStream = struct { this.pending.result = .{ .done = {} }; this.pending.run(); } + + if (this.buffer_action) |*action| { + action.reject(.{ .AbortReason = .UserAbort }); + this.buffer_action = null; + } } pub fn deinit(this: *@This()) void { @@ -4526,10 +4746,80 @@ pub const ByteStream = struct { this.pending.result = .{ .done = {} }; this.pending.run(); } - + if (this.buffer_action) |*action| { + action.deinit(); + } this.parent().destroy(); } + pub fn drain(this: *@This()) bun.ByteList { + if (this.buffer.items.len > 0) { + const out = bun.ByteList.fromList(this.buffer); + this.buffer = .{ + .allocator = bun.default_allocator, + .items = &.{}, + .capacity = 0, + }; + + return out; + } + + return .{}; + } + + pub fn toAnyBlob(this: *@This()) ?AnyBlob { + if (this.has_received_last_chunk) { + const buffer = this.buffer; + this.buffer = .{ + .allocator = bun.default_allocator, + .items = &.{}, + .capacity = 0, + }; + this.done = true; + this.pending.result.deinit(); + this.pending.result = .{ .done = {} }; + this.parent().is_closed = true; + return AnyBlob{ + .InternalBlob = JSC.WebCore.InternalBlob{ + .bytes = buffer, + .was_string = false, + }, + }; + } + + return null; + } + + pub fn toBufferedValue(this: *@This(), globalThis: *JSC.JSGlobalObject, action: BufferedReadableStreamAction) JSC.JSValue { + if (this.buffer_action != null) { + globalThis.throw("Cannot buffer value twice", .{}); + return .zero; + } + + if (this.pending.result == .err) { + const err, _ = this.pending.result.err.toJSWeak(globalThis); + this.pending.result.deinit(); + this.done = true; + this.buffer.clearAndFree(); + return JSC.JSPromise.rejectedPromiseValue(globalThis, err); + } + + if (this.toAnyBlob()) |blob_| { + var blob = blob_; + return blob.toPromise(globalThis, action); + } + + this.buffer_action = switch (action) { + .blob => .{ .blob = JSC.JSPromise.Strong.init(globalThis) }, + .bytes => .{ .bytes = JSC.JSPromise.Strong.init(globalThis) }, + .arrayBuffer => .{ .arrayBuffer = JSC.JSPromise.Strong.init(globalThis) }, + .json => .{ .json = JSC.JSPromise.Strong.init(globalThis) }, + .text => .{ .text = JSC.JSPromise.Strong.init(globalThis) }, + }; + + return this.buffer_action.?.value(); + } + pub const Source = ReadableStreamSource( @This(), "Bytes", @@ -4538,7 +4828,8 @@ pub const ByteStream = struct { onCancel, deinit, null, - null, + drain, + toBufferedValue, ); }; diff --git a/src/codegen/buildTypeFlag.ts b/src/codegen/buildTypeFlag.ts new file mode 100644 index 0000000000..fa4717f696 --- /dev/null +++ b/src/codegen/buildTypeFlag.ts @@ -0,0 +1,16 @@ +const buildTypeFlag = process.argv.find(argv => { + if (argv.startsWith("--build-type=")) { + return argv; + } +}); + +const enum BuildType { + debug, + release, +} +if (buildTypeFlag) { + process.argv.splice(process.argv.indexOf(buildTypeFlag), 1); +} +let buildType = buildTypeFlag ? BuildType[buildTypeFlag.split("=")[1].toLowerCase()] : BuildType.release; + +export { BuildType, buildType }; diff --git a/src/js/builtins/ReadableStream.ts b/src/js/builtins/ReadableStream.ts index 5e3a1858d9..d34451a0fd 100644 --- a/src/js/builtins/ReadableStream.ts +++ b/src/js/builtins/ReadableStream.ts @@ -114,7 +114,6 @@ export function readableStreamToArray(stream: ReadableStream): Promise { return $readableStreamToTextDirect(stream, underlyingSource); } if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); + + const result = $tryUseReadableStreamBufferedFastPath(stream, "text"); + + if (result) { + return result; + } + return $readableStreamIntoText(stream); } @@ -133,19 +139,27 @@ $linkTimeConstant; export function readableStreamToArrayBuffer(stream: ReadableStream): Promise | ArrayBuffer { // this is a direct stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); - if (underlyingSource !== undefined) { return $readableStreamToArrayBufferDirect(stream, underlyingSource, false); } if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); - var result = Bun.readableStreamToArray(stream); + let result = $tryUseReadableStreamBufferedFastPath(stream, "arrayBuffer"); + + if (result) { + return result; + } + + result = Bun.readableStreamToArray(stream); if ($isPromise(result)) { // `result` is an InternalPromise, which doesn't have a `.then` method // but `.then` isn't user-overridable, so we can use it safely. - return result.then(x => Bun.concatArrayBuffers(x)); + return result.then(x => (x.length === 1 && x[0] instanceof ArrayBuffer ? x[0] : Bun.concatArrayBuffers(x))); } + if (result.length === 1) { + return result[0]; + } return Bun.concatArrayBuffers(result); } @@ -159,13 +173,28 @@ export function readableStreamToBytes(stream: ReadableStream): Prom } if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); - var result = Bun.readableStreamToArray(stream); + let result = $tryUseReadableStreamBufferedFastPath(stream, "bytes"); + + if (result) { + return result; + } + + result = Bun.readableStreamToArray(stream); if ($isPromise(result)) { // `result` is an InternalPromise, which doesn't have a `.then` method // but `.then` isn't user-overridable, so we can use it safely. - return result.then(x => Bun.concatArrayBuffers(x, Infinity, true)); + return result.then(x => { + // Micro-optimization: if the result is a single Uint8Array chunk, let's just return it without cloning. + if (x.length === 1 && x[0] instanceof ArrayBuffer) { + return new Uint8Array(x[0]); + } + return Bun.concatArrayBuffers(x, Infinity, true); + }); } + if (result.length === 1 && result[0] instanceof ArrayBuffer) { + return new Uint8Array(result[0]); + } return Bun.concatArrayBuffers(result, Infinity, true); } @@ -183,13 +212,21 @@ export function readableStreamToFormData( $linkTimeConstant; export function readableStreamToJSON(stream: ReadableStream): unknown { if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); - return Promise.resolve(Bun.readableStreamToText(stream)).then(globalThis.JSON.parse); + + return ( + $tryUseReadableStreamBufferedFastPath(stream, "json") || + Promise.resolve(Bun.readableStreamToText(stream)).then(globalThis.JSON.parse) + ); } $linkTimeConstant; export function readableStreamToBlob(stream: ReadableStream): Promise { if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); - return Promise.resolve(Bun.readableStreamToArray(stream)).then(array => new Blob(array)); + + return ( + $tryUseReadableStreamBufferedFastPath(stream, "blob") || + Promise.resolve(Bun.readableStreamToArray(stream)).then(array => new Blob(array)) + ); } $linkTimeConstant; diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index 3dfae9128b..e26d46689d 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -924,6 +924,44 @@ export function onReadableStreamDirectControllerClosed(reason) { $throwTypeError("ReadableStreamDirectController is now closed"); } +export function tryUseReadableStreamBufferedFastPath(stream, method) { + // -- Fast path for Blob.prototype.stream(), fetch body streams, and incoming Request body streams -- + const ptr = stream.$bunNativePtr; + if ( + // only available on native streams + ptr && + // don't even attempt it if the stream was used in some way + !$isReadableStreamDisturbed(stream) && + // feature-detect if supported + $isCallable(ptr[method]) + ) { + const promise = ptr[method](); + // if it throws, let it throw without setting $disturbed + stream.$disturbed = true; + + // Clear the lazy load function. + $putByIdDirectPrivate(stream, "start", undefined); + $putByIdDirectPrivate(stream, "reader", {}); + + if (Bun.peek.status(promise) === "fulfilled") { + stream.$reader = undefined; + $readableStreamCloseIfPossible(stream); + return promise; + } + + return promise + .catch(e => { + stream.$reader = undefined; + $readableStreamCancel(stream, e); + return Promise.$reject(e); + }) + .finally(() => { + stream.$reader = undefined; + $readableStreamCloseIfPossible(stream); + }); + } +} + export function onCloseDirectStream(reason) { var stream = this.$controlledReadableStream; if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; @@ -1862,18 +1900,22 @@ export function readableStreamIntoArray(stream) { var manyResult = reader.readMany(); async function processManyResult(result) { - if (result.done) { - return []; - } + let { done, value } = result; + var chunks = value || []; - var chunks = result.value || []; - - while (true) { - var thisResult = await reader.read(); - if (thisResult.done) { - break; + while (!done) { + var thisResult = reader.readMany(); + if ($isPromise(thisResult)) { + thisResult = await thisResult; + } + + ({ done, value = [] } = thisResult); + const length = value.length || 0; + if (length > 1) { + chunks = chunks.concat(value); + } else if (length === 1) { + chunks.push(value[0]); } - chunks = chunks.concat(thisResult.value); } return chunks; diff --git a/test/js/bun/http/body-leak-test-fixture.ts b/test/js/bun/http/body-leak-test-fixture.ts index c1e1a495b9..f9702c54e1 100644 --- a/test/js/bun/http/body-leak-test-fixture.ts +++ b/test/js/bun/http/body-leak-test-fixture.ts @@ -1,7 +1,8 @@ const server = Bun.serve({ port: 0, async fetch(req: Request) { - if (req.url.endsWith("/report")) { + const url = req.url; + if (url.endsWith("/report")) { Bun.gc(true); await Bun.sleep(10); return new Response(JSON.stringify(process.memoryUsage.rss()), { @@ -10,22 +11,25 @@ const server = Bun.serve({ }, }); } - if (req.url.endsWith("/buffering")) { + if (url.endsWith("/buffering")) { await req.text(); - } else if (req.url.endsWith("/streaming")) { - const reader = req.body?.getReader(); + } else if (url.endsWith("/buffering+body-getter")) { + req.body; + await req.text(); + } else if (url.endsWith("/streaming")) { + const reader = req.body.getReader(); while (reader) { const { done, value } = await reader?.read(); if (done) { break; } } - } else if (req.url.endsWith("/incomplete-streaming")) { + } else if (url.endsWith("/incomplete-streaming")) { const reader = req.body?.getReader(); if (!reader) { reader?.read(); } - } else if (req.url.endsWith("/streaming-echo")) { + } else if (url.endsWith("/streaming-echo")) { return new Response(req.body, { headers: { "Content-Type": "application/octet-stream", @@ -36,3 +40,4 @@ const server = Bun.serve({ }, }); console.log(server.url.href); +process?.send?.(server.url.href); diff --git a/test/js/bun/http/bun-serve-static.test.ts b/test/js/bun/http/bun-serve-static.test.ts index 012baef2a5..2f2e96992b 100644 --- a/test/js/bun/http/bun-serve-static.test.ts +++ b/test/js/bun/http/bun-serve-static.test.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeAll, describe, expect, it, mock } from "bun:test"; +import { afterAll, beforeAll, describe, expect, it, mock, test } from "bun:test"; import { fillRepeating, isWindows } from "harness"; const routes = { @@ -11,9 +11,14 @@ const routes = { "/big": new Response( (() => { const buf = Buffer.alloc(1024 * 1024 * 4); + const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_*^!@#$%^&*()+=?><:;{}[]|\\ \n"; + + function randomAnyCaseLetter() { + return alphabet[(Math.random() * alphabet.length) | 0]; + } for (let i = 0; i < 1024; i++) { - buf[i] = (Math.random() * 256) | 0; + buf[i] = randomAnyCaseLetter(); } fillRepeating(buf, 0, 1024); return buf; @@ -98,54 +103,69 @@ describe("static", () => { expect(handler.mock.calls.length, "Handler should not be called").toBe(previousCallCount); }); - it( - "stress", - async () => { - const bytes = await static_responses[path].arrayBuffer(); - // macOS limits backlog to 128. - // When we do the big request, reduce number of connections but increase number of iterations - const batchSize = Math.ceil((bytes.size > 1024 * 1024 ? 48 : 64) / (isWindows ? 8 : 1)); - const iterations = Math.ceil((bytes.size > 1024 * 1024 ? 10 : 12) / (isWindows ? 8 : 1)); + describe.each(["access .body", "don't access .body"])("stress (%s)", label => { + test.each(["arrayBuffer", "blob", "bytes", "text"])( + "%s", + async method => { + const byteSize = static_responses[path][method]?.size; - async function iterate() { - let array = new Array(batchSize); - const route = `${server.url}${path.substring(1)}`; - for (let i = 0; i < batchSize; i++) { - array[i] = fetch(route) - .then(res => { - expect(res.status).toBe(200); + const bytes = method === "blob" ? static_responses[path] : await static_responses[path][method](); - expect(res.url).toBe(route); - return res.arrayBuffer(); - }) - .then(output => { - expect(output).toStrictEqual(bytes); - }); + // macOS limits backlog to 128. + // When we do the big request, reduce number of connections but increase number of iterations + const batchSize = Math.ceil((byteSize > 1024 * 1024 ? 48 : 64) / (isWindows ? 8 : 1)); + const iterations = Math.ceil((byteSize > 1024 * 1024 ? 10 : 12) / (isWindows ? 8 : 1)); + + async function iterate() { + let array = new Array(batchSize); + const route = `${server.url}${path.substring(1)}`; + for (let i = 0; i < batchSize; i++) { + array[i] = fetch(route) + .then(res => { + expect(res.status).toBe(200); + expect(res.url).toBe(route); + if (label === "access .body") { + res.body; + } + return res[method](); + }) + .then(output => { + expect(output).toStrictEqual(bytes); + }); + } + + await Promise.all(array); + + Bun.gc(); } - await Promise.all(array); - console.count("Iteration: " + path); - Bun.gc(); - } + for (let i = 0; i < iterations; i++) { + await iterate(); + } - for (let i = 0; i < iterations; i++) { - await iterate(); - } + Bun.gc(true); + const baseline = (process.memoryUsage.rss() / 1024 / 1024) | 0; + let lastRSS = baseline; + console.log("Start RSS", baseline); + for (let i = 0; i < iterations; i++) { + await iterate(); + const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; + if (lastRSS + 50 < rss) { + console.log("RSS Growth", rss - lastRSS); + } + lastRSS = rss; + } + Bun.gc(true); - Bun.gc(true); - const baseline = (process.memoryUsage.rss() / 1024 / 1024) | 0; - console.log("Baseline RSS", baseline); - for (let i = 0; i < iterations; i++) { - await iterate(); - console.log("RSS", (process.memoryUsage.rss() / 1024 / 1024) | 0); - } - Bun.gc(true); - - const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; - expect(rss).toBeLessThan(baseline * 4); - }, - 30 * 1000, - ); + const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; + expect(rss).toBeLessThan(4092); + const delta = rss - baseline; + console.log("Final RSS", rss); + console.log("Delta RSS", delta); + }, + 40 * 1000, + ); + }); }); it("/redirect", async () => { diff --git a/test/js/bun/http/serve-body-leak.test.ts b/test/js/bun/http/serve-body-leak.test.ts index 64eab0bfb9..5c2b7bad65 100644 --- a/test/js/bun/http/serve-body-leak.test.ts +++ b/test/js/bun/http/serve-body-leak.test.ts @@ -1,5 +1,5 @@ import type { Subprocess } from "bun"; -import { afterAll, beforeAll, expect, it } from "bun:test"; +import { afterEach, beforeEach, expect, it } from "bun:test"; import { bunEnv, bunExe, isDebug } from "harness"; import { join } from "path"; @@ -10,20 +10,26 @@ const zeroCopyPayload = new Blob([payload]); let url: URL; let process: Subprocess<"ignore", "pipe", "inherit"> | null = null; -beforeAll(async () => { +beforeEach(async () => { + if (process) { + process?.kill(); + } + + let defer = Promise.withResolvers(); process = Bun.spawn([bunExe(), "--smol", join(import.meta.dirname, "body-leak-test-fixture.ts")], { env: bunEnv, - stdout: "pipe", + stdout: "inherit", stderr: "inherit", stdin: "ignore", + ipc(message) { + defer.resolve(message); + }, }); - const { value } = await process.stdout.getReader().read(); - url = new URL(new TextDecoder().decode(value)); + url = new URL(await defer.promise); process.unref(); - await warmup(); }); -afterAll(() => { +afterEach(() => { process?.kill(); }); @@ -57,6 +63,14 @@ async function callBuffering() { }).then(res => res.text()); expect(result).toBe("Ok"); } + +async function callBufferingBodyGetter() { + const result = await fetch(`${url.origin}/buffering+body-getter`, { + method: "POST", + body: zeroCopyPayload, + }).then(res => res.text()); + expect(result).toBe("Ok"); +} async function callStreaming() { const result = await fetch(`${url.origin}/streaming`, { method: "POST", @@ -128,6 +142,7 @@ async function calculateMemoryLeak(fn: () => Promise) { for (const test_info of [ ["#10265 should not leak memory when ignoring the body", callIgnore, false, 64], ["should not leak memory when buffering the body", callBuffering, false, 64], + ["should not leak memory when buffering the body and accessing req.body", callBufferingBodyGetter, false, 64], ["should not leak memory when streaming the body", callStreaming, false, 64], ["should not leak memory when streaming the body incompletely", callIncompleteStreaming, false, 64], ["should not leak memory when streaming the body and echoing it back", callStreamingEcho, false, 64], @@ -138,7 +153,7 @@ for (const test_info of [ async () => { const report = await calculateMemoryLeak(fn); // peak memory is too high - expect(report.peak_memory > report.start_memory * 2).toBe(false); + expect(report.peak_memory).not.toBeGreaterThan(report.start_memory * 2.5); // acceptable memory leak expect(report.leak).toBeLessThanOrEqual(maxMemoryGrowth); expect(report.end_memory).toBeLessThanOrEqual(512 * 1024 * 1024); diff --git a/test/js/web/fetch/body-stream.test.ts b/test/js/web/fetch/body-stream.test.ts index 5d06ba4eb2..0fd6efe9d9 100644 --- a/test/js/web/fetch/body-stream.test.ts +++ b/test/js/web/fetch/body-stream.test.ts @@ -45,124 +45,142 @@ var port = 0; for (let RequestPrototypeMixin of BodyMixin) { for (let useRequestObject of useRequestObjectValues) { - describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${ - useRequestObject ? "fetch(req)" : "fetch(url)" - }`, () => { - const inputFixture = [ - [JSON.stringify("Hello World"), JSON.stringify("Hello World")], - [JSON.stringify("Hello World 123"), Buffer.from(JSON.stringify("Hello World 123")).buffer], - [JSON.stringify("Hello World 456"), Buffer.from(JSON.stringify("Hello World 456"))], - [ - JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)), - Buffer.from( + // When you do req.body + // We go through Bun.readableStreamTo${Method}(stream) + for (let forceReadableStreamConversionFastPath of [true, false]) { + describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${ + useRequestObject + ? "fetch(req)" + : "fetch(url)" + (forceReadableStreamConversionFastPath ? " (force fast ReadableStream conversion)" : "") + }`, () => { + const inputFixture = [ + [JSON.stringify("Hello World"), JSON.stringify("Hello World")], + [JSON.stringify("Hello World 123"), Buffer.from(JSON.stringify("Hello World 123")).buffer], + [JSON.stringify("Hello World 456"), Buffer.from(JSON.stringify("Hello World 456"))], + [ JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)), - ), - ], - [ - JSON.stringify("EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)), - Buffer.from( + Buffer.from( + JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)), + ), + ], + [ JSON.stringify( "EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100), ), - ), - ], - ]; + Buffer.from( + JSON.stringify( + "EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100), + ), + ), + ], + ]; - for (const [name, input] of inputFixture) { - test(`${name.slice(0, Math.min(name.length ?? name.byteLength, 64))}`, async () => { - await runInServer( - { - async fetch(req) { - var result = await RequestPrototypeMixin.call(req); - if (RequestPrototypeMixin === Request.prototype.json) { - result = JSON.stringify(result); - } - if (typeof result === "string") { - expect(result.length).toBe(name.length); - expect(result).toBe(name); - } else if (result && result instanceof Blob) { - expect(result.size).toBe(new TextEncoder().encode(name).byteLength); - expect(await result.text()).toBe(name); - } else { - expect(result.byteLength).toBe(Buffer.from(input).byteLength); - expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64")); - } - return new Response(result, { - headers: req.headers, - }); + for (const [name, input] of inputFixture) { + test(`${name.slice(0, Math.min(name.length ?? name.byteLength, 64))}`, async () => { + await runInServer( + { + async fetch(req) { + if (forceReadableStreamConversionFastPath) { + req.body; + } + var result = await RequestPrototypeMixin.call(req); + if (RequestPrototypeMixin === Request.prototype.json) { + result = JSON.stringify(result); + } + if (typeof result === "string") { + expect(result.length).toBe(name.length); + expect(result).toBe(name); + } else if (result && result instanceof Blob) { + expect(result.size).toBe(new TextEncoder().encode(name).byteLength); + expect(await result.text()).toBe(name); + } else { + expect(result.byteLength).toBe(Buffer.from(input).byteLength); + expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64")); + } + return new Response(result, { + headers: req.headers, + }); + }, }, - }, - async url => { - var response; + async url => { + var response; - // once, then batch of 5 + // once, then batch of 5 - if (useRequestObject) { - response = await fetch( - new Request({ - body: input, - method: "POST", - url: url, - headers: { - "content-type": "text/plain", - }, - }), - ); - } else { - response = await fetch(url, { - body: input, - method: "POST", - headers: { - "content-type": "text/plain", - }, - }); - } - - expect(response.status).toBe(200); - expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); - expect(response.headers.get("content-type")).toBe("text/plain"); - expect(await response.text()).toBe(name); - - var promises = new Array(5); - for (let i = 0; i < 5; i++) { if (useRequestObject) { - promises[i] = await fetch( + response = await fetch( new Request({ body: input, method: "POST", url: url, headers: { "content-type": "text/plain", - "x-counter": i, }, }), ); } else { - promises[i] = await fetch(url, { + response = await fetch(url, { body: input, method: "POST", headers: { "content-type": "text/plain", - "x-counter": i, }, }); } - } - const results = await Promise.all(promises); - for (let i = 0; i < 5; i++) { - const response = results[i]; + if (forceReadableStreamConversionFastPath) { + response.body; + } + expect(response.status).toBe(200); expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); expect(response.headers.get("content-type")).toBe("text/plain"); - expect(response.headers.get("x-counter")).toBe(String(i)); expect(await response.text()).toBe(name); - } - }, - ); - }); - } - }); + + var promises = new Array(5); + for (let i = 0; i < 5; i++) { + if (useRequestObject) { + promises[i] = await fetch( + new Request({ + body: input, + method: "POST", + url: url, + headers: { + "content-type": "text/plain", + "x-counter": i, + }, + }), + ); + } else { + promises[i] = await fetch(url, { + body: input, + method: "POST", + headers: { + "content-type": "text/plain", + "x-counter": i, + }, + }); + } + } + + const results = await Promise.all(promises); + for (let i = 0; i < 5; i++) { + const response = results[i]; + if (forceReadableStreamConversionFastPath) { + response.body; + } + expect(response.status).toBe(200); + expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); + expect(response.headers.get("content-type")).toBe("text/plain"); + expect(response.headers.get("x-counter")).toBe(String(i)); + expect(await response.text()).toBe(name); + } + }, + ); + }); + } + }); + } } } } @@ -226,276 +244,292 @@ function gc() { } describe("reader", function () { - for (let withDelay of [false, true]) { - try { - // - 1 byte - // - less than the InlineBlob limit - // - multiple chunks - // - backpressure + for (let forceReadableStreamConversionFastPath of [true, false]) { + for (let withDelay of [false, true]) { + try { + // - 1 byte + // - less than the InlineBlob limit + // - multiple chunks + // - backpressure - for (let inputLength of [1, 2, 12, 95, 1024, 1024 * 1024, 1024 * 1024 * 2]) { - var bytes = new Uint8Array(inputLength); - { - const chunk = Math.min(bytes.length, 256); - for (var i = 0; i < chunk; i++) { - bytes[i] = 255 - i; + for (let inputLength of [1, 2, 12, 95, 1024, 1024 * 1024, 1024 * 1024 * 2]) { + var bytes = new Uint8Array(inputLength); + { + const chunk = Math.min(bytes.length, 256); + for (var i = 0; i < chunk; i++) { + bytes[i] = 255 - i; + } } - } - if (bytes.length > 255) fillRepeating(bytes, 0, bytes.length); + if (bytes.length > 255) fillRepeating(bytes, 0, bytes.length); - for (const huge_ of [ - bytes, - bytes.buffer, - new DataView(bytes.buffer), - new Int8Array(bytes), - new Blob([bytes]), + for (const huge_ of [ + bytes, + bytes.buffer, + new DataView(bytes.buffer), + new Int8Array(bytes), + new Blob([bytes]), - new Uint16Array(bytes), - new Uint32Array(bytes), - new Float64Array(bytes), + new Uint16Array(bytes), + new Uint32Array(bytes), + new Float64Array(bytes), - new Int16Array(bytes), - new Int32Array(bytes), - new Float16Array(bytes), - new Float32Array(bytes), + new Int16Array(bytes), + new Int32Array(bytes), + new Float16Array(bytes), + new Float32Array(bytes), - // make sure we handle subarray() as expected when reading - // typed arrays from native code - new Int16Array(bytes).subarray(1), - new Int16Array(bytes).subarray(0, new Int16Array(bytes).byteLength - 1), - new Int32Array(bytes).subarray(1), - new Int32Array(bytes).subarray(0, new Int32Array(bytes).byteLength - 1), - new Float16Array(bytes).subarray(1), - new Float16Array(bytes).subarray(0, new Float16Array(bytes).byteLength - 1), - new Float32Array(bytes).subarray(1), - new Float32Array(bytes).subarray(0, new Float32Array(bytes).byteLength - 1), - new Int16Array(bytes).subarray(0, 1), - new Int32Array(bytes).subarray(0, 1), - new Float16Array(bytes).subarray(0, 1), - new Float32Array(bytes).subarray(0, 1), - ]) { - gc(); - const thisArray = huge_; - if (Number(thisArray.byteLength ?? thisArray.size) === 0) continue; + // make sure we handle subarray() as expected when reading + // typed arrays from native code + new Int16Array(bytes).subarray(1), + new Int16Array(bytes).subarray(0, new Int16Array(bytes).byteLength - 1), + new Int32Array(bytes).subarray(1), + new Int32Array(bytes).subarray(0, new Int32Array(bytes).byteLength - 1), + new Float16Array(bytes).subarray(1), + new Float16Array(bytes).subarray(0, new Float16Array(bytes).byteLength - 1), + new Float32Array(bytes).subarray(1), + new Float32Array(bytes).subarray(0, new Float32Array(bytes).byteLength - 1), + new Int16Array(bytes).subarray(0, 1), + new Int32Array(bytes).subarray(0, 1), + new Float16Array(bytes).subarray(0, 1), + new Float32Array(bytes).subarray(0, 1), + ]) { + gc(); + const thisArray = huge_; + if (Number(thisArray.byteLength ?? thisArray.size) === 0) continue; - it( - `works with ${thisArray.constructor.name}(${ - thisArray.byteLength ?? thisArray.size - }:${inputLength}) via req.body.getReader() in chunks` + (withDelay ? " with delay" : ""), - async () => { - var huge = thisArray; - var called = false; - gc(); + it( + `works with ${thisArray.constructor.name}(${ + thisArray.byteLength ?? thisArray.size + }:${inputLength}) via req.body.getReader() in chunks` + + (withDelay ? " with delay" : "") + + (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), + async () => { + var huge = thisArray; + var called = false; + gc(); - const expectedHash = - huge instanceof Blob ? Bun.SHA1.hash(await huge.bytes(), "base64") : Bun.SHA1.hash(huge, "base64"); - const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; + const expectedHash = + huge instanceof Blob ? Bun.SHA1.hash(await huge.bytes(), "base64") : Bun.SHA1.hash(huge, "base64"); + const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; - const out = await runInServer( - { - async fetch(req) { - try { - if (withDelay) await 1; + const out = await runInServer( + { + async fetch(req) { + try { + if (withDelay) await 1; - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - gc(); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + gc(); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - var reader = req.body.getReader(); - called = true; - var buffers = []; - while (true) { - var { done, value } = await reader.read(); - if (done) break; - buffers.push(value); + var reader = req.body.getReader(); + called = true; + var buffers = []; + while (true) { + var { done, value } = await reader.read(); + if (done) break; + buffers.push(value); + } + const out = new Blob(buffers); + gc(); + expect(out.size).toBe(expectedSize); + expect(Bun.SHA1.hash(await out.arrayBuffer(), "base64")).toBe(expectedHash); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + gc(); + return new Response(out, { + headers: req.headers, + }); + } catch (e) { + console.error(e); + throw e; } - const out = new Blob(buffers); - gc(); - expect(out.size).toBe(expectedSize); - expect(Bun.SHA1.hash(await out.arrayBuffer(), "base64")).toBe(expectedHash); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - gc(); - return new Response(out, { - headers: req.headers, - }); - } catch (e) { - console.error(e); - throw e; - } - }, - }, - async url => { - gc(); - if (withDelay) await 1; - const pendingResponse = await fetch(url, { - body: huge, - method: "POST", - headers: { - "content-type": "text/plain", - "x-custom": "hello", - "x-typed-array": thisArray.constructor.name, }, - }); - if (withDelay) { - await 1; - } - const response = await pendingResponse; - huge = undefined; - expect(response.status).toBe(200); - const response_body = await response.bytes(); + }, + async url => { + gc(); + if (withDelay) await 1; + const pendingResponse = await fetch(url, { + body: huge, + method: "POST", + headers: { + "content-type": "text/plain", + "x-custom": "hello", + "x-typed-array": thisArray.constructor.name, + }, + }); + if (withDelay) { + await 1; + } + const response = await pendingResponse; + if (forceReadableStreamConversionFastPath) { + response.body; + } + huge = undefined; + expect(response.status).toBe(200); + const response_body = await response.bytes(); - expect(response_body.byteLength).toBe(expectedSize); - expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); + expect(response_body.byteLength).toBe(expectedSize); + expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); - gc(); - expect(response.headers.get("content-type")).toBe("text/plain"); - gc(); - }, - ); - expect(called).toBe(true); - gc(); - return out; - }, - ); + gc(); + expect(response.headers.get("content-type")).toBe("text/plain"); + gc(); + }, + ); + expect(called).toBe(true); + gc(); + return out; + }, + ); - for (let isDirectStream of [true, false]) { - const positions = ["begin", "end"]; - const inner = thisArray => { - for (let position of positions) { - it(`streaming back ${thisArray.constructor.name}(${ - thisArray.byteLength ?? thisArray.size - }:${inputLength}) starting request.body.getReader() at ${position}`, async () => { - var huge = thisArray; - var called = false; - gc(); + for (let isDirectStream of [true, false]) { + const positions = ["begin", "end"]; + const inner = thisArray => { + for (let position of positions) { + it( + `streaming back ${thisArray.constructor.name}(${ + thisArray.byteLength ?? thisArray.size + }:${inputLength}) starting request.body.getReader() at ${position}` + + (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), + async () => { + var huge = thisArray; + var called = false; + gc(); - const expectedHash = - huge instanceof Blob ? Bun.SHA1.hash(await huge.bytes(), "base64") : Bun.SHA1.hash(huge, "base64"); - const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; + const expectedHash = + huge instanceof Blob + ? Bun.SHA1.hash(await huge.bytes(), "base64") + : Bun.SHA1.hash(huge, "base64"); + const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; - const out = await runInServer( - { - async fetch(req) { - try { - var reader; + const out = await runInServer( + { + async fetch(req) { + try { + var reader; - if (withDelay) await 1; + if (withDelay) await 1; - if (position === "begin") { - reader = req.body.getReader(); + if (position === "begin") { + reader = req.body.getReader(); + } + + if (position === "end") { + await 1; + reader = req.body.getReader(); + } + + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + + gc(); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + + const direct = { + type: "direct", + async pull(controller) { + if (withDelay) await 1; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + called = true; + controller.end(); + + return; + } + controller.write(value); + } + }, + }; + + const web = { + async start() { + if (withDelay) await 1; + }, + async pull(controller) { + while (true) { + const { done, value } = await reader.read(); + if (done) { + called = true; + controller.close(); + return; + } + controller.enqueue(value); + } + }, + }; + + return new Response(new ReadableStream(isDirectStream ? direct : web), { + headers: req.headers, + }); + } catch (e) { + console.error(e); + throw e; + } + }, + }, + async url => { + gc(); + const response = await fetch(url, { + body: huge, + method: "POST", + headers: { + "content-type": "text/plain", + "x-custom": "hello", + "x-typed-array": thisArray.constructor.name, + }, + }); + huge = undefined; + expect(response.status).toBe(200); + if (forceReadableStreamConversionFastPath) { + response.body; } + const response_body = await response.bytes(); - if (position === "end") { - await 1; - reader = req.body.getReader(); - } - - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + expect(response_body.byteLength).toBe(expectedSize); + expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); gc(); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + if (!response.headers.has("content-type")) { + console.error(Object.fromEntries(response.headers.entries())); + } - const direct = { - type: "direct", - async pull(controller) { - if (withDelay) await 1; - - while (true) { - const { done, value } = await reader.read(); - if (done) { - called = true; - controller.end(); - - return; - } - controller.write(value); - } - }, - }; - - const web = { - async start() { - if (withDelay) await 1; - }, - async pull(controller) { - while (true) { - const { done, value } = await reader.read(); - if (done) { - called = true; - controller.close(); - return; - } - controller.enqueue(value); - } - }, - }; - - return new Response(new ReadableStream(isDirectStream ? direct : web), { - headers: req.headers, - }); - } catch (e) { - console.error(e); - throw e; - } - }, - }, - async url => { - gc(); - const response = await fetch(url, { - body: huge, - method: "POST", - headers: { - "content-type": "text/plain", - "x-custom": "hello", - "x-typed-array": thisArray.constructor.name, + expect(response.headers.get("content-type")).toBe("text/plain"); + gc(); }, - }); - huge = undefined; - expect(response.status).toBe(200); - const response_body = await response.bytes(); - - expect(response_body.byteLength).toBe(expectedSize); - expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); - - gc(); - if (!response.headers.has("content-type")) { - console.error(Object.fromEntries(response.headers.entries())); - } - - expect(response.headers.get("content-type")).toBe("text/plain"); + ); + expect(called).toBe(true); gc(); + return out; }, ); - expect(called).toBe(true); - gc(); - return out; - }); - } - }; + } + }; - if (isDirectStream) { - describe(" direct stream", () => inner(thisArray)); - } else { - describe("default stream", () => inner(thisArray)); + if (isDirectStream) { + describe(" direct stream", () => inner(thisArray)); + } else { + describe("default stream", () => inner(thisArray)); + } } } } + } catch (e) { + console.error(e); + throw e; } - } catch (e) { - console.error(e); - throw e; } } }); diff --git a/test/js/web/fetch/fetch-leak-test-fixture-2.js b/test/js/web/fetch/fetch-leak-test-fixture-2.js index 3fcc6de3dd..b3b14b0774 100644 --- a/test/js/web/fetch/fetch-leak-test-fixture-2.js +++ b/test/js/web/fetch/fetch-leak-test-fixture-2.js @@ -36,7 +36,12 @@ for (let j = 0; j < COUNT; j++) { cert: "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJAKLdQVPy90jjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\nBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\naWRnaXRzIFB0eSBMdGQwHhcNMTkwMjAzMTQ0OTM1WhcNMjAwMjAzMTQ0OTM1WjBF\nMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\nZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\nCgKCAQEA7i7IIEdICTiSTVx+ma6xHxOtcbd6wGW3nkxlCkJ1UuV8NmY5ovMsGnGD\nhJJtUQ2j5ig5BcJUf3tezqCNW4tKnSOgSISfEAKvpn2BPvaFq3yx2Yjz0ruvcGKp\nDMZBXmB/AAtGyN/UFXzkrcfppmLHJTaBYGG6KnmU43gPkSDy4iw46CJFUOupc51A\nFIz7RsE7mbT1plCM8e75gfqaZSn2k+Wmy+8n1HGyYHhVISRVvPqkS7gVLSVEdTea\nUtKP1Vx/818/HDWk3oIvDVWI9CFH73elNxBkMH5zArSNIBTehdnehyAevjY4RaC/\nkK8rslO3e4EtJ9SnA4swOjCiqAIQEwIDAQABo1AwTjAdBgNVHQ4EFgQUv5rc9Smm\n9c4YnNf3hR49t4rH4yswHwYDVR0jBBgwFoAUv5rc9Smm9c4YnNf3hR49t4rH4ysw\nDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEATcL9CAAXg0u//eYUAlQa\nL+l8yKHS1rsq1sdmx7pvsmfZ2g8ONQGfSF3TkzkI2OOnCBokeqAYuyT8awfdNUtE\nEHOihv4ZzhK2YZVuy0fHX2d4cCFeQpdxno7aN6B37qtsLIRZxkD8PU60Dfu9ea5F\nDDynnD0TUabna6a0iGn77yD8GPhjaJMOz3gMYjQFqsKL252isDVHEDbpVxIzxPmN\nw1+WK8zRNdunAcHikeoKCuAPvlZ83gDQHp07dYdbuZvHwGj0nfxBLc9qt90XsBtC\n4IYR7c/bcLMmKXYf0qoQ4OzngsnPI5M+v9QEHvYWaKVwFY4CTcSNJEwfXw+BAeO5\nOA==\n-----END CERTIFICATE-----", } : null; - oks += !!(await (await fetch(SERVER, { tls })).arrayBuffer())?.byteLength; + oks += !!( + await fetch(SERVER, { tls }).then(r => { + r.body; + return r.arrayBuffer(); + }) + )?.byteLength; })(); } diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts index 407d3d8951..21a72ede53 100644 --- a/test/js/web/fetch/fetch.stream.test.ts +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -915,25 +915,44 @@ describe("fetch() with streaming", () => { test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => { { const content = "a".repeat(64 * 1024); + var onReceivedHeaders = Promise.withResolvers(); using server = Bun.serve({ port: 0, - fetch(req) { - return new Response(compress(compression, Buffer.from(content)), { - status: 200, - headers: { - "Content-Type": "text/plain", - ...headers, + async fetch(req) { + const data = compress(compression, Buffer.from(content)); + return new Response( + new ReadableStream({ + async pull(controller) { + const firstChunk = data.slice(0, 64); + const secondChunk = data.slice(firstChunk.length); + controller.enqueue(firstChunk); + await onReceivedHeaders.promise; + await Bun.sleep(1); + controller.enqueue(secondChunk); + controller.close(); + }, + }), + { + status: 200, + headers: { + "Content-Type": "text/plain", + ...headers, + }, }, - }); + ); }, }); let res = await fetch(`http://${server.hostname}:${server.port}`, {}); + onReceivedHeaders.resolve(); + onReceivedHeaders = Promise.withResolvers(); gcTick(false); const result = await res.text(); gcTick(false); expect(result).toBe(content); res = await fetch(`http://${server.hostname}:${server.port}`, {}); + onReceivedHeaders.resolve(); + onReceivedHeaders = Promise.withResolvers(); gcTick(false); const reader = res.body?.getReader(); diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts index c85a5a2556..37818cd03c 100644 --- a/test/js/web/fetch/fetch.test.ts +++ b/test/js/web/fetch/fetch.test.ts @@ -1170,7 +1170,11 @@ describe("Response", () => { describe("should consume body correctly", async () => { it("with text first", async () => { var response = new Response("
hello
"); - expect(await response.text()).toBe("
hello
"); + expect(response.bodyUsed).toBe(false); + const promise = response.text(); + expect(response.bodyUsed).toBe(true); + expect(await promise).toBe("
hello
"); + expect(response.bodyUsed).toBe(true); expect(async () => { await response.text(); }).toThrow("Body already used"); @@ -1189,7 +1193,11 @@ describe("Response", () => { }); it("with json first", async () => { var response = new Response('{ "hello": "world" }'); - expect(await response.json()).toEqual({ "hello": "world" }); + expect(response.bodyUsed).toBe(false); + const promise = response.json(); + expect(response.bodyUsed).toBe(true); + expect(await promise).toEqual({ "hello": "world" }); + expect(response.bodyUsed).toBe(true); expect(async () => { await response.json(); }).toThrow("Body already used"); @@ -1212,7 +1220,11 @@ describe("Response", () => { "content-type": "multipart/form-data;boundary=boundary", }, }); - expect(await response.formData()).toBeInstanceOf(FormData); + expect(response.bodyUsed).toBe(false); + const promise = response.formData(); + expect(response.bodyUsed).toBe(true); + expect(await promise).toBeInstanceOf(FormData); + expect(response.bodyUsed).toBe(true); expect(async () => { await response.formData(); }).toThrow("Body already used"); @@ -1231,15 +1243,17 @@ describe("Response", () => { }); it("with blob first", async () => { var response = new Response("
hello
"); - expect(response.body instanceof ReadableStream).toBe(true); - expect(response.headers instanceof Headers).toBe(true); - expect(response.type).toBe("default"); - var blob = await response.blob(); - expect(blob).toBeInstanceOf(Blob); - expect(blob.stream()).toBeInstanceOf(ReadableStream); + expect(response.bodyUsed).toBe(false); + const promise = response.blob(); + expect(response.bodyUsed).toBe(true); + expect(await promise).toBeInstanceOf(Blob); + expect(response.bodyUsed).toBe(true); expect(async () => { await response.blob(); }).toThrow("Body already used"); + expect(async () => { + await response.bytes(); + }).toThrow("Body already used"); expect(async () => { await response.text(); }).toThrow("Body already used"); @@ -1255,7 +1269,11 @@ describe("Response", () => { }); it("with arrayBuffer first", async () => { var response = new Response("
hello
"); - expect(await response.arrayBuffer()).toBeInstanceOf(ArrayBuffer); + expect(response.bodyUsed).toBe(false); + const promise = response.arrayBuffer(); + expect(response.bodyUsed).toBe(true); + expect(await promise).toBeInstanceOf(ArrayBuffer); + expect(response.bodyUsed).toBe(true); expect(async () => { await response.arrayBuffer(); }).toThrow("Body already used"); diff --git a/test/js/web/fetch/stream-fast-path.test.ts b/test/js/web/fetch/stream-fast-path.test.ts new file mode 100644 index 0000000000..7e856a6ef2 --- /dev/null +++ b/test/js/web/fetch/stream-fast-path.test.ts @@ -0,0 +1,55 @@ +import { + readableStreamToArrayBuffer, + readableStreamToBlob, + readableStreamToBytes, + readableStreamToJSON, + readableStreamToText, +} from "bun"; +import { describe, expect, test } from "bun:test"; + +describe("ByteBlobLoader", () => { + const blobs = [ + ["Empty", new Blob()], + ["Hello, world!", new Blob(["Hello, world!"], { type: "text/plain" })] as const, + ["Bytes", new Blob([new Uint8Array([0x00, 0x01, 0x02, 0x03])], { type: "application/octet-stream" })] as const, + [ + "Mixed", + new Blob(["Hello, world!", new Uint8Array([0x00, 0x01, 0x02, 0x03])], { type: "multipart/mixed" }), + ] as const, + ] as const; + + describe.each([ + ["arrayBuffer", readableStreamToArrayBuffer] as const, + ["bytes", readableStreamToBytes] as const, + ["text", readableStreamToText] as const, + ["blob", readableStreamToBlob] as const, + ] as const)(`%s`, (name, fn) => { + describe.each(blobs)(`%s`, (label, blob) => { + test("works", async () => { + const stream = blob.stream(); + const result = fn(stream); + console.log(Promise, result); + expect(result.then).toBeFunction(); + const awaited = await result; + expect(awaited).toEqual(await new Response(blob)[name]()); + }); + }); + }); + + test("json", async () => { + const blob = new Blob(['"Hello, world!"'], { type: "application/json" }); + const stream = blob.stream(); + const result = readableStreamToJSON(stream); + expect(result.then).toBeFunction(); + const awaited = await result; + expect(awaited).toStrictEqual(await new Response(blob).json()); + }); + + test("returns a rejected Promise for invalid JSON", async () => { + const blob = new Blob(["I AM NOT JSON!"], { type: "application/json" }); + const stream = blob.stream(); + const result = readableStreamToJSON(stream); + expect(result.then).toBeFunction(); + expect(async () => await result).toThrow(); + }); +});