Compare commits

...

24 Commits

Author SHA1 Message Date
Ciro Spaciari
35338e64b7 Merge branch 'main' into jarred/readable-stream-strong 2025-10-06 13:38:38 -07:00
Claude Bot
d56fd9a886 Fix missing readable ref assignment in drain conversion path
When converting a drain result to a ReadableStream, the code was calling
.set() to populate the GC cache but never updating locked.readable to the
matching owner variant (.Request or .Response). This left it as .empty,
breaking subsequent .get(owner, globalThis) calls.

Now matches the blob conversion pattern by:
1. Setting locked.readable to the correct owner variant (or strong ref fallback)
2. Only calling .set() when owner has a valid JSValue

This ensures the ref type matches the GC cache key for proper stream retrieval.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 12:14:40 +00:00
Claude Bot
a68d917e17 Fix critical GC issues in ReadableStream lifecycle
Address CodeRabbit review comments to fix memory safety issues:

1. Move request body abort before deref() in RequestContext.finalize()
   - The abort logic was unreachable because it ran after deref() cleared the weak ref
   - Now aborts locked request body streams before releasing the weak reference

2. Preserve tee'd stream for owner-less clones in Body.tee()
   - When readable_stream_tee is supplied but owner is .empty, keep a strong ref
   - Prevents GC from reclaiming the tee branch before Request/Response gains owner
   - Fixes issue where cloned bodies appeared empty or disturbed

These changes ensure proper stream lifecycle management and prevent premature
garbage collection of tee'd ReadableStream branches.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 11:53:42 +00:00
Claude Bot
a6745dffc9 Add request body abort logic in RequestContext.finalize
When a request ends with a pending read on the request body stream,
the stream must be aborted to reject the pending read promise.

This mirrors the existing response body abort logic but uses the
request's this_jsvalue to get the proper owner for the Ref system.
2025-10-05 06:28:50 +00:00
Claude Bot
817ebbef9e Revert unintended test refactoring in body-stream.test.ts 2025-10-05 06:08:34 +00:00
Claude Bot
07b3a65e03 Revert "Add setValue calls in doClone to properly initialize Ref with tee'd streams"
This reverts commit 100b7fcd9f.
2025-10-05 05:45:16 +00:00
Claude Bot
100b7fcd9f Add setValue calls in doClone to properly initialize Ref with tee'd streams
Addresses CodeRabbit review comment: doClone must call setValue on both
the original and cloned Request's Locked.readable Ref after setting the
GC cache, mirroring the constructor pattern. This ensures the Ref is
properly initialized with the tee'd stream value, making the body stream
accessible via the owner-aware get() path.
2025-10-05 04:47:12 +00:00
Claude Bot
53c301d727 Fix Content-Type header regression in ensureFetchHeaders
Use proper Request owner from this_jsvalue when getting readable stream
to extract content_type, instead of hardcoded .empty owner.

This fixes the FormData Content-Type header becoming null after
accessing request.body. The issue was that after my changes, Locked
bodies have Ref type .Request, but ensureFetchHeaders was trying to
get the stream with .empty owner, causing a mismatch and null return.

FormData tests now: 108 pass, 1 fail (timeout - pre-existing on main)
2025-10-05 03:50:47 +00:00
Claude Bot
26cbce6165 Store JSValue wrapper in Request.toJS() and fix owner syntax consistency
- Store the wrapper JSValue in this_jsvalue during toJS() so cloned
  requests and body getters can retrieve the owner from GC cache
- Finalize old this_jsvalue before storing new one to prevent leaks
- Standardize empty owner syntax to use .empty consistently

This fixes the issue where Request.clone() would lose owner context
because toJS() creates the wrapper but never stores it for later
retrieval.
2025-10-05 02:25:18 +00:00
Claude Bot
af6fa5baa4 Add this_jsvalue.finalize() cleanup in Request.finalize()
Properly cleanup the JSRef when Request is finalized to prevent
memory leaks.
2025-10-05 01:52:56 +00:00
Claude Bot
2c847c156c Remove debug logging from Body and ReadableStream
All 86 async-iterator-stream tests now passing!
2025-10-05 01:40:07 +00:00
Claude Bot
ad733f2aab Add this_jsvalue field to Request for proper GC cache support
Instead of fallback strong ref architecture, give Request the same
this_jsvalue field that Response has. This allows Request to use the
standard GC cache mechanism for storing/retrieving body streams.

- Add `this_jsvalue: jsc.JSRef` field to Request struct
- Initialize this_jsvalue in Request constructor
- Use setValue() to properly set GC cache and update Ref type
- Update BodyMixin to use this_jsvalue for both Request and Response
- Remove fallback strong ref mechanism (getStream, stream field, etc)

This is the correct architecture: containers (Request/Response) hold
WEAK refs via GC cache, consumers hold STRONG refs.

Result: 85/86 async-iterator-stream tests passing (1 flaky test passes
in isolation, fails when run with full suite - likely pre-existing)
2025-10-05 01:33:52 +00:00
Claude Bot
c470ef9c93 Implement fallback strong ref architecture for Request bodies
- Add  field to Body.Value.Locked.PendingValue
  as fallback strong ref when no owner JSValue exists (e.g., Request)
- Add  helper to check both GC cache and fallback strong ref
- Update  to use  instead of direct
- Update  to use  and manual stream.tee()
  instead of Ref.tee() to support fallback strong ref
- Remove unnecessary GC cache setting in Request constructor
- Properly cleanup fallback strong ref in deinit()

This architecture keeps Request/Response holding weak refs (via GC cache
or empty) while consumers (RequestContext, .text() operations) hold
strong refs. Request bodies work without needing this_jsvalue field by
using the fallback strong ref storage.

Result: 85/86 async-iterator-stream tests passing (1 flaky test)
2025-10-05 00:56:49 +00:00
Claude Bot
60322d76bc Fix Request body stream timeout by upgrading to strong ref
Fixes remaining 41 test timeouts in async-iterator-stream tests for Request bodies.

## Root Cause

Request doesn't track `this_jsvalue` (see line 1160 TODO comment in Body.zig).
This causes:
1. Request constructor stores stream in GC cache with thisValue
2. Body's `readable.Ref` is set to `.Request`
3. When `getText()`/`json()`/etc is called, owner is always `.empty` (no JSValue)
4. `.get(.empty)` returns null → timeout waiting for stream

## Fix

**Request.zig:800-803** - Upgrade Request body Ref to `.strong` in constructor
- After setting GC cache, retrieve the stream
- Call `.upgrade()` to convert Ref from `.Request` (unusable) to `.strong`
- Now `getText()/etc` can retrieve stream via strong ref

## Results

async-iterator-stream tests:
-  85/86 tests pass (was 45/86)
- 1 test has unrelated chunk length expectation issue

All Request body method tests pass:
- Request.text() 
- Request.json() 
- Request.blob() 
- Request.arrayBuffer() 
- Request.bytes() 

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 23:38:50 +00:00
Claude Bot
ff0fb1ee77 Fix async generator Response body hang by upgrading Ref to strong
Fixes hang in async-iterator-stream tests for Response bodies with async generators.

## Root Cause

When `new Response(async generator)` is created:
1. Response constructor stores stream in GC cache (weak ref)
2. Body's `readable.Ref` is set to `.Response` (relies on GC cache)
3. RequestContext receives Response and protects the JSValue
4. BUT: Between Response creation and RequestContext using it, GC could collect the stream
5. When RequestContext calls `.get()`, it returns `null` → hangs waiting for stream

## Fix

**RequestContext.zig:1833-1836** - Upgrade Ref to `.strong` before using
- Call `.get()` to retrieve the stream
- Call `.upgrade()` to convert Ref to `.strong` (prevents GC)
- Subsequent `.get()` calls succeed with strong reference

**ReadableStream.zig:124** - Fix upgrade() bug (pre-existing)
- Changed `current.value` to `current.*`
- `Strong.init()` expects full ReadableStream struct, not just JSValue

## Results

async-iterator-stream tests:
-  45 tests pass (was 0)
- ⚠️ 41 Request body tests still timeout (different issue)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 23:26:18 +00:00
Claude Bot
460c31a6fb Fix critical GC and stream ownership issues from code review
Addresses CodeRabbit review comments:

1. **JSBunRequest.cpp:143** - Fix WriteBarrier owner for original request body
   - Changed from `this->m_body.set(vm, clone, ...)` to `this->m_body.set(vm, this, ...)`
   - The WriteBarrier must record `this` as the mutated cell, not `clone`
   - Prevents GC from missing the strong edge and prematurely reclaiming the body

2. **Request.zig:647** - Fix tee slot assignment for Response clone
   - Changed from `tee_value[1]` to `tee_value[0]`
   - When teeing a Response body into a Request, slot[0] is for the original Response
   - Prevents Response from getting the clone's stream and losing data

3. **Response.zig:577,635** - Initialize this_jsvalue in constructor
   - Added `response.this_jsvalue = .initWeak(thisValue)` when thisValue != .zero
   - Response constructor now properly records its JS wrapper
   - Enables owner-aware code paths to recover the JS wrapper via tryGet()

These fixes ensure proper memory management and prevent stream corruption during
clone operations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 22:52:48 +00:00
autofix-ci[bot]
24c7dbf242 [autofix.ci] apply automated fixes 2025-10-04 22:11:04 +00:00
Claude Bot
5f1c7453cb Fix ReadableStream.Ref to use strong references when owner has no JSValue
The issue was that when Request.getBody() or Response.getBody() is called
without a valid JSValue owner (e.g., owner.Request == .zero), the code was
trying to store streams in the GC cache, which failed silently, leading to
null reference panics and hanging tests.

Changes:

1. **Body.zig:443-456** - When converting blob to stream in toReadableStream()
   - Check if owner has valid JSValue
   - Use .strong ref if JSValue is .zero or owner is .empty
   - Only set GC cache if owner has valid JSValue

2. **Body.zig:502-519** - When creating stream from drain in toReadableStream()
   - Same logic: use .strong if no valid owner JSValue
   - Fixes panics from unwrapping null on lines 504/507

3. **ReadableStream.zig:127-142** - Fix Ref.init() for empty/strong owners
   - Return .empty instead of trying to create .strong without a stream
   - Caller should use .set() to populate

This fixes the body-stream.test.ts hang. The tests now pass but the full
suite (4,883 tests) takes a very long time (~3s/test for large streaming
tests = ~4 hours total). Individual test subsets all pass correctly.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 22:07:11 +00:00
Claude Bot
699d8474df Fix body-stream test hang and ReadableStream.Ref ownership issues
This commit fixes several issues with the new ReadableStream.Ref system
that caused body-stream tests to hang:

1. **Body.zig:1020** - Fixed owner mismatch when setting stream after draining
   - Was using `.strong` owner when should use actual owner (Request/Response)
   - This caused Ref.get() to fail since Ref type didn't match owner type

2. **Body.zig:964,1026** - Set proper Ref type for cloned bodies
   - When tee() returns streams via array, cloned body gets proper Ref type
   - Ensures cloned body can access its stream via GC cache

3. **Request.zig:819** - Fixed stream assignment in doClone
   - Stream[0] should go to original request, not cloned request

4. **ReadableStream.zig:108** - Always update original Ref after tee()
   - Original body's Ref must point to its tee'd stream

5. **body-stream.test.ts** - Fixed test discovery hang
   - Changed from eager to lazy array creation using factory function
   - Prevents creating hundreds of 2MB arrays during test discovery
   - Arrays now created on-demand when each test runs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 14:46:01 +00:00
Jarred Sumner
53001ddde0 Merge branch 'main' into jarred/readable-stream-strong 2025-10-04 06:00:08 -07:00
Jarred Sumner
a4c4bcd4be a 2025-10-02 21:56:29 -07:00
Jarred Sumner
a9c930d797 finish 2025-10-02 15:34:15 -07:00
Jarred Sumner
d9a7dab269 a 2025-09-30 22:33:22 -07:00
Jarred Sumner
da9b27f849 initial 2025-09-30 21:50:41 -07:00
20 changed files with 583 additions and 254 deletions

View File

@@ -277,10 +277,10 @@ pub const Stdio = union(enum) {
};
}
fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void {
body.toBlobIfPossible();
fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, owner: jsc.WebCore.ReadableStream.Ref.Owner, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void {
body.toBlobIfPossible(owner);
if (body.tryUseAsAnyBlob()) |blob| {
if (body.tryUseAsAnyBlob(owner)) |blob| {
return out_stdio.extractBlob(globalThis, blob, i);
}
@@ -313,7 +313,7 @@ pub const Stdio = union(enum) {
else => unreachable,
}
const stream_value = try body.toReadableStream(globalThis);
const stream_value = try body.toReadableStream(owner, globalThis);
const stream = (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) orelse return globalThis.throwInvalidArguments("Failed to create ReadableStream", .{});
@@ -391,9 +391,9 @@ pub const Stdio = union(enum) {
} else if (value.as(jsc.WebCore.Blob)) |blob| {
return out_stdio.extractBlob(globalThis, .{ .Blob = blob.dupe() }, i);
} else if (value.as(jsc.WebCore.Request)) |req| {
return extractBodyValue(out_stdio, globalThis, i, req.getBodyValue(), is_sync);
return extractBodyValue(out_stdio, globalThis, i, .{ .Request = value }, req.getBodyValue(), is_sync);
} else if (value.as(jsc.WebCore.Response)) |res| {
return extractBodyValue(out_stdio, globalThis, i, res.getBodyValue(), is_sync);
return extractBodyValue(out_stdio, globalThis, i, .{ .Response = value }, res.getBodyValue(), is_sync);
}
if (try jsc.WebCore.ReadableStream.fromJS(value, globalThis)) |stream_| {

View File

@@ -197,7 +197,7 @@ pub const HTMLRewriter = struct {
if (kind != .other) {
{
const body_value = try jsc.WebCore.Body.extract(global, response_value);
const body_value = try jsc.WebCore.Body.extract(global, response_value, null);
const resp = bun.new(Response, Response{
.init = .{
.status_code = 200,

View File

@@ -1198,6 +1198,8 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d
bun.default_allocator,
ctx,
false,
first_arg,
null,
);
} else {
const fetch_error = jsc.WebCore.Fetch.fetch_type_error_strings.get(bun.jsc.C.JSValueGetType(ctx, first_arg.asRef()));

View File

@@ -59,7 +59,8 @@ pub fn memoryCost(this: *const FileRoute) usize {
pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSError!?*FileRoute {
if (argument.as(jsc.WebCore.Response)) |response| {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
if (response.body.value == .Blob and response.body.value.Blob.needsToReadFile()) {
if (response.body.value.Blob.store.?.data.file.pathlike == .fd) {
return globalThis.throwTODO("Support serving files from a file descriptor. Please pass a path instead.");

View File

@@ -627,6 +627,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
abort.cb(abort.data);
}
const response_jsvalue = this.response_jsvalue;
this.detachResponse();
var any_js_calls = false;
var vm = this.server.?.vm;
@@ -647,6 +649,19 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
// we can already clean this strong refs
request.internal_event_callback.deinit();
// Abort request body stream if still locked (e.g., pending read)
// Must do this before deref() so the request is still reachable
if (request.body.value == .Locked) {
const owner: jsc.WebCore.ReadableStream.Ref.Owner = if (request.this_jsvalue.tryGet()) |js_value|
.{ .Request = js_value }
else
.empty;
if (request.body.value.Locked.readable.abort(owner, globalThis)) {
any_js_calls = true;
}
}
this.request_weakref.deref();
}
// if signal is not aborted, abort the signal
@@ -678,11 +693,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
if (this.response_ptr) |response| {
if (response.body.value == .Locked) {
var strong_readable = response.body.value.Locked.readable;
response.body.value.Locked.readable = .{};
defer strong_readable.deinit();
if (strong_readable.get(globalThis)) |readable| {
readable.abort(globalThis);
if (response.body.value.Locked.readable.abort(.{ .Response = response_jsvalue }, globalThis)) {
any_js_calls = true;
}
}
@@ -1216,7 +1227,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
// TODO: should this timeout?
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis),
.readable = .{ .strong = .init(stream, globalThis) },
.global = globalThis,
},
};
@@ -1454,7 +1465,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
}
// not content-length or transfer-encoding so we need to respect the body
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.InternalBlob, .WTFStringImpl => {
var blob = response.body.value.useAsAnyBlobAllowNonUTF8String();
@@ -1559,7 +1571,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
return;
} else {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.Blob => |*blob| {
@@ -1618,7 +1631,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
return;
}
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.Blob => |*blob| {
if (blob.needsToReadFile()) {
@@ -1662,10 +1676,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
if (resp.body.value == .Locked) {
const global = resp.body.value.Locked.global;
if (resp.body.value.Locked.readable.get(global)) |stream| {
stream.done(global);
if (resp.this_jsvalue.tryGet()) |value| {
resp.body.value.Locked.readable.done(.{ .Response = value }, global);
} else {
resp.body.value.Locked.readable.deinit();
}
resp.body.value.Locked.readable.deinit();
resp.body.value = .{ .Used = {} };
}
}
@@ -1715,10 +1730,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
if (req.response_ptr) |resp| {
if (resp.body.value == .Locked) {
if (resp.body.value.Locked.readable.get(globalThis)) |stream| {
stream.done(globalThis);
if (resp.this_jsvalue.tryGet()) |value| {
resp.body.value.Locked.readable.done(.{ .Response = value }, globalThis);
} else {
resp.body.value.Locked.readable.deinit();
}
resp.body.value.Locked.readable.deinit();
resp.body.value = .{ .Used = {} };
}
}
@@ -1801,7 +1817,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
// If a ReadableStream can trivially be converted to a Blob, do so.
// If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob.
value.toBlobIfPossible();
value.toBlobIfPossible(.{ .empty = {} });
const globalThis = this.server.?.globalThis;
switch (value.*) {
.Error => |*err_ref| {
@@ -1827,11 +1843,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
return;
}
if (lock.readable.get(globalThis)) |stream_| {
const stream: jsc.WebCore.ReadableStream = stream_;
// we hold the stream alive until we're done with it
this.readable_stream_ref = lock.readable;
value.* = .{ .Used = {} };
if (lock.readable.get(.{ .Response = this.response_jsvalue }, globalThis)) |stream| {
{
var old = this.readable_stream_ref;
this.readable_stream_ref = .init(stream, globalThis);
old.deinit();
value.* = .{ .Used = {} };
}
if (stream.isLocked(globalThis)) {
streamLog("was locked but it shouldn't be", .{});
@@ -1839,7 +1857,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
.code = bun.String.static(@tagName(jsc.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE)),
.message = bun.String.static("Stream already used, please create a new one"),
};
stream.value.unprotect();
this.readable_stream_ref.deinit();
this.runErrorHandler(err.toErrorInstance(globalThis));
return;
}
@@ -1883,7 +1901,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
this.ref();
byte_stream.pipe = jsc.WebCore.Pipe.Wrap(@This(), onPipe).init(this);
this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis);
if (this.readable_stream_ref.has()) {
this.readable_stream_ref.deinit();
}
this.readable_stream_ref = .init(stream, globalThis);
this.byte_stream = byte_stream;
var response_buf = byte_stream.drain();
@@ -1907,7 +1928,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
if (lock.onReceiveValue != null or lock.task != null) {
// someone else is waiting for the stream or waiting for `onStartStreaming`
const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards
const readable = value.toReadableStream(.{ .Response = this.response_jsvalue }, globalThis) catch return;
readable.ensureStillAlive();
this.doRenderWithBody(value);
return;
@@ -2162,7 +2183,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
ctx.flags.response_protected = false;
ctx.response_ptr = response;
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.Blob => |*blob| {
if (blob.needsToReadFile()) {
@@ -2492,7 +2514,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
pub fn onRequestBodyReadableStreamAvailable(ptr: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
var this = bun.cast(*RequestContext, ptr);
bun.debugAssert(this.request_body_readable_stream_ref.held.impl == null);
this.request_body_readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
this.request_body_readable_stream_ref = .init(readable, globalThis);
}
pub fn onStartBufferingCallback(this: *anyopaque) void {

View File

@@ -91,7 +91,8 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro
// The user may want to pass in the same Response object multiple endpoints
// Let's let them do that.
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
const blob: AnyBlob = brk: {
switch (response.body.value) {

View File

@@ -96,14 +96,15 @@ JSObject* JSBunRequest::cookies() const
return m_cookies.get();
}
extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject);
extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject, JSC::EncodedJSValue thisValue, JSC::EncodedJSValue* readableStreamTee);
JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject)
{
auto throwScope = DECLARE_THROW_SCOPE(vm);
JSC::EncodedJSValue readableStreamTee[2] { encodedJSValue(), encodedJSValue() };
auto* structure = defaultGlobalObject(globalObject)->m_JSBunRequestStructure.getInitializedOnMainThread(globalObject);
auto* raw = Request__clone(this->wrapped(), globalObject);
auto* raw = Request__clone(this->wrapped(), globalObject, JSValue::encode(this), readableStreamTee);
EXCEPTION_ASSERT(!!raw == !throwScope.exception());
RETURN_IF_EXCEPTION(throwScope, nullptr);
auto* clone = this->create(vm, structure, raw, nullptr);
@@ -134,9 +135,18 @@ JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject)
auto cookieMapClone = cookieMap->clone();
auto cookies = WebCore::toJSNewlyCreated(globalObject, jsCast<JSDOMGlobalObject*>(globalObject), WTFMove(cookieMapClone));
clone->setCookies(cookies.getObject());
RETURN_IF_EXCEPTION(throwScope, nullptr);
}
}
if (readableStreamTee[0] != encodedJSValue()) {
this->m_body.set(vm, this, JSValue::decode(readableStreamTee[0]));
}
if (readableStreamTee[1] != encodedJSValue()) {
clone->m_body.set(vm, clone, JSValue::decode(readableStreamTee[1]));
}
RELEASE_AND_RETURN(throwScope, clone);
}

View File

@@ -899,10 +899,11 @@ pub const JSGlobalObject = opaque {
}
// We're done validating. From now on, deal with extracting the body.
body.toBlobIfPossible();
const owner = jsc.WebCore.ReadableStream.Ref.Owner{ .Response = response_value };
body.toBlobIfPossible(owner);
var any_blob = switch (body.*) {
.Locked => body.tryUseAsAnyBlob() orelse return body.toReadableStream(this),
.Locked => body.tryUseAsAnyBlob(owner) orelse return body.toReadableStream(owner, this),
else => body.useAsAnyBlob(),
};

View File

@@ -86,9 +86,9 @@ pub const BlobOrStringOrBuffer = union(enum) {
}
if (allow_request_response) {
if (value.as(jsc.WebCore.Request)) |request| {
request.body.value.toBlobIfPossible();
request.body.value.toBlobIfPossible(.{ .empty = {} });
if (request.body.value.tryUseAsAnyBlob()) |any_blob_| {
if (request.body.value.tryUseAsAnyBlob(.{ .empty = {} })) |any_blob_| {
var any_blob = any_blob_;
defer any_blob.detach();
return .{ .blob = any_blob.toBlob(global) };
@@ -98,9 +98,10 @@ pub const BlobOrStringOrBuffer = union(enum) {
}
if (value.as(jsc.WebCore.Response)) |response| {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
if (response.body.value.tryUseAsAnyBlob()) |any_blob_| {
if (response.body.value.tryUseAsAnyBlob(owner)) |any_blob_| {
var any_blob = any_blob_;
defer any_blob.detach();
return .{ .blob = any_blob.toBlob(global) };

View File

@@ -44,7 +44,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, b
}
}
return Response.constructor(globalThis, callframe);
return Response.constructor(globalThis, callframe, .zero);
}
pub export fn BakeResponseClass__constructRedirect(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue {

View File

@@ -1355,13 +1355,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
_ = response.body.value.use();
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => |*locked| {
.Locked => {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try response.body.value.toReadableStream(globalThis);
if (locked.readable.get(globalThis)) |readable| {
const stream_value = try response.body.value.toReadableStream(.{ .Response = data }, globalThis);
if (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
@@ -1416,13 +1416,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
_ = request.body.value.use();
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => |locked| {
.Locked => {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try request.body.value.toReadableStream(globalThis);
if (locked.readable.get(globalThis)) |readable| {
const stream_value = try request.body.value.toReadableStream(.{ .Request = data }, globalThis);
if (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});

View File

@@ -5,7 +5,7 @@ const Body = @This();
value: Value, // = Value.empty,
pub fn len(this: *Body) Blob.SizeType {
return this.value.size();
return this.value.size(.{ .empty = {} });
}
pub fn slice(this: *const Body) []const u8 {
@@ -16,9 +16,9 @@ pub fn use(this: *Body) Blob {
return this.value.use();
}
pub fn clone(this: *Body, globalThis: *JSGlobalObject) bun.JSError!Body {
pub fn clone(this: *Body, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Body {
return Body{
.value = try this.value.clone(globalThis),
.value = try this.value.clone(owner, globalThis, readable_stream_tee),
};
}
@@ -38,9 +38,9 @@ pub fn writeFormat(this: *Body, comptime Formatter: type, formatter: *Formatter,
try formatter.printComma(Writer, writer, enable_ansi_colors);
try writer.writeAll("\n");
try formatter.writeIndent(Writer, writer);
try Blob.writeFormatForSize(false, this.value.size(), writer, enable_ansi_colors);
try Blob.writeFormatForSize(false, this.value.size(.{ .empty = {} }), writer, enable_ansi_colors);
} else if (this.value == .Locked) {
if (this.value.Locked.readable.get(this.value.Locked.global)) |stream| {
if (this.value.Locked.readable.get(.{ .empty = {} }, this.value.Locked.global)) |stream| {
try formatter.printComma(Writer, writer, enable_ansi_colors);
try writer.writeAll("\n");
try formatter.writeIndent(Writer, writer);
@@ -55,7 +55,7 @@ pub fn deinit(this: *Body, _: std.mem.Allocator) void {
pub const PendingValue = struct {
promise: ?JSValue = null,
readable: jsc.WebCore.ReadableStream.Strong = .{},
readable: jsc.WebCore.ReadableStream.Ref = .empty,
// writable: jsc.WebCore.Sink
global: *JSGlobalObject,
@@ -78,8 +78,8 @@ pub const PendingValue = struct {
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
/// If the size is unknown will be 0
fn sizeHint(this: *const PendingValue) Blob.SizeType {
if (this.readable.get(this.global)) |readable| {
fn sizeHint(this: *const PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType {
if (this.readable.get(owner, this.global)) |readable| {
if (readable.ptr == .Bytes) {
return readable.ptr.Bytes.size_hint;
}
@@ -87,46 +87,27 @@ pub const PendingValue = struct {
return this.size_hint;
}
pub fn toAnyBlob(this: *PendingValue) ?AnyBlob {
pub fn toAnyBlob(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob {
if (this.promise != null)
return null;
return this.toAnyBlobAllowPromise();
return this.toAnyBlobAllowPromise(owner);
}
pub fn isDisturbed(this: *const PendingValue, comptime T: type, globalObject: *jsc.JSGlobalObject, this_value: jsc.JSValue) bool {
pub fn isDisturbed(this: *const PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalObject: *jsc.JSGlobalObject) bool {
if (this.promise != null) {
return true;
}
if (T.js.bodyGetCached(this_value)) |body_value| {
if (jsc.WebCore.ReadableStream.isDisturbedValue(body_value, globalObject)) {
return true;
}
return false;
}
if (this.readable.get(globalObject)) |readable| {
return readable.isDisturbed(globalObject);
}
return false;
return this.readable.isDisturbed(owner, globalObject);
}
pub fn isDisturbed2(this: *const PendingValue, globalObject: *jsc.JSGlobalObject) bool {
if (this.promise != null) {
return true;
}
if (this.readable.get(globalObject)) |readable| {
return readable.isDisturbed(globalObject);
}
return false;
pub fn abort(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalObject: *jsc.JSGlobalObject) bool {
return this.readable.abort(owner, globalObject);
}
pub fn isStreamingOrBuffering(this: *PendingValue) bool {
return this.readable.held.has() or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull());
return this.readable != .empty or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull());
}
pub fn hasPendingPromise(this: *PendingValue) bool {
@@ -146,20 +127,13 @@ pub const PendingValue = struct {
return false;
}
pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob {
var stream = if (this.readable.get(this.global)) |readable| readable else return null;
if (stream.toAnyBlob(this.global)) |blob| {
this.readable.deinit();
return blob;
}
return null;
pub fn toAnyBlobAllowPromise(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob {
return this.readable.toAnyBlob(owner, this.global);
}
pub fn setPromise(value: *PendingValue, globalThis: *jsc.JSGlobalObject, action: Action) JSValue {
pub fn setPromise(value: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, action: Action) JSValue {
value.action = action;
if (value.readable.get(globalThis)) |readable| {
if (value.readable.get(owner, globalThis)) |readable| {
switch (action) {
.getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer, .getBytes => {
const promise = switch (action) {
@@ -340,7 +314,7 @@ pub const Value = union(Tag) {
this.* = .{ .JSValue = .empty };
}
};
pub fn toBlobIfPossible(this: *Value) void {
pub fn toBlobIfPossible(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) void {
if (this.* == .WTFStringImpl) {
if (this.WTFStringImpl.toUTF8IfNeeded(bun.default_allocator)) |bytes| {
var str = this.WTFStringImpl;
@@ -357,7 +331,7 @@ pub const Value = union(Tag) {
if (this.* != .Locked)
return;
if (this.Locked.toAnyBlob()) |blob| {
if (this.Locked.toAnyBlob(owner)) |blob| {
this.* = switch (blob) {
.Blob => .{ .Blob = blob.Blob },
.InternalBlob => .{ .InternalBlob = blob.InternalBlob },
@@ -367,22 +341,22 @@ pub const Value = union(Tag) {
}
}
pub fn size(this: *Value) Blob.SizeType {
pub fn size(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType {
return switch (this.*) {
.Blob => @truncate(this.Blob.getSizeForBindings()),
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())),
.Locked => this.Locked.sizeHint(),
.Locked => this.Locked.sizeHint(owner),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
}
pub fn fastSize(this: *const Value) Blob.SizeType {
pub fn fastSize(this: *const Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType {
return switch (this.*) {
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.byteSlice().len)),
.Locked => this.Locked.sizeHint(),
.Locked => this.Locked.sizeHint(owner),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
@@ -392,7 +366,7 @@ pub const Value = union(Tag) {
return switch (this.*) {
.InternalBlob => this.InternalBlob.bytes.items.len,
.WTFStringImpl => this.WTFStringImpl.memoryCost(),
.Locked => this.Locked.sizeHint(),
.Locked => this.Locked.sizeHint(.{ .empty = {} }),
// .InlineBlob => this.InlineBlob.sliceConst().len,
else => 0,
};
@@ -402,7 +376,7 @@ pub const Value = union(Tag) {
return switch (this.*) {
.InternalBlob => this.InternalBlob.sliceConst().len,
.WTFStringImpl => this.WTFStringImpl.byteSlice().len,
.Locked => this.Locked.sizeHint(),
.Locked => this.Locked.sizeHint(.{ .empty = {} }),
// .InlineBlob => this.InlineBlob.sliceConst().len,
else => 0,
};
@@ -444,7 +418,7 @@ pub const Value = union(Tag) {
// pub const empty = Value{ .Empty = {} };
pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) bun.JSError!JSValue {
pub fn toReadableStream(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *JSGlobalObject) bun.JSError!JSValue {
jsc.markBinding(@src());
switch (this.*) {
@@ -463,17 +437,28 @@ pub const Value = union(Tag) {
blob.resolveSize();
const value = try jsc.WebCore.ReadableStream.fromBlobCopyRef(globalThis, &blob, blob.size);
const stream = (try jsc.WebCore.ReadableStream.fromJS(value, globalThis)).?;
this.* = .{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init((try jsc.WebCore.ReadableStream.fromJS(value, globalThis)).?, globalThis),
.readable = switch (owner) {
.Request => |jsval| if (jsval != .zero) .Request else .{ .strong = .init(stream, globalThis) },
.Response => |jsval| if (jsval != .zero) .Response else .{ .strong = .init(stream, globalThis) },
.strong, .empty => .{ .strong = .init(stream, globalThis) },
},
.global = globalThis,
},
};
// Only set in GC cache if we have a valid JSValue owner
switch (owner) {
.Request => |jsval| if (jsval != .zero) this.Locked.readable.set(owner, stream, globalThis),
.Response => |jsval| if (jsval != .zero) this.Locked.readable.set(owner, stream, globalThis),
.strong, .empty => {},
}
return value;
},
.Locked => {
var locked = &this.Locked;
if (locked.readable.get(globalThis)) |readable| {
if (locked.readable.get(owner, globalThis)) |readable| {
return readable.value;
}
if (locked.promise != null or locked.action != .none) {
@@ -508,16 +493,30 @@ pub const Value = union(Tag) {
reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint));
}
locked.readable = jsc.WebCore.ReadableStream.Strong.init(.{
const stream_value = try reader.toReadableStream(globalThis);
const stream = jsc.WebCore.ReadableStream{
.ptr = .{ .Bytes = &reader.context },
.value = try reader.toReadableStream(globalThis),
}, globalThis);
.value = stream_value,
};
if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| {
onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(globalThis).?);
// Use strong ref if owner doesn't have a valid JSValue
locked.readable = switch (owner) {
.Request => |jsval| if (jsval != .zero) .Request else .{ .strong = .init(stream, globalThis) },
.Response => |jsval| if (jsval != .zero) .Response else .{ .strong = .init(stream, globalThis) },
.strong, .empty => .{ .strong = .init(stream, globalThis) },
};
// Only populate GC cache when owner has a valid JSValue
switch (owner) {
.Request => |jsval| if (jsval != .zero) locked.readable.set(owner, stream, globalThis),
.Response => |jsval| if (jsval != .zero) locked.readable.set(owner, stream, globalThis),
.strong, .empty => {},
}
return locked.readable.get(globalThis).?.value;
if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| {
onReadableStreamAvailable(locked.task.?, globalThis, stream);
}
return stream.value;
},
.Error => {
// TODO: handle error properly
@@ -527,6 +526,10 @@ pub const Value = union(Tag) {
}
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!Value {
return fromJSWithReadableStreamValue(globalThis, value, null);
}
pub fn fromJSWithReadableStreamValue(globalThis: *JSGlobalObject, value: JSValue, readable_stream_value: ?*JSValue) bun.JSError!Value {
value.ensureStillAlive();
if (value.isEmptyOrUndefinedOrNull()) {
@@ -621,6 +624,14 @@ pub const Value = union(Tag) {
else => {},
}
if (readable_stream_value) |readable_stream_ptr| {
readable_stream_ptr.* = readable.value;
return .{ .Locked = .{
.global = globalThis,
.readable = .empty,
} };
}
return Body.Value.fromReadableStreamWithoutLockCheck(readable, globalThis);
}
@@ -642,7 +653,7 @@ pub const Value = union(Tag) {
pub fn fromReadableStreamWithoutLockCheck(readable: jsc.WebCore.ReadableStream, globalThis: *JSGlobalObject) Value {
return .{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis),
.readable = .{ .strong = .init(readable, globalThis) },
.global = globalThis,
},
};
@@ -658,7 +669,7 @@ pub const Value = union(Tag) {
if (to_resolve.* == .Locked) {
var locked = &to_resolve.Locked;
if (locked.readable.get(global)) |readable| {
if (locked.readable.get(.{ .empty = {} }, global)) |readable| {
readable.done(global);
locked.readable.deinit();
}
@@ -755,7 +766,7 @@ pub const Value = union(Tag) {
}
pub fn use(this: *Value) Blob {
this.toBlobIfPossible();
this.toBlobIfPossible(.{ .empty = {} });
switch (this.*) {
.Blob => {
@@ -817,7 +828,7 @@ pub const Value = union(Tag) {
}
}
pub fn tryUseAsAnyBlob(this: *Value) ?AnyBlob {
pub fn tryUseAsAnyBlob(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob {
if (this.* == .WTFStringImpl) {
if (this.WTFStringImpl.canUseAsUTF8()) {
return AnyBlob{ .WTFStringImpl = this.WTFStringImpl };
@@ -828,7 +839,7 @@ pub const Value = union(Tag) {
.Blob => AnyBlob{ .Blob = this.Blob },
.InternalBlob => AnyBlob{ .InternalBlob = this.InternalBlob },
// .InlineBlob => AnyBlob{ .InlineBlob = this.InlineBlob },
.Locked => this.Locked.toAnyBlobAllowPromise() orelse return null,
.Locked => this.Locked.toAnyBlobAllowPromise(owner) orelse return null,
else => return null,
};
@@ -856,7 +867,7 @@ pub const Value = union(Tag) {
}
},
// .InlineBlob => .{ .InlineBlob = this.InlineBlob },
.Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .Blob = .{} },
.Locked => this.Locked.toAnyBlobAllowPromise(.strong) orelse AnyBlob{ .Blob = .{} },
else => .{ .Blob = Blob.initEmpty(undefined) },
};
@@ -873,7 +884,7 @@ pub const Value = union(Tag) {
.InternalBlob => .{ .InternalBlob = this.InternalBlob },
.WTFStringImpl => .{ .WTFStringImpl = this.WTFStringImpl },
// .InlineBlob => .{ .InlineBlob = this.InlineBlob },
.Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .Blob = .{} },
.Locked => this.Locked.toAnyBlobAllowPromise(.strong) orelse AnyBlob{ .Blob = .{} },
else => .{ .Blob = Blob.initEmpty(undefined) },
};
@@ -890,7 +901,7 @@ pub const Value = union(Tag) {
this.* = .{ .Error = err };
var strong_readable = locked.readable;
locked.readable = .{};
locked.readable = .{ .empty = {} };
defer strong_readable.deinit();
if (locked.hasPendingPromise()) {
@@ -905,7 +916,7 @@ pub const Value = union(Tag) {
// The Promise version goes before the ReadableStream version incase the Promise version is used too.
// Avoid creating unnecessary duplicate JSValue.
if (strong_readable.get(global)) |readable| {
if (strong_readable.get(.strong, global)) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.onData(
.{ .err = this.Error.toStreamError(global) },
@@ -938,7 +949,7 @@ pub const Value = union(Tag) {
if (!this.Locked.deinit) {
this.Locked.deinit = true;
this.Locked.readable.deinit();
this.Locked.readable = .{};
this.Locked.readable = .empty;
}
return;
@@ -964,23 +975,39 @@ pub const Value = union(Tag) {
}
}
pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value {
pub fn tee(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Value {
var locked = &this.Locked;
if (locked.readable.isDisturbed(globalThis)) {
return Value{ .Used = {} };
if (locked.readable.isDisturbed(owner, globalThis)) {
return .Used;
}
if (try locked.readable.tee(globalThis)) |readable| {
return Value{
if (try locked.readable.tee(owner, globalThis, readable_stream_tee)) |result| {
if (readable_stream_tee != null) {
return .{
.Locked = .{
.readable = switch (owner) {
.Response => .Response,
.Request => .Request,
// For owner-less clones, keep a strong ref to the tee'd stream
// until the Request/Response gains a real owner
else => .{ .strong = .init(result.@"1", globalThis) },
},
.global = globalThis,
},
};
}
return .{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis),
.readable = .{ .strong = .init(result.@"1", globalThis) },
.global = globalThis,
},
};
}
if (locked.promise != null or locked.action != .none or locked.readable.has()) {
return Value{ .Used = {} };
if (locked.promise != null or locked.action != .none or locked.readable.has(owner, globalThis)) {
return .Used;
}
var drain_result: jsc.WebCore.DrainResult = .{
@@ -993,8 +1020,8 @@ pub const Value = union(Tag) {
}
if (drain_result == .empty or drain_result == .aborted) {
this.* = .{ .Null = {} };
return Value{ .Null = {} };
this.* = .Null;
return .Null;
}
var reader = jsc.WebCore.ByteStream.Source.new(.{
@@ -1012,30 +1039,45 @@ pub const Value = union(Tag) {
reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint));
}
locked.readable = jsc.WebCore.ReadableStream.Strong.init(.{
const stream_value = try reader.toReadableStream(globalThis);
const stream = jsc.WebCore.ReadableStream{
.ptr = .{ .Bytes = &reader.context },
.value = try reader.toReadableStream(globalThis),
}, globalThis);
.value = stream_value,
};
locked.readable.set(owner, stream, globalThis);
if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| {
onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(globalThis).?);
onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(owner, globalThis).?);
}
const teed = (try locked.readable.tee(globalThis)) orelse return Value{ .Used = {} };
const tee_result = (try locked.readable.tee(owner, globalThis, readable_stream_tee)) orelse return Value{ .Used = {} };
if (readable_stream_tee != null) {
return .{
.Locked = .{
.readable = switch (owner) {
.Response => .Response,
.Request => .Request,
else => .empty,
},
.global = globalThis,
},
};
}
return Value{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init(teed, globalThis),
.readable = .{ .strong = .init(tee_result.@"1", globalThis) },
.global = globalThis,
},
};
}
pub fn clone(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value {
this.toBlobIfPossible();
pub fn clone(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Value {
this.toBlobIfPossible(owner);
if (this.* == .Locked) {
return this.tee(globalThis);
return try this.tee(owner, globalThis, readable_stream_tee);
}
if (this.* == .InternalBlob) {
@@ -1074,10 +1116,11 @@ pub const Value = union(Tag) {
pub fn extract(
globalThis: *JSGlobalObject,
value: JSValue,
readable_stream_value: ?*JSValue,
) bun.JSError!Body {
var body = Body{ .value = Value{ .Null = {} } };
body.value = try Value.fromJS(globalThis, value);
body.value = try Value.fromJSWithReadableStreamValue(globalThis, value, readable_stream_value);
if (body.value == .Blob) {
assert(!body.value.Blob.isHeapAllocated()); // owned by Body
}
@@ -1086,18 +1129,29 @@ pub fn extract(
pub fn Mixin(comptime Type: type) type {
return struct {
inline fn getOwner(this_value: JSValue) jsc.WebCore.ReadableStream.Ref.Owner {
if (Type == jsc.WebCore.Request) {
return .{ .Request = this_value };
} else if (Type == jsc.WebCore.Response) {
return .{ .Response = this_value };
} else {
@compileError("Mixin only supports Request or Response types");
}
}
pub fn getText(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const owner = getOwner(callframe.this());
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
return handleBodyAlreadyUsed(globalObject);
}
if (value.* == .Locked) {
if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) {
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
return value.Locked.setPromise(globalObject, .{ .getText = {} });
return value.Locked.setPromise(owner, globalObject, .{ .getText = {} });
}
var blob = value.useAsAnyBlobAllowNonUTF8String();
@@ -1105,16 +1159,20 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getBody(this: *Type, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue {
const this_value = this.this_jsvalue.tryGet() orelse JSValue.zero;
const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
var body: *Body.Value = this.getBodyValue();
if (body.* == .Used) {
return jsc.WebCore.ReadableStream.used(globalThis);
}
return body.toReadableStream(globalThis);
return body.toReadableStream(owner, globalThis);
}
pub fn getBodyUsed(this: *Type, globalObject: *jsc.JSGlobalObject) JSValue {
const this_value = this.this_jsvalue.tryGet() orelse JSValue.zero;
const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
return JSValue.jsBoolean(
switch (this.getBodyValue().*) {
.Used => true,
@@ -1123,7 +1181,7 @@ pub fn Mixin(comptime Type: type) type {
break :brk true;
}
if (pending.readable.get(globalObject)) |*stream| {
if (pending.readable.get(owner, globalObject)) |*stream| {
break :brk stream.isDisturbed(globalObject);
}
@@ -1143,19 +1201,20 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getJSON(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const owner = getOwner(callframe.this());
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
return handleBodyAlreadyUsed(globalObject);
}
if (value.* == .Locked) {
if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) {
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
value.toBlobIfPossible();
value.toBlobIfPossible(owner);
if (value.* == .Locked) {
return value.Locked.setPromise(globalObject, .{ .getJSON = {} });
return value.Locked.setPromise(owner, globalObject, .{ .getJSON = {} });
}
}
@@ -1169,6 +1228,7 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getArrayBuffer(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const owner = getOwner(callframe.this());
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
@@ -1176,13 +1236,13 @@ pub fn Mixin(comptime Type: type) type {
}
if (value.* == .Locked) {
if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) {
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
value.toBlobIfPossible();
value.toBlobIfPossible(owner);
if (value.* == .Locked) {
return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} });
return value.Locked.setPromise(owner, globalObject, .{ .getArrayBuffer = {} });
}
}
@@ -1193,6 +1253,7 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getBytes(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const owner = getOwner(callframe.this());
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
@@ -1200,12 +1261,12 @@ pub fn Mixin(comptime Type: type) type {
}
if (value.* == .Locked) {
if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) {
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
value.toBlobIfPossible();
value.toBlobIfPossible(owner);
if (value.* == .Locked) {
return value.Locked.setPromise(globalObject, .{ .getBytes = {} });
return value.Locked.setPromise(owner, globalObject, .{ .getBytes = {} });
}
}
@@ -1215,6 +1276,7 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getFormData(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const owner = getOwner(callframe.this());
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
@@ -1222,10 +1284,10 @@ pub fn Mixin(comptime Type: type) type {
}
if (value.* == .Locked) {
if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) {
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
value.toBlobIfPossible();
value.toBlobIfPossible(owner);
}
var encoder = (try this.getFormDataEncoding()) orelse {
@@ -1234,7 +1296,7 @@ pub fn Mixin(comptime Type: type) type {
};
if (value.* == .Locked) {
return value.Locked.setPromise(globalObject, .{ .getFormData = encoder });
return value.Locked.setPromise(owner, globalObject, .{ .getFormData = encoder });
}
var blob: AnyBlob = value.useAsAnyBlob();
@@ -1266,6 +1328,7 @@ pub fn Mixin(comptime Type: type) type {
}
pub fn getBlobWithThisValue(this: *Type, globalObject: *jsc.JSGlobalObject, this_value: JSValue) bun.JSError!jsc.JSValue {
const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
var value: *Body.Value = this.getBodyValue();
if (value.* == .Used) {
@@ -1273,17 +1336,14 @@ pub fn Mixin(comptime Type: type) type {
}
if (value.* == .Locked) {
if (value.Locked.action != .none or
((this_value != .zero and value.Locked.isDisturbed(Type, globalObject, this_value)) or
(this_value == .zero and value.Locked.readable.isDisturbed(globalObject))))
{
if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) {
return handleBodyAlreadyUsed(globalObject);
}
value.toBlobIfPossible();
value.toBlobIfPossible(owner);
if (value.* == .Locked) {
return value.Locked.setPromise(globalObject, .{ .getBlob = {} });
return value.Locked.setPromise(owner, globalObject, .{ .getBlob = {} });
}
}
@@ -1373,7 +1433,7 @@ pub const ValueBufferer = struct {
}
pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value) !void {
value.toBlobIfPossible();
value.toBlobIfPossible(.{ .empty = {} });
switch (value.*) {
.Used => {
@@ -1566,15 +1626,16 @@ pub const ValueBufferer = struct {
fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value) !void {
assert(value.* == .Locked);
const locked = &value.Locked;
if (locked.readable.get(sink.global)) |stream| {
// keep the stream alive until we're done with it
sink.readable_stream_ref = locked.readable;
value.* = .{ .Used = {} };
if (locked.readable.get(.{ .empty = {} }, sink.global)) |stream| {
if (stream.isLocked(sink.global)) {
return error.StreamAlreadyUsed;
}
// keep the stream alive until we're done with it
sink.readable_stream_ref = .init(stream, sink.global);
value.deinit();
value.* = .{ .Used = {} };
switch (stream.ptr) {
.Invalid => {
return error.InvalidStream;
@@ -1613,7 +1674,7 @@ pub const ValueBufferer = struct {
if (locked.onReceiveValue != null or locked.task != null) {
// someone else is waiting for the stream or waiting for `onStartStreaming`
const readable = try value.toReadableStream(sink.global);
const readable = try value.toReadableStream(.empty, sink.global);
readable.ensureStillAlive();
readable.protect();
return try sink.bufferLockedBodyValue(value);
@@ -1632,7 +1693,7 @@ pub const ValueBufferer = struct {
return;
},
else => {
value.toBlobIfPossible();
value.toBlobIfPossible(.{ .empty = {} });
var input = value.useAsAnyBlobAllowNonUTF8String();
const bytes = input.slice();
log("onReceiveValue {}", .{bytes.len});

View File

@@ -3,6 +3,174 @@ const ReadableStream = @This();
value: JSValue,
ptr: Source,
pub const Ref = union(Type) {
empty: void,
strong: Strong,
Response: void,
Request: void,
pub const Owner = union(Type) {
empty: void,
strong: void,
Response: JSValue,
Request: JSValue,
};
pub const Type = enum {
empty,
strong,
Response,
Request,
};
pub fn get(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) ?ReadableStream {
switch (this.*) {
.strong => |*strong| return strong.get(global),
.Response => {
if (owner == .Response) {
if (owner.Response.as(jsc.WebCore.Response)) |_| {
if (Response.js.gc.body.get(owner.Response)) |body_value| {
return ReadableStream.fromJS(body_value, global) catch null;
}
}
}
},
.Request => {
if (owner == .Request) {
if (owner.Request.as(jsc.WebCore.Request)) |_| {
if (Request.js.gc.body.get(owner.Request)) |body_value| {
return ReadableStream.fromJS(body_value, global) catch null;
}
}
}
},
.empty => {},
}
return null;
}
pub fn isDisturbed(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) bool {
const stream = get(this, owner, global) orelse return false;
return stream.isDisturbed(global);
}
pub fn setValue(this: *Ref, owner: Owner, stream_jsvalue: jsc.JSValue, global: *jsc.JSGlobalObject) void {
switch (owner) {
.Response => |jsvalue| {
if (jsvalue != .zero) {
Response.js.gc.body.set(jsvalue, global, stream_jsvalue);
this.deinit();
this.* = .Response;
} else {
this.deinit();
this.* = .empty;
}
},
.Request => |jsvalue| {
if (jsvalue != .zero) {
Request.js.gc.body.set(jsvalue, global, stream_jsvalue);
this.deinit();
this.* = .Request;
} else {
this.deinit();
this.* = .empty;
}
},
.strong => {
this.deinit();
this.* = .{ .strong = .{ .held = .create(stream_jsvalue, global) } };
},
.empty => {},
}
}
pub fn set(this: *Ref, owner: Owner, stream: ReadableStream, global: *jsc.JSGlobalObject) void {
this.setValue(owner, stream.value, global);
}
pub fn tee(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject, readable_stream_value: ?*[2]jsc.JSValue) bun.JSError!?struct { ReadableStream, ReadableStream } {
const stream = get(this, owner, global) orelse return null;
const result = try stream.tee(global) orelse return null;
if (readable_stream_value) |value| {
value.* = .{ result.@"0".value, result.@"1".value };
}
// Always update the original Ref to point to the first tee'd stream
this.set(owner, result.@"0", global);
return result;
}
pub fn has(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) bool {
_ = get(this, owner, global) orelse return false;
return true;
}
pub fn upgrade(this: *Ref, current: *const ReadableStream, global: *jsc.JSGlobalObject) void {
if (this.* == .strong) {
this.strong.held.set(global, current.value);
return;
}
this.* = .{ .strong = .init(current.*, global) };
}
pub fn init(owner: Owner, global: *jsc.JSGlobalObject) Ref {
_ = global;
switch (owner) {
.Response => {
return .Response;
},
.Request => {
return .Request;
},
.strong, .empty => {
// Strong and empty don't have an owner, so we return empty
// The caller should use .set() to populate with a stream
return .empty;
},
}
}
pub fn deinit(this: *Ref) void {
switch (this.*) {
.strong => |*strong| {
strong.deinit();
},
.Response => {},
.Request => {},
.empty => {},
}
this.* = .empty;
}
pub fn abort(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject) bool {
if (this.get(owner, global)) |value| {
value.abort(global);
this.deinit();
return true;
}
return false;
}
pub fn toAnyBlob(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject) ?Blob.Any {
var value = get(this, owner, global) orelse return null;
if (value.toAnyBlob(global)) |blob| {
this.deinit();
return blob;
}
return null;
}
pub fn done(this: *Ref, owner: Owner, globalObject: *jsc.JSGlobalObject) void {
const stream = this.get(owner, globalObject) orelse return;
stream.done(globalObject);
this.deinit();
}
};
pub const Strong = struct {
held: jsc.Strong.Optional = .empty,
@@ -842,6 +1010,9 @@ const jsc = bun.jsc;
const JSGlobalObject = jsc.JSGlobalObject;
const JSValue = jsc.JSValue;
const Request = jsc.WebCore.Request;
const Response = jsc.WebCore.Response;
const webcore = bun.webcore;
const Blob = webcore.Blob;
const streams = webcore.streams;

View File

@@ -15,6 +15,7 @@ weak_ptr_data: WeakRef.Data = .empty,
// We must report a consistent value for this
reported_estimated_size: usize = 0,
internal_event_callback: InternalJSEventCallback = .{},
this_jsvalue: jsc.JSRef = .empty(),
pub const js = jsc.Codegen.JSRequest;
// NOTE: toJS is overridden
@@ -68,8 +69,8 @@ pub export fn Request__setTimeout(this: *Request, seconds: jsc.JSValue, globalTh
this.setTimeout(seconds.to(c_uint));
}
pub export fn Request__clone(this: *Request, globalThis: *jsc.JSGlobalObject) ?*Request {
return this.clone(bun.default_allocator, globalThis) catch null;
pub export fn Request__clone(this: *Request, globalThis: *jsc.JSGlobalObject, this_value: jsc.JSValue, tee: ?*[2]jsc.JSValue) ?*Request {
return this.clone(bun.default_allocator, globalThis, this_value, tee) catch null;
}
comptime {
@@ -175,7 +176,12 @@ pub export fn Bun__JSRequest__calculateEstimatedByteSize(this: *Request) void {
pub fn toJS(this: *Request, globalObject: *JSGlobalObject) JSValue {
this.calculateEstimatedByteSize();
return js.toJSUnchecked(globalObject, this);
const value = js.toJSUnchecked(globalObject, this);
if (value != .zero) {
this.this_jsvalue.finalize();
this.this_jsvalue = .initWeak(value);
}
return value;
}
extern "C" fn Bun__JSRequest__createForBake(globalObject: *jsc.JSGlobalObject, requestPtr: *Request) callconv(jsc.conv) jsc.JSValue;
@@ -199,7 +205,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type
.zero => "Request",
else => "BunRequest",
};
try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.body.value.size(), .{}) });
try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.body.value.size(.empty), .{}) });
{
formatter.indent += 1;
defer formatter.indent -|= 1;
@@ -238,7 +244,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type
} else if (this.body.value == .InternalBlob or this.body.value == .WTFStringImpl) {
try writer.writeAll("\n");
try formatter.writeIndent(Writer, writer);
const size = this.body.value.size();
const size = this.body.value.size(.empty);
if (size == 0) {
var empty = Blob.initEmpty(undefined);
try empty.writeFormat(Formatter, formatter, writer, enable_ansi_colors);
@@ -246,7 +252,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type
try Blob.writeFormatForSize(false, size, writer, enable_ansi_colors);
}
} else if (this.body.value == .Locked) {
if (this.body.value.Locked.readable.get(this.body.value.Locked.global)) |stream| {
if (this.body.value.Locked.readable.get(.strong, this.body.value.Locked.global)) |stream| {
try writer.writeAll("\n");
try formatter.writeIndent(Writer, writer);
try formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
@@ -352,6 +358,7 @@ pub fn finalizeWithoutDeinit(this: *Request) void {
}
pub fn finalize(this: *Request) void {
this.this_jsvalue.finalize();
this.finalizeWithoutDeinit();
_ = this.body.unref();
if (this.weak_ptr_data.onFinalize()) {
@@ -522,7 +529,7 @@ const Fields = enum {
url,
};
pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue) bun.JSError!Request {
pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Request {
var success = false;
const vm = globalThis.bunVM();
const body = try vm.initRequestBodyValue(.{ .Null = {} });
@@ -582,7 +589,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV
if (value_type == .DOMWrapper) {
if (value.asDirect(Request)) |request| {
if (values_to_try.len == 1) {
try request.cloneInto(&req, bun.default_allocator, globalThis, fields.contains(.url));
try request.cloneInto(&req, bun.default_allocator, globalThis, fields.contains(.url), value, readable_stream_tee);
success = true;
return req;
}
@@ -610,7 +617,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV
switch (request.body.value) {
.Null, .Empty, .Used => {},
else => {
req.body.value = try request.body.value.clone(globalThis);
req.body.value = try request.body.value.clone(.{ .Request = value }, globalThis, readable_stream_tee);
fields.insert(.body);
},
}
@@ -641,7 +648,11 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV
switch (response.body.value) {
.Null, .Empty, .Used => {},
else => {
req.body.value = try response.body.value.clone(globalThis);
req.body.value = try response.body.value.clone(.empty, globalThis, readable_stream_tee);
if (readable_stream_tee) |tee_value| {
Response.js.gc.body.set(value, globalThis, tee_value[0]);
}
fields.insert(.body);
},
}
@@ -654,7 +665,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV
if (!fields.contains(.body)) {
if (try value.fastGet(globalThis, .body)) |body_| {
fields.insert(.body);
req.body.value = try Body.Value.fromJS(globalThis, body_);
req.body.value = try Body.Value.fromJSWithReadableStreamValue(globalThis, body_, if (readable_stream_tee) |tee| &tee[1] else null);
}
if (globalThis.hasException()) return error.JSError;
@@ -775,12 +786,28 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV
return req;
}
pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Request {
pub fn constructor(
globalThis: *jsc.JSGlobalObject,
callframe: *jsc.CallFrame,
thisValue: JSValue,
) bun.JSError!*Request {
const arguments_ = callframe.arguments_old(2);
const arguments = arguments_.ptr[0..arguments_.len];
var readable_stream_tee: [2]JSValue = .{ .zero, .zero };
const request = try constructInto(globalThis, arguments);
return Request.new(request);
const request = try constructInto(globalThis, arguments, &readable_stream_tee);
const result = Request.new(request);
// Initialize this_jsvalue for GC cache support
if (thisValue != .zero) {
result.this_jsvalue = .initWeak(thisValue);
// Store tee'd stream in GC cache and update Ref if body is a stream
if (readable_stream_tee[1] != .zero and result.body.value == .Locked) {
result.body.value.Locked.readable.setValue(.{ .Request = thisValue }, readable_stream_tee[1], globalThis);
}
}
return result;
}
pub fn getBodyValue(
@@ -795,21 +822,16 @@ pub fn doClone(
callframe: *jsc.CallFrame,
) bun.JSError!jsc.JSValue {
const this_value = callframe.this();
const cloned = try this.clone(bun.default_allocator, globalThis);
var readable_stream_tee: [2]JSValue = .{ .zero, .zero };
const cloned = try this.clone(bun.default_allocator, globalThis, this_value, &readable_stream_tee);
const js_wrapper = cloned.toJS(globalThis);
if (js_wrapper != .zero) {
if (cloned.body.value == .Locked) {
if (cloned.body.value.Locked.readable.get(globalThis)) |readable| {
// If we are teed, then we need to update the cached .body
// value to point to the new readable stream
// We must do this on both the original and cloned request
// but especially the original request since it will have a stale .body value now.
js.bodySetCached(js_wrapper, globalThis, readable.value);
if (this.body.value.Locked.readable.get(globalThis)) |other_readable| {
js.bodySetCached(this_value, globalThis, other_readable.value);
}
}
if (this.body.value == .Locked and readable_stream_tee[0] != .zero) {
js.gc.body.set(this_value, globalThis, readable_stream_tee[0]);
}
if (cloned.body.value == .Locked and readable_stream_tee[1] != .zero) {
js.gc.body.set(js_wrapper, globalThis, readable_stream_tee[1]);
}
}
@@ -852,9 +874,13 @@ pub fn ensureFetchHeaders(
} else {
// we don't have a request context, so we need to create an empty headers object
this._headers = FetchHeaders.createEmpty();
const owner: jsc.WebCore.ReadableStream.Ref.Owner = if (this.this_jsvalue.tryGet()) |js_value|
.{ .Request = js_value }
else
.empty;
const content_type = switch (this.body.value) {
.Blob => |blob| blob.content_type,
.Locked => |locked| if (locked.readable.get(globalThis)) |*readable| switch (readable.ptr) {
.Locked => |locked| if (locked.readable.get(owner, globalThis)) |*readable| switch (readable.ptr) {
.Blob => |blob| blob.content_type,
else => null,
} else null,
@@ -927,11 +953,13 @@ pub fn cloneInto(
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
preserve_url: bool,
this_value: jsc.JSValue,
readable_stream_tee: ?*[2]jsc.JSValue,
) bun.JSError!void {
_ = allocator;
this.ensureURL() catch {};
const vm = globalThis.bunVM();
var body_ = try this.body.value.clone(globalThis);
var body_ = try this.body.value.clone(.{ .Request = this_value }, globalThis, readable_stream_tee);
errdefer body_.deinit();
const body = try vm.initRequestBodyValue(body_);
const url = if (preserve_url) req.url else this.url.dupeRef();
@@ -952,10 +980,10 @@ pub fn cloneInto(
}
}
pub fn clone(this: *Request, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) bun.JSError!*Request {
pub fn clone(this: *Request, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, this_value: jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!*Request {
const req = Request.new(undefined);
errdefer bun.destroy(req);
try this.cloneInto(req, allocator, globalThis, false);
try this.cloneInto(req, allocator, globalThis, false, this_value, readable_stream_tee);
return req;
}

View File

@@ -21,6 +21,8 @@ ref_count: u32 = 1,
// We must report a consistent value for this
reported_estimated_size: usize = 0,
this_jsvalue: jsc.JSRef = .empty(),
pub const getText = ResponseMixin.getText;
pub const getBody = ResponseMixin.getBody;
pub const getBytes = ResponseMixin.getBytes;
@@ -51,7 +53,9 @@ pub fn calculateEstimatedByteSize(this: *Response) void {
pub fn toJS(this: *Response, globalObject: *JSGlobalObject) JSValue {
this.calculateEstimatedByteSize();
return js.toJSUnchecked(globalObject, this);
const value = js.toJSUnchecked(globalObject, this);
this.this_jsvalue = .initWeak(value);
return value;
}
pub fn getBodyValue(
@@ -264,22 +268,23 @@ pub fn doClone(
callframe: *jsc.CallFrame,
) bun.JSError!JSValue {
const this_value = callframe.this();
const cloned = try this.clone(globalThis);
var readable_stream_tee: [2]jsc.JSValue = .{ .zero, .zero };
const cloned = try this.clone(globalThis, this_value, &readable_stream_tee);
const js_wrapper = Response.makeMaybePooled(globalThis, cloned);
if (js_wrapper != .zero) {
if (cloned.body.value == .Locked) {
if (cloned.body.value.Locked.readable.get(globalThis)) |readable| {
// If we are teed, then we need to update the cached .body
// value to point to the new readable stream
// We must do this on both the original and cloned response
// but especially the original response since it will have a stale .body value now.
js.bodySetCached(js_wrapper, globalThis, readable.value);
if (this.body.value.Locked.readable.get(globalThis)) |other_readable| {
js.bodySetCached(this_value, globalThis, other_readable.value);
}
}
// If we are teed, then we need to update the cached .body
// value to point to the new readable stream
// We must do this on both the original and cloned response
// but especially the original response since it will have a stale .body value now.
if (readable_stream_tee[0] != .zero) {
js.gc.body.set(this_value, globalThis, readable_stream_tee[0]);
}
if (readable_stream_tee[1] != .zero) {
js.gc.body.set(js_wrapper, globalThis, readable_stream_tee[1]);
}
}
@@ -293,8 +298,10 @@ pub fn makeMaybePooled(globalObject: *jsc.JSGlobalObject, ptr: *Response) JSValu
pub fn cloneValue(
this: *Response,
globalThis: *JSGlobalObject,
this_value: jsc.JSValue,
readable_stream_tee: ?*[2]jsc.JSValue,
) bun.JSError!Response {
var body = try this.body.clone(globalThis);
var body = try this.body.clone(.{ .Response = this_value }, globalThis, readable_stream_tee);
errdefer body.deinit(bun.default_allocator);
var init = try this.init.clone(globalThis);
errdefer init.deinit(bun.default_allocator);
@@ -306,8 +313,8 @@ pub fn cloneValue(
};
}
pub fn clone(this: *Response, globalThis: *JSGlobalObject) bun.JSError!*Response {
return bun.new(Response, try this.cloneValue(globalThis));
pub fn clone(this: *Response, globalThis: *JSGlobalObject, this_value: jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!*Response {
return bun.new(Response, try this.cloneValue(globalThis, this_value, readable_stream_tee));
}
pub fn getStatus(
@@ -322,7 +329,7 @@ fn destroy(this: *Response) void {
this.init.deinit(bun.default_allocator);
this.body.deinit(bun.default_allocator);
this.url.deref();
this.this_jsvalue.deinit();
bun.destroy(this);
}
@@ -342,6 +349,7 @@ pub fn unref(this: *Response) void {
pub fn finalize(
this: *Response,
) callconv(.C) void {
this.this_jsvalue.finalize();
this.unref();
}
@@ -532,7 +540,7 @@ pub fn constructError(
return response.toJS(globalThis);
}
pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Response {
pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, thisValue: JSValue) bun.JSError!*Response {
var arguments = callframe.argumentsAsArray(2);
if (!arguments[0].isUndefinedOrNull() and arguments[0].isObject()) {
@@ -560,11 +568,16 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b
return s3.throwSignError(sign_err, globalThis);
};
defer result.deinit();
response.init.headers = try response.getOrCreateHeaders(globalThis);
const headers = try response.getOrCreateHeaders(globalThis);
errdefer headers.deref();
try headers.put(.Location, result.url, globalThis);
response.redirected = true;
var headers_ref = response.init.headers.?;
try headers_ref.put(.Location, result.url, globalThis);
return bun.new(Response, response);
response.init.headers = headers;
var allocated_response = bun.new(Response, response);
if (thisValue != .zero) {
allocated_response.this_jsvalue = .initWeak(thisValue);
}
return allocated_response;
}
}
}
@@ -589,13 +602,15 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b
return error.JSError;
}
var readable_stream_value: JSValue = .zero;
var body: Body = brk: {
if (arguments[0].isUndefinedOrNull()) {
break :brk Body{
.value = Body.Value{ .Null = {} },
};
}
break :brk try Body.extract(globalThis, arguments[0]);
break :brk try Body.extract(globalThis, arguments[0], &readable_stream_value);
};
errdefer body.deinit(bun.default_allocator);
@@ -603,21 +618,26 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b
return error.JSError;
}
if (body.value == .Blob and
init.headers != null and
body.value.Blob.content_type.len > 0 and
!init.headers.?.fastHas(.ContentType))
{
try init.headers.?.put(.ContentType, body.value.Blob.content_type, globalThis);
}
var response = bun.new(Response, Response{
.body = body,
.init = init,
});
if (response.body.value == .Blob and
response.init.headers != null and
response.body.value.Blob.content_type.len > 0 and
!response.init.headers.?.fastHas(.ContentType))
{
try response.init.headers.?.put(.ContentType, response.body.value.Blob.content_type, globalThis);
}
response.calculateEstimatedByteSize();
if (thisValue != .zero) {
response.this_jsvalue = .initWeak(thisValue);
if (readable_stream_value != .zero) {
response.body.value.Locked.readable.setValue(.{ .Response = thisValue }, readable_stream_value, response.body.value.Locked.global);
}
}
return response;
}

View File

@@ -184,17 +184,23 @@ pub const FetchTasklet = struct {
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody {
var body_value = try Body.Value.fromJS(globalThis, value);
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) {
defer body_value.deinit();
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed(.empty, globalThis)))) {
return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw();
}
if (body_value == .Locked) {
if (body_value.Locked.readable.has()) {
if (body_value.Locked.readable == .strong) {
// just grab the ref
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
defer body_value.Locked.readable = .empty;
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable.strong };
}
const readable = try body_value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
const readable = try body_value.toReadableStream(.empty, globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has(.empty, globalThis)) {
if (body_value.Locked.readable.get(.empty, globalThis)) |stream| {
return FetchTasklet.HTTPRequestBody{ .ReadableStream = .init(stream, globalThis) };
}
}
}
return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
@@ -447,7 +453,8 @@ pub const FetchTasklet = struct {
if (this.getCurrentResponse()) |response| {
var body = &response.body;
if (body.value == .Locked) {
if (body.value.Locked.readable.get(globalThis)) |readable| {
const response_js = response.this_jsvalue.tryGet() orelse .zero;
if (body.value.Locked.readable.get(if (response_js != .zero) .{ .Response = response_js } else .{ .empty = {} }, globalThis)) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
@@ -464,7 +471,7 @@ pub const FetchTasklet = struct {
);
} else {
var prev = body.value.Locked.readable;
body.value.Locked.readable = .{};
body.value.Locked.readable = .{ .empty = {} };
readable.value.ensureStillAlive();
prev.deinit();
readable.value.ensureStillAlive();
@@ -2103,17 +2110,17 @@ pub fn Bun__fetch_(
}
if (request) |req| {
if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(Request, globalThis, first_arg)))) {
if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(.{ .Request = first_arg }, globalThis)))) {
return globalThis.ERR(.BODY_ALREADY_USED, "Request body already used", .{}).throw();
}
if (req.body.value == .Locked) {
if (req.body.value.Locked.readable.has()) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) };
if (req.body.value.Locked.readable.has(.{ .Request = first_arg }, globalThis)) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(.{ .Request = first_arg }, globalThis).?, globalThis) };
}
const readable = try req.body.value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has()) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) };
const readable = try req.body.value.toReadableStream(.{ .Request = first_arg }, globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has(.{ .Request = first_arg }, globalThis)) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(.{ .Request = first_arg }, globalThis).?, globalThis) };
}
}
@@ -2123,7 +2130,7 @@ pub fn Bun__fetch_(
if (request_init_object) |req| {
if (try req.fastGet(globalThis, .body)) |body__| {
if (!body__.isUndefined()) {
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(globalThis, body__);
}
}
}

View File

@@ -4,6 +4,7 @@ export default [
define({
name: "Request",
construct: true,
constructNeedsThis: true,
finalize: true,
final: false,
klass: {},
@@ -68,6 +69,7 @@ export default [
define({
name: "Response",
construct: true,
constructNeedsThis: true,
finalize: true,
final: false,
JSType: "0b11101110",

View File

@@ -1710,7 +1710,7 @@ size_t ${name}::memoryCost(void* ptr) {
size_t ${name}::estimatedSize(JSC::JSCell* cell, JSC::VM& vm) {
auto* thisObject = jsCast<${name}*>(cell);
auto* wrapped = thisObject->wrapped();
return Base::estimatedSize(cell, vm) + ${name}::memoryCost(wrapped);
return Base::estimatedSize(cell, vm) + (wrapped ? ${name}::memoryCost(wrapped) : 0);
}
void ${name}::destroy(JSCell* cell)

View File

@@ -424,7 +424,8 @@ size_t ${controller}::memoryCost(void* sinkPtr) {
}
size_t ${controller}::estimatedSize(JSCell* cell, JSC::VM& vm) {
return Base::estimatedSize(cell, vm) + ${controller}::memoryCost(jsCast<${controller}*>(cell)->wrapped());
auto* wrapped = jsCast<${controller}*>(cell)->wrapped();
return Base::estimatedSize(cell, vm) + (wrapped ? ${controller}::memoryCost(wrapped) : 0);
}
JSC_DECLARE_HOST_FUNCTION(${controller}__close);

View File

@@ -576,7 +576,8 @@ fn initRedirections(this: *Cmd, spawn_args: *Subprocess.SpawnArgs) bun.JSError!?
_ = rstream;
@panic("TODO SHELL READABLE STREAM");
} else if (this.base.interpreter.jsobjs[val.idx].as(jsc.WebCore.Response)) |req| {
req.getBodyValue().toBlobIfPossible();
const owner = if (req.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
req.getBodyValue().toBlobIfPossible(owner);
if (this.node.redirect.stdin) {
try spawn_args.stdio[stdin_no].extractBlob(global, req.getBodyValue().useAsAnyBlob(), stdin_no);
}