From ccda3dbff9b144dbbc6c9bf8afb28cbe98f357be Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Mon, 10 Nov 2025 16:18:07 -0800 Subject: [PATCH] more --- src/bun.js/webcore/fetch/FetchTasklet.zig | 2 +- src/bun.js/webcore/fetch/tasklet/Request.zig | 27 +++++++++---------- src/bun.js/webcore/fetch/tasklet/Response.zig | 8 ++++++ .../webcore/fetch/tasklet/SharedData.zig | 4 ++- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/bun.js/webcore/fetch/FetchTasklet.zig b/src/bun.js/webcore/fetch/FetchTasklet.zig index 62391f8bc5..d507944868 100644 --- a/src/bun.js/webcore/fetch/FetchTasklet.zig +++ b/src/bun.js/webcore/fetch/FetchTasklet.zig @@ -437,7 +437,7 @@ pub fn get( fetch_tasklet.is_waiting_request_stream_start = isStream; if (isStream) { const buffer = http.ThreadSafeStreamBuffer.new(.{}); - buffer.setDrainCallback(FetchTasklet, FetchTasklet.onWriteRequestDataDrain, fetch_tasklet); + buffer.setDrainCallback(FetchTaskletSharedData, FetchTaskletSharedData.resumeRequestDataStream, &fetch_tasklet.shared); fetch_tasklet.request_body_streaming_buffer = buffer; fetch_tasklet.http.?.request_body = .{ .stream = .{ diff --git a/src/bun.js/webcore/fetch/tasklet/Request.zig b/src/bun.js/webcore/fetch/tasklet/Request.zig index 956819c70e..43925e5848 100644 --- a/src/bun.js/webcore/fetch/tasklet/Request.zig +++ b/src/bun.js/webcore/fetch/tasklet/Request.zig @@ -1,6 +1,5 @@ const FetchTaskletRequest = @This(); request_body: ?HTTPRequestBody = null, -request_body_streaming_buffer: ?*http.ThreadSafeStreamBuffer = null, request_headers: Headers = Headers{ .allocator = undefined }, sink: ?*ResumableSink = null, metadata: ?http.HTTPResponseMetadata = null, @@ -29,16 +28,15 @@ fn parent(this: *FetchTaskletRequest) *FetchTasklet { pub fn startRequestStream(this: *FetchTaskletRequest) void { this.is_waiting_request_stream_start = false; bun.assert(this.request_body == .ReadableStream); + const tasklet = this.parent(); if (this.request_body.ReadableStream.get(this.global_this)) |stream| { - if (this.signal) |signal| { - if (signal.aborted()) { - stream.abort(this.global_this); - return; - } + const globalThis = tasklet.global_this; + if (tasklet.isAborted()) { + stream.abort(globalThis); + return; } - const globalThis = this.global_this; - this.ref(); // lets only unref when sink is done + tasklet.ref(); // lets only unref when sink is done // +1 because the task refs the sink const sink = ResumableSink.initExactRefs(globalThis, stream, this, 2); this.sink = sink; @@ -46,12 +44,11 @@ pub fn startRequestStream(this: *FetchTaskletRequest) void { } pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableSinkBackpressure { log("writeRequestData {}", .{data.len}); - if (this.signal) |signal| { - if (signal.aborted()) { - return .done; - } + const tasklet = this.parent(); + if (tasklet.isAborted()) { + return .done; } - const thread_safe_stream_buffer = this.request_body_streaming_buffer orelse return .done; + const thread_safe_stream_buffer = tasklet.shared.request_body_streaming_buffer orelse return .done; const stream_buffer = thread_safe_stream_buffer.acquire(); defer thread_safe_stream_buffer.release(); const highWaterMark = if (this.sink) |sink| sink.highWaterMark else 16384; @@ -59,7 +56,7 @@ pub fn writeRequestData(this: *FetchTaskletRequest, data: []const u8) ResumableS var needs_schedule = false; defer if (needs_schedule) { // wakeup the http thread to write the data - http.http_thread.scheduleRequestWrite(this.http.?, .data); + http.http_thread.scheduleRequestWrite(tasklet.http.?, .data); }; // dont have backpressure so we will schedule the data to be written @@ -115,7 +112,7 @@ pub fn writeEndRequest(this: *FetchTaskletRequest, err: ?jsc.JSValue) void { /// This is ALWAYS called from the main thread pub fn resumeRequestDataStream(this: *FetchTaskletRequest) void { - // deref when done because we ref inside onWriteRequestDataDrain + // deref when done because we ref inside SharedData.resumeRequestDataStream const tasklet = this.parent(); defer tasklet.deref(); if (tasklet.isAborted()) { diff --git a/src/bun.js/webcore/fetch/tasklet/Response.zig b/src/bun.js/webcore/fetch/tasklet/Response.zig index e519410a95..bba5f3fd04 100644 --- a/src/bun.js/webcore/fetch/tasklet/Response.zig +++ b/src/bun.js/webcore/fetch/tasklet/Response.zig @@ -528,3 +528,11 @@ const MutableString = bun.MutableString; const jsc = bun.jsc; const http = bun.http; pub const ResumableSink = jsc.WebCore.ResumableFetchSink; +const FetchTasklet = @import("../FetchTasklet.zig").FetchTasklet; +const log = bun.Output.scoped(.FetchTaskletResponse, .visible); +const BoringSSL = bun.BoringSSL.c; +const FetchHeaders = bun.webcore.FetchHeaders; +const Body = jsc.WebCore.Body; +const X509 = @import("../../../api/bun/x509.zig").X509; +const JSValue = jsc.JSValue; +const Blob = jsc.WebCore.Blob; diff --git a/src/bun.js/webcore/fetch/tasklet/SharedData.zig b/src/bun.js/webcore/fetch/tasklet/SharedData.zig index c6331bab8e..5d0529bc05 100644 --- a/src/bun.js/webcore/fetch/tasklet/SharedData.zig +++ b/src/bun.js/webcore/fetch/tasklet/SharedData.zig @@ -5,6 +5,7 @@ ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1), /// buffer being used by AsyncHTTP response_buffer: bun.MutableString = undefined, +request_body_streaming_buffer: ?*bun.http.ThreadSafeStreamBuffer = null, result: bun.http.HTTPClientResult = .{}, signals: bun.http.Signals = .{}, @@ -16,7 +17,7 @@ fn parent(this: *SharedData) *FetchTasklet { } /// This is ALWAYS called from the http thread and we cannot touch the buffer here because is locked -pub fn onWriteRequestDataDrain(this: *SharedData) void { +pub fn resumeRequestDataStream(this: *SharedData) void { // ref until the main thread callback is called const tasklet = this.parent(); tasklet.ref(); @@ -27,3 +28,4 @@ const std = @import("std"); const bun = @import("bun"); const jsc = bun.jsc; const FetchTasklet = @import("../FetchTasklet.zig"); +const ResumableSinkBackpressure = jsc.WebCore.ResumableSinkBackpressure;