mirror of https://github.com/oven-sh/bun
synced 2026-02-05 00:18:53 +00:00

Compare commits
6 Commits: claude/imp...jarred/fet

| Author | SHA1 | Date |
|---|---|---|
|  | 6ba9b0a27a |  |
|  | 12dee5c720 |  |
|  | 9e454a43ad |  |
|  | 8cec29c95a |  |
|  | b941dd6f0b |  |
|  | f4e0948055 |  |
@@ -124,6 +124,18 @@ pub const Features = struct {
         return Formatter{};
     }
 
+    const JSC = bun.JSC;
+    pub fn toJS(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSC.JSValue {
+        const object = JSC.JSValue.createEmptyObjectWithNullPrototype(globalThis);
+        inline for (comptime std.meta.declarations(Features)) |decl| {
+            if (@typeInfo(@TypeOf(@field(Features, decl.name))) == .Int) {
+                object.put(globalThis, decl.name, JSC.JSValue.jsNumber(@field(Features, decl.name)));
+            }
+        }
+        object.put(globalThis, "concurrentDecompressionCount", JSC.JSValue.jsNumber(bun.http.HTTPClientResult.DecompressionTask.count.load(.monotonic)));
+        return object;
+    }
+
     pub const Formatter = struct {
         pub fn format(_: Formatter, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
             const fields = comptime brk: {
@@ -353,3 +365,5 @@ pub const GenerateHeader = struct {
         }
     };
 };
+
+pub const createInternalStatsObject = Features.toJS;
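The `toJS` added above builds the stats object by reflecting over `Features` at compile time: `std.meta.declarations` lists every public declaration, and only the integer counters are copied into the JS object. Below is a minimal, self-contained sketch of that comptime-reflection pattern using only `std`; the counter names are invented for illustration, and the `.Int` tag spelling matches the Zig version used in the diff.

```zig
const std = @import("std");

const Features = struct {
    // Integer counters that the reflection loop below picks up.
    pub var fetch: usize = 0;
    pub var standalone_executable: usize = 0;
    // Non-integer declaration: skipped by the type check.
    pub const label = "features";

    // Walk every public declaration and print the integer ones,
    // mirroring how toJS() fills in the stats object.
    pub fn dump(writer: anytype) !void {
        inline for (comptime std.meta.declarations(Features)) |decl| {
            const value = @field(Features, decl.name);
            if (@typeInfo(@TypeOf(value)) == .Int) {
                try writer.print("{s} = {d}\n", .{ decl.name, value });
            }
        }
    }
};

pub fn main() !void {
    Features.fetch += 1;
    try Features.dump(std.io.getStdOut().writer());
}
```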
@@ -4105,7 +4105,7 @@ static void populateStackFramePosition(const JSC::StackFrame* stackFrame, BunStr
     // It is key to not clone this data because source code strings are large.
     // Usage of toStringView (non-owning) is safe as we ref the provider.
     provider->ref();
-    ASSERT(*referenced_source_provider == nullptr);
+    // ASSERT(*referenced_source_provider == nullptr);
     *referenced_source_provider = provider;
     source_lines[0] = Bun::toStringView(sourceString.substring(lineStart, lineEnd - lineStart));
     source_line_numbers[0] = location.line();
@@ -1000,7 +1000,7 @@ pub const Fetch = struct {
             const success = this.result.isSuccess();
             const globalThis = this.global_this;
             // reset the buffer if we are streaming or if we are not waiting for buffering anymore
-            var buffer_reset = true;
+            var buffer_reset = !this.result.decompression_task.pending;
             defer {
                 if (buffer_reset) {
                     this.scheduled_response_buffer.reset();
@@ -1038,7 +1038,7 @@ pub const Fetch = struct {
             }
 
             if (this.readable_stream_ref.get()) |readable| {
-                if (readable.ptr == .Bytes) {
+                if (readable.ptr == .Bytes and !this.result.decompression_task.pending) {
                     readable.ptr.Bytes.size_hint = this.getSizeHint();
                     // body can be marked as used but we still need to pipe the data
                     const scheduled_response_buffer = this.scheduled_response_buffer.list;
@@ -1078,7 +1078,7 @@ pub const Fetch = struct {
 
             if (this.getCurrentResponse()) |response| {
                 var body = &response.body;
-                if (body.value == .Locked) {
+                if (body.value == .Locked and !this.result.decompression_task.pending) {
                     if (body.value.Locked.readable.get()) |readable| {
                         if (readable.ptr == .Bytes) {
                             readable.ptr.Bytes.size_hint = this.getSizeHint();
@@ -1149,7 +1149,7 @@ pub const Fetch = struct {
             log("onProgressUpdate", .{});
             this.mutex.lock();
             this.has_schedule_callback.store(false, .monotonic);
-            const is_done = !this.result.has_more;
+            const is_done = this.result.hasCompleteResponseBody();
 
             const vm = this.javascript_vm;
             // vm is shutting down, we cannot touch JS
@@ -1564,7 +1564,7 @@ pub const Fetch = struct {
             // at this point we always should have metadata
             const metadata = this.metadata.?;
             const http_response = metadata.response;
-            this.is_waiting_body = this.result.has_more;
+            this.is_waiting_body = !this.result.hasCompleteResponseBody();
             return Response{
                 .url = bun.String.createAtomIfPossible(metadata.url),
                 .redirected = this.result.redirected,
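The common thread in these hunks: every site that used to treat `!has_more` as "response complete" must now also wait for the off-thread decompression to finish. A minimal sketch of that two-part completion predicate, with the types reduced to just the fields involved:

```zig
const std = @import("std");

// Completion now has two independent conditions: `has_more` tracks the
// network side, `decompression_task.pending` tracks the worker thread.
const DecompressionTask = struct { pending: bool = false };

const Result = struct {
    has_more: bool = false,
    decompression_task: DecompressionTask = .{},

    // Mirrors HTTPClientResult.hasCompleteResponseBody() from this diff.
    fn hasCompleteResponseBody(this: *const Result) bool {
        return !this.has_more and !this.decompression_task.pending;
    }
};

pub fn main() void {
    var r = Result{ .decompression_task = .{ .pending = true } };
    // Network is done, but a worker still owns the body: not complete yet.
    std.debug.print("complete: {}\n", .{r.hasCompleteResponseBody()}); // false
    r.decompression_task.pending = false;
    std.debug.print("complete: {}\n", .{r.hasCompleteResponseBody()}); // true
}
```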
@@ -1816,10 +1816,84 @@ pub const Fetch = struct {
             return node;
         }
 
+        fn onDecompressFromThreadPool(task: *bun.ThreadPool.Task) void {
+            const input_decompression_task: *http.HTTPClientResult.DecompressionTask = @fieldParentPtr("task", task);
+            const client_result: *http.HTTPClientResult = @fieldParentPtr("decompression_task", input_decompression_task);
+            var this: *FetchTasklet = @fieldParentPtr("result", client_result);
+
+            var response_buffer: MutableString = undefined;
+            defer response_buffer.deinit();
+            // Avoid potential data races by moving this to the stack
+            var decompression_task: http.HTTPClientResult.DecompressionTask = brk: {
+                this.mutex.lock();
+                defer this.mutex.unlock();
+                response_buffer = this.response_buffer;
+                this.response_buffer = .{
+                    .allocator = bun.default_allocator,
+                    .list = .{
+                        .items = &.{},
+                        .capacity = 0,
+                    },
+                };
+                const old = this.result.decompression_task;
+                this.result.decompression_task = .{
+                    .compressed_body = MutableString{ .allocator = bun.default_allocator, .list = .{} },
+                    // keep it marked as pending so that if the other thread receives an update simultaneously, it doesn't think we had an error
+                    .pending = true,
+                    .encoding = .identity,
+                };
+                break :brk old;
+            };
+            defer decompression_task.compressed_body.deinit();
+
+            var fail: ?anyerror = null;
+
+            // Do not lock the mutex on this thread while decompressing!
+            decompression_task.decompress(&response_buffer) catch |err| {
+                // if decompression fails, we mark it as an error
+                fail = err;
+            };
+
+            this.mutex.lock();
+            const has_only_one_ref = this.ref_count.load(.monotonic) == 1;
+            defer if (!has_only_one_ref) this.mutex.unlock();
+            defer this.deref();
+
+            _ = this.scheduled_response_buffer.write(response_buffer.list.items) catch bun.outOfMemory();
+
+            this.result.fail = fail;
+            this.result.decompression_task.pending = false;
+
+            scheduleMainThreadTask(this);
+        }
+
+        fn scheduleMainThreadTask(task: *FetchTasklet) void {
+            if (task.ref_count.load(.monotonic) == 1) {
+                // If the JS side already rejected the Promise, or it was otherwise already finalized,
+                // we can skip enqueuing and rely on the caller to deinit immediately.
+                log("deinit early due to only one ref", .{});
+                return;
+            }
+
+            if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
+                if (has_schedule_callback) {
+                    return;
+                }
+            }
+
+            task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
+        }
+
         pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
+            var is_done = !result.has_more;
+
             task.mutex.lock();
-            defer task.mutex.unlock();
-            const is_done = !result.has_more;
+            const has_only_one_ref = task.ref_count.load(.monotonic) == 1;
+            var should_unlock_mutex = !has_only_one_ref;
+
+            // Prevent a use-after-free of this mutex if the final deref below destroys the task.
+            defer if (should_unlock_mutex) task.mutex.unlock();
 
             // we are done with the http client so we can deref our side
             defer if (is_done) task.deref();
 
@@ -1857,6 +1931,16 @@ pub const Fetch = struct {
             if (task.ignore_data) {
                 task.response_buffer.reset();
 
+                // If we ignored the response body and it is compressed, immediately free it before we start decompressing it
+                if (task.result.decompression_task.pending) {
+                    var compressed_body = &task.result.decompression_task.compressed_body;
+                    compressed_body.deinit();
+                    task.result.decompression_task.pending = false;
+
+                    // if we have more data, we should not have had a decompression task.
+                    bun.debugAssert(!result.has_more);
+                }
+
                 if (task.scheduled_response_buffer.list.capacity > 0) {
                     task.scheduled_response_buffer.deinit();
                     task.scheduled_response_buffer = .{
@@ -1873,19 +1957,26 @@ pub const Fetch = struct {
                 }
             } else {
                 if (success) {
+                    if (task.result.decompression_task.pending) {
+                        bun.debugAssert(task.result.decompression_task.compressed_body.list.items.len > 0);
+                        task.result.decompression_task.task = .{ .callback = &onDecompressFromThreadPool };
+                        is_done = false;
+                        should_unlock_mutex = false;
+                        // Ensure we don't block the other thread on this mutex.
+                        task.mutex.unlock();
+
+                        JSC.WorkPool.schedule(&task.result.decompression_task.task);
+
+                        return;
+                    }
+
                     _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch bun.outOfMemory();
                 }
                 // reset for reuse
                 task.response_buffer.reset();
             }
 
-            if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
-                if (has_schedule_callback) {
-                    return;
-                }
-            }
-
-            task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
+            scheduleMainThreadTask(task);
         }
     };
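`onDecompressFromThreadPool` recovers the owning `FetchTasklet` from a bare `*bun.ThreadPool.Task` by chaining `@fieldParentPtr` through each enclosing struct. A minimal, self-contained sketch of that chain; the types here are stand-ins, not bun's real ones:

```zig
const std = @import("std");

// A pool hands callbacks a pointer to the embedded Task field only.
const Task = struct { callback: *const fn (*Task) void };

const DecompressionTask = struct {
    task: Task = .{ .callback = &run },

    fn run(task: *Task) void {
        // Walk back up: task -> DecompressionTask -> Result -> Tasklet,
        // one field hop at a time, exactly like onDecompressFromThreadPool.
        const decompression: *DecompressionTask = @fieldParentPtr("task", task);
        const result: *Result = @fieldParentPtr("decompression_task", decompression);
        const tasklet: *Tasklet = @fieldParentPtr("result", result);
        std.debug.print("recovered tasklet id={d}\n", .{tasklet.id});
    }
};

const Result = struct { decompression_task: DecompressionTask = .{} };
const Tasklet = struct { id: u32, result: Result = .{} };

pub fn main() void {
    var tasklet = Tasklet{ .id = 42, .result = .{} };
    // Simulate the pool invoking the embedded task's callback.
    const task = &tasklet.result.decompression_task.task;
    task.callback(task);
}
```

This only works because each field is embedded by value; if any link in the chain were behind a pointer, `@fieldParentPtr` would compute garbage.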
@@ -1711,13 +1711,9 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
             if (comptime !@hasField(SinkType, "signal"))
                 return;
 
-            const ptr = this.sink.signal.ptr;
             if (this.sink.signal.isDead())
                 return;
             this.sink.signal.clear();
-            const value = @as(JSValue, @enumFromInt(@as(JSC.JSValueReprInt, @bitCast(@intFromPtr(ptr)))));
-            value.unprotect();
-            detachPtr(value);
         }
 
         pub fn detachPtr(ptr: JSValue) callconv(.C) void {
@@ -44,6 +44,7 @@ pub const Run = struct {
 
     pub fn bootStandalone(ctx: Command.Context, entry_path: string, graph: bun.StandaloneModuleGraph) !void {
         JSC.markBinding(@src());
+        bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
         bun.JSC.initialize(false);
         bun.Analytics.Features.standalone_executable += 1;
 
@@ -183,7 +184,7 @@ pub const Run = struct {
         }
 
         bun.JSC.initialize(ctx.runtime_options.eval.eval_and_print);
+        bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
 
         js_ast.Expr.Data.Store.create();
         js_ast.Stmt.Data.Store.create();
         var arena = try Arena.init();
 
@@ -741,6 +741,7 @@ pub const TestCommand = struct {
         break :brk loader;
     };
     bun.JSC.initialize(false);
+    bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
     HTTPThread.init(&.{});
 
     var snapshot_file_buf = std.ArrayList(u8).init(ctx.allocator);
src/http.zig (134 changed lines)

@@ -40,8 +40,8 @@ const uws = bun.uws;
 pub const MimeType = @import("./http/mime_type.zig");
 pub const URLPath = @import("./http/url_path.zig");
-// This becomes Arena.allocator
-pub var default_allocator: std.mem.Allocator = undefined;
-var default_arena: Arena = undefined;
+const default_allocator = bun.default_allocator;
 pub var http_thread: HTTPThread = undefined;
 const HiveArray = @import("./hive_array.zig").HiveArray;
 const Batch = bun.ThreadPool.Batch;
@@ -53,6 +53,7 @@ var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSock
 var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
 const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
 var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
+pub var allow_multithreaded_decompression: bool = false;
 
 pub var max_http_header_size: usize = 16 * 1024;
 comptime {
@@ -68,7 +69,7 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
 
 // this doesn't need to be stack memory because it is immediately cloned after use
 var shared_response_headers_buf: [256]picohttp.Header = undefined;
+var requests_this_tick: usize = 0;
 const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
 
 pub const Signals = struct {
@@ -1100,8 +1101,6 @@ pub const HTTPThread = struct {
 
     pub fn onStart(opts: InitOpts) void {
         Output.Source.configureNamedThread("HTTP Client");
-        default_arena = Arena.init() catch unreachable;
-        default_allocator = default_arena.allocator();
 
         const loop = bun.JSC.MiniEventLoop.initGlobal(null);
 
@@ -1257,6 +1256,7 @@ pub const HTTPThread = struct {
         this.loop.loop.inc();
         this.loop.loop.tick();
         this.loop.loop.dec();
+        requests_this_tick = 0;
 
         // this.loop.run();
         if (comptime Environment.isDebug) {
@@ -1543,7 +1543,7 @@ pub inline fn getAllocator() std.mem.Allocator {
 }
 
 pub inline fn cleanup(force: bool) void {
-    default_arena.gc(force);
+    _ = force; // autofix
 }
 
 pub const Headers = @import("./http/headers.zig");
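Note that `allow_multithreaded_decompression` is a plain `pub var`, not an atomic: the run, test, and package-manager entry points in this diff each set it once before `HTTPThread.init` spawns the HTTP thread, so afterwards it is only ever read. A minimal sketch of that configure-before-spawn pattern (names are illustrative):

```zig
const std = @import("std");

// Written once on the main thread before the worker exists,
// then only read, so no atomic wrapper is needed.
var allow_multithreaded_decompression: bool = false;

fn workerMain() void {
    // By the time this thread runs, the flag is effectively frozen.
    std.debug.print("multithreaded decompression: {}\n", .{allow_multithreaded_decompression});
}

pub fn main() !void {
    const smol = false; // would come from the --smol CLI flag in the real code
    allow_multithreaded_decompression = !smol; // configure first...
    var worker = try std.Thread.spawn(.{}, workerMain, .{}); // ...then spawn
    worker.join();
}
```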
@@ -1805,8 +1805,10 @@ pub const InternalState = struct {
     received_last_chunk: bool = false,
     did_set_content_encoding: bool = false,
     is_redirect_pending: bool = false,
-    is_libdeflate_fast_path_disabled: bool = false,
+    has_streamed_response_body: bool = false,
     resend_request_body_on_redirect: bool = false,
+    is_libdeflate_fast_path_disabled: bool = true,
+    should_defer_decompression_to_another_thread: bool = false,
 };
 
 pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
@@ -1877,8 +1879,9 @@ pub const InternalState = struct {
         return this.flags.received_last_chunk;
     }
 
-    fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool) !void {
-        defer this.compressed_body.reset();
+    fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool, owns_buffer: bool) !void {
+        var should_reset_compressed_body = true;
+        defer if (should_reset_compressed_body) this.compressed_body.reset();
         var gzip_timer: std.time.Timer = undefined;
 
         if (extremely_verbose)
@@ -1888,7 +1891,7 @@ pub const InternalState = struct {
 
         if (FeatureFlags.isLibdeflateEnabled()) {
             // Fast-path: use libdeflate
-            if (is_final_chunk and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
+            if (is_final_chunk and !this.flags.has_streamed_response_body and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
                 this.flags.is_libdeflate_fast_path_disabled = true;
 
                 log("Decompressing {d} bytes with libdeflate\n", .{buffer.len});
@@ -1929,6 +1932,28 @@ pub const InternalState = struct {
             }
         }
 
+        const should_defer_decompression_to_another_thread = still_needs_to_decompress and
+            is_final_chunk and
+            // This is only supported when not streaming the response body.
+            !this.flags.has_streamed_response_body and
+            !this.flags.should_defer_decompression_to_another_thread and
+            owns_buffer and
+            allow_multithreaded_decompression and
+            // It needs to be a large enough buffer to make it worthwhile.
+            buffer.len > 1024 * 2 and
+            // If this is the only active request, there's really no point in paying the cost of concurrency.
+            (requests_this_tick > 1 or AsyncHTTP.active_requests_count.load(.monotonic) > 1);
+
+        if (should_defer_decompression_to_another_thread) {
+            log("Deferring decompression of {d} bytes to another thread\n", .{buffer.len});
+            if (!this.compressed_body.owns(buffer)) {
+                this.compressed_body.appendSlice(buffer) catch bun.outOfMemory();
+            }
+            this.flags.should_defer_decompression_to_another_thread = true;
+            should_reset_compressed_body = false;
+            return;
+        }
+
         // Slow path, or brotli: use the .decompressor
         if (still_needs_to_decompress) {
             log("Decompressing {d} bytes\n", .{buffer.len});
@@ -1952,18 +1977,22 @@ pub const InternalState = struct {
         this.gzip_elapsed = gzip_timer.read();
     }
 
-    fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool) !void {
-        try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk);
+    fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool, owns_buffer: bool) !void {
+        try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk, owns_buffer);
     }
 
-    pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool) !bool {
+    pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool, owns_buffer: bool) !bool {
        if (this.flags.is_redirect_pending) return false;
 
        var body_out_str = this.body_out_str.?;
 
        switch (this.encoding) {
            Encoding.brotli, Encoding.gzip, Encoding.deflate => {
-               try this.decompress(buffer, body_out_str, is_final_chunk);
+               bun.debugAssert(!this.flags.should_defer_decompression_to_another_thread);
+               try this.decompress(buffer, body_out_str, is_final_chunk, owns_buffer);
+               if (this.flags.should_defer_decompression_to_another_thread) {
+                   return true;
+               }
            },
            else => {
                if (!body_out_str.owns(buffer.list.items)) {
@@ -2859,6 +2888,8 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
         this.fail(error.ConnectionClosed);
         return;
     }
+
+    requests_this_tick +|= 1;
 }
 
 const Task = ThreadPool.Task;
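The deferral condition above is a cost heuristic: only ship decompression to the pool for a final, non-streamed, owned buffer that is big enough to amortize the handoff, and only when more than one request is actually in flight. A standalone sketch of the same check, with the thresholds copied from the diff and the surrounding state reduced to globals:

```zig
const std = @import("std");

var requests_this_tick: usize = 0;
var active_requests_count = std.atomic.Value(usize).init(0);
var allow_multithreaded_decompression: bool = true;

// Mirrors the should_defer_decompression_to_another_thread condition.
fn shouldDefer(buffer_len: usize, is_final_chunk: bool, streamed: bool, owns_buffer: bool) bool {
    return is_final_chunk and
        !streamed and
        owns_buffer and
        allow_multithreaded_decompression and
        // Large enough to be worth the scheduling overhead.
        buffer_len > 1024 * 2 and
        // More than one request in flight, so concurrency can pay off.
        (requests_this_tick > 1 or active_requests_count.load(.monotonic) > 1);
}

pub fn main() void {
    requests_this_tick = 3;
    std.debug.print("64 KiB final chunk: {}\n", .{shouldDefer(64 * 1024, true, false, true)}); // true
    std.debug.print("1 KiB final chunk: {}\n", .{shouldDefer(1024, true, false, true)}); // false
}
```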
@@ -3507,12 +3538,13 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
         }
         return;
     }
 
     const out_str = this.state.body_out_str.?;
     const body = out_str.*;
     const result = this.toResult();
     const is_done = !result.has_more;
 
-    log("progressUpdate {}", .{is_done});
+    log("progressUpdate is_done={}", .{is_done});
 
     const callback = this.result_callback;
 
@@ -3527,6 +3559,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
             this.connected_url.getPortAuto(),
         );
     } else if (!socket.isClosed()) {
+        // closeSocket detaches the pointer, so this will not close the socket
         NewHTTPContext(is_ssl).closeSocket(socket);
     }
 
@@ -3545,7 +3578,6 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
     if (print_every_i % print_every == 0) {
         Output.prettyln("Heap stats for HTTP thread\n", .{});
         Output.flush();
-        default_arena.dumpThreadStats();
         print_every_i = 0;
     }
 }
@@ -3553,6 +3585,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
 }
 
 pub const HTTPClientResult = struct {
+    decompression_task: DecompressionTask = .{},
     body: ?*MutableString = null,
     has_more: bool = false,
     redirected: bool = false,
@@ -3569,6 +3602,37 @@ pub const HTTPClientResult = struct {
     body_size: BodySize = .unknown,
     certificate_info: ?CertificateInfo = null,
 
+    pub fn hasCompleteResponseBody(this: *const HTTPClientResult) bool {
+        return !this.has_more and !this.decompression_task.pending;
+    }
+
+    pub const DecompressionTask = struct {
+        compressed_body: MutableString = .{
+            .allocator = undefined,
+            .list = .{},
+        },
+        encoding: Encoding = .identity,
+        pending: bool = false,
+        task: bun.ThreadPool.Task = .{ .callback = &runFromThreadPool },
+
+        pub var count = std.atomic.Value(u32).init(0);
+
+        pub fn decompress(this: *DecompressionTask, out: *MutableString) !void {
+            bun.assert(this.encoding.isCompressed());
+            var decompressor = Decompressor{ .none = {} };
+            defer decompressor.deinit();
+            try decompressor.updateBuffers(this.encoding, this.compressed_body.list.items, out);
+            try decompressor.readAll(true);
+            log("decompress({s}, {d} bytes) = {d} bytes", .{ @tagName(this.encoding), this.compressed_body.list.items.len, out.list.items.len });
+            _ = count.fetchAdd(1, .monotonic);
+        }
+
+        pub fn runFromThreadPool(task: *bun.ThreadPool.Task) void {
+            _ = task; // autofix
+            @panic("This should never be called");
+        }
+    };
+
     pub fn abortReason(this: *const HTTPClientResult) ?JSC.CommonAbortReason {
         if (this.isTimeout()) {
             return .Timeout;
@@ -3635,6 +3699,27 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
     else
         .{ .unknown = {} };
 
+    const decompression_task: HTTPClientResult.DecompressionTask = brk: {
+        if (!this.state.flags.should_defer_decompression_to_another_thread) {
+            break :brk HTTPClientResult.DecompressionTask{
+                .pending = false,
+                .compressed_body = MutableString{ .allocator = bun.default_allocator, .list = .{} },
+                .encoding = .identity,
+            };
+        }
+        const compressed_body = this.state.compressed_body;
+        this.state.compressed_body = .{
+            .allocator = bun.default_allocator,
+            .list = .{},
+        };
+        this.state.flags.should_defer_decompression_to_another_thread = false;
+        break :brk HTTPClientResult.DecompressionTask{
+            .pending = true,
+            .compressed_body = compressed_body,
+            .encoding = this.state.encoding,
+        };
+    };
+
     var certificate_info: ?CertificateInfo = null;
     if (this.state.certificate_info) |info| {
         // transfer ownership of the certificate info here
@@ -3652,6 +3737,7 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
             .has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
             .body_size = body_size,
             .certificate_info = null,
+            .decompression_task = decompression_task,
         };
     }
     return HTTPClientResult{
@@ -3663,6 +3749,7 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
         .has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
         .body_size = body_size,
         .certificate_info = certificate_info,
+        .decompression_task = decompression_task,
     };
 }
|
||||
if (this.state.flags.is_redirect_pending) return;
|
||||
|
||||
if (this.state.encoding.isCompressed()) {
|
||||
try this.state.decompressBytes(incoming_data, this.state.body_out_str.?, true);
|
||||
try this.state.decompressBytes(incoming_data, this.state.body_out_str.?, true, !this.state.flags.has_streamed_response_body);
|
||||
} else {
|
||||
try this.state.getBodyBuffer().appendSliceExact(incoming_data);
|
||||
}
|
||||
@@ -3748,11 +3835,11 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
|
||||
const is_done = content_length != null and this.state.total_body_received >= content_length.?;
|
||||
if (is_done or this.signals.get(.body_streaming) or content_length == null) {
|
||||
const is_final_chunk = is_done;
|
||||
const processed = try this.state.processBodyBuffer(buffer.*, is_final_chunk);
|
||||
const processed = try this.state.processBodyBuffer(buffer.*, is_final_chunk, is_final_chunk and !this.state.flags.has_streamed_response_body);
|
||||
|
||||
// We can only use the libdeflate fast path when we are not streaming
|
||||
// If we ever call processBodyBuffer again, it cannot go through the fast path.
|
||||
this.state.flags.is_libdeflate_fast_path_disabled = true;
|
||||
this.state.flags.has_streamed_response_body = true;
|
||||
|
||||
if (this.progress_node) |progress| {
|
||||
progress.activate();
|
||||
@@ -3813,8 +3900,8 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
|
||||
// streaming chunks
|
||||
if (this.signals.get(.body_streaming)) {
|
||||
// If we're streaming, we cannot use the libdeflate fast path
|
||||
this.state.flags.is_libdeflate_fast_path_disabled = true;
|
||||
return try this.state.processBodyBuffer(buffer, false);
|
||||
this.state.flags.has_streamed_response_body = true;
|
||||
return try this.state.processBodyBuffer(buffer, false, false);
|
||||
}
|
||||
|
||||
return false;
|
||||
@@ -3825,6 +3912,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
|
||||
_ = try this.state.processBodyBuffer(
|
||||
buffer,
|
||||
true,
|
||||
!this.state.flags.has_streamed_response_body,
|
||||
);
|
||||
|
||||
if (this.progress_node) |progress| {
|
||||
@@ -3894,9 +3982,9 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
|
||||
// streaming chunks
|
||||
if (this.signals.get(.body_streaming)) {
|
||||
// If we're streaming, we cannot use the libdeflate fast path
|
||||
this.state.flags.is_libdeflate_fast_path_disabled = true;
|
||||
this.state.flags.has_streamed_response_body = true;
|
||||
|
||||
return try this.state.processBodyBuffer(body_buffer.*, true);
|
||||
return try this.state.processBodyBuffer(body_buffer.*, true, false);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
@@ -267,13 +267,55 @@ const NetworkTask = struct {
 };
 pub const DedupeMap = std.HashMap(u64, DedupeMapEntry, IdentityContext(u64), 80);
 
-pub fn notify(this: *NetworkTask, async_http: *AsyncHTTP, _: anytype) void {
+const NetworkTaskDecompressor = struct {
+    task: bun.ThreadPool.Task = .{ .callback = &runFromThreadPool },
+    temporary_http: AsyncHTTP = undefined,
+    http_client_result: HTTP.HTTPClientResult,
+    network_task: *NetworkTask,
+
+    pub usingnamespace bun.New(@This());
+
+    pub fn deinit(this: *NetworkTaskDecompressor) void {
+        this.http_client_result.decompression_task.compressed_body.deinit();
+        this.destroy();
+    }
+
+    pub fn runFromThreadPool(task: *bun.ThreadPool.Task) void {
+        const this: *NetworkTaskDecompressor = @fieldParentPtr("task", task);
+        const network_task = this.network_task;
+        defer this.deinit();
+        this.http_client_result.decompression_task.decompress(this.temporary_http.response_buffer) catch |err| {
+            this.temporary_http.err = err;
+            this.temporary_http.response = null;
+        };
+
+        network_task.onHTTPNetworkTaskComplete(&this.temporary_http);
+    }
+};
+
+pub fn onHTTPNetworkTaskComplete(this: *NetworkTask, async_http: *AsyncHTTP) void {
     defer this.package_manager.wake();
     async_http.real.?.* = async_http.*;
     async_http.real.?.response_buffer = async_http.response_buffer;
     this.package_manager.async_network_task_queue.push(this);
 }
 
+pub fn notify(this: *NetworkTask, async_http: *AsyncHTTP, result: HTTP.HTTPClientResult) void {
+    if (result.decompression_task.pending) {
+        async_http.real.?.* = async_http.*;
+        async_http.real.?.response_buffer = async_http.response_buffer;
+
+        const network_task_decompressor = NetworkTaskDecompressor.new(.{
+            .network_task = this,
+            .temporary_http = async_http.*,
+            .http_client_result = result,
+        });
+        this.package_manager.thread_pool.schedule(ThreadPool.Batch.from(&network_task_decompressor.task));
+    } else {
+        this.onHTTPNetworkTaskComplete(async_http);
+    }
+}
+
 pub const Authorization = enum {
     no_authorization,
     allow_authorization,
 
@@ -8763,6 +8805,7 @@ pub const PackageManager = struct {
     break :brk default_max_simultaneous_requests_for_bun_install;
 }, .monotonic);
 
+bun.http.allow_multithreaded_decompression = true;
 HTTP.HTTPThread.init(&.{
     .ca = ca,
     .abs_ca_file_name = abs_ca_file_name,
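`NetworkTaskDecompressor` follows the same recipe as the fetch path: snapshot everything the worker needs into a heap-allocated context, schedule it, and let the worker free the context when it finishes. A minimal sketch of that handoff, with `std.Thread` standing in for bun's thread pool and all names invented for illustration:

```zig
const std = @import("std");

// Heap-allocated context handed to a worker; the worker owns it and frees
// it when done, so the scheduling thread never touches it again.
const DecompressJob = struct {
    allocator: std.mem.Allocator,
    compressed: []const u8,

    fn run(job: *DecompressJob) void {
        defer job.allocator.destroy(job);
        // Real code would decompress job.compressed here, then notify the
        // owner (e.g. push a completion onto a queue, as notify() does).
        std.debug.print("worker received {d} compressed bytes\n", .{job.compressed.len});
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    const job = try allocator.create(DecompressJob);
    job.* = .{ .allocator = allocator, .compressed = "...gzip bytes..." };

    var worker = try std.Thread.spawn(.{}, DecompressJob.run, .{job});
    worker.join();
}
```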
@@ -141,3 +141,5 @@ export const isModuleResolveFilenameSlowPathEnabled: () => boolean = $newCppFunc
     "jsFunctionIsModuleResolveFilenameSlowPathEnabled",
     0,
 );
+
+export const createInternalStatsObject = $newZigFunction("analytics_thread.zig", "createInternalStatsObject", 1);
@@ -1,13 +1,101 @@
import { Socket } from "bun";
import { beforeAll, expect, it } from "bun:test";
import { gcTick } from "harness";
import { createInternalStatsObject } from "bun:internal-for-testing";
import path from "path";

const gzipped = path.join(import.meta.dir, "fixture.html.gz");
let gzippedBlob: Blob;
const html = path.join(import.meta.dir, "fixture.html");
let htmlText: string;
beforeAll(async () => {
  htmlText = (await Bun.file(html).text()).replace(/\r\n/g, "\n");
  gzippedBlob = new Blob([await Bun.file(gzipped).arrayBuffer()]);
});

it("many concurrent decompression tasks that read to completion", async () => {
  using server = Bun.serve({
    port: 0,
    async fetch(req) {
      await Bun.sleep(0);
      return new Response(gzippedBlob, {
        headers: {
          "Content-Encoding": "gzip",
          "Content-Type": "text/html; charset=utf-8",
        },
      });
    },
  });
  const initialStats = createInternalStatsObject();
  for (let j = 0; j < 10; j++) {
    const count = 40;
    const promises = [];
    for (let i = 0; i < count; i++) {
      promises.push(fetch(server.url).then(res => res.text()));
    }
    const results = await Promise.all(promises);
    for (const text of results) {
      expect(text).toBe(htmlText);
    }
  }
  const finalStats = createInternalStatsObject();
  console.log(finalStats);

  expect(finalStats.concurrentDecompressionCount).toBeGreaterThan(
    // Not all will end up being concurrently decompressed, and that's okay.
    initialStats.concurrentDecompressionCount + 10 * 20,
  );
});

it("many concurrent decompression tasks that ignore the body", async () => {
  const arrayBuffer = await gzippedBlob.arrayBuffer();
  let promises: Promise<void>[] = [];
  let allPromises = Promise.withResolvers();
  using server = Bun.serve({
    port: 0,
    async fetch(req) {
      await Bun.sleep(0);
      return new Response(
        new ReadableStream({
          type: "direct",
          async pull(controller) {
            const defer = Promise.withResolvers();
            promises.push(defer.promise);
            await Bun.sleep(1);
            await controller.write(arrayBuffer);
            await controller.end();
            defer.resolve();
            if (promises.length === 40) {
              allPromises.resolve();
            }
          },
        }),
        {
          headers: {
            "Content-Encoding": "gzip",
            "Content-Type": "text/html; charset=utf-8",
          },
        },
      );
    },
  });

  for (let j = 0; j < 10; j++) {
    const count = 40;
    promises = [];
    allPromises = Promise.withResolvers();

    function wrap() {
      for (let i = 0; i < count; i++) {
        fetch(server.url).then(r => {});
      }
    }

    wrap();
    Bun.gc(true);
    await allPromises.promise;
    await Promise.all(promises);
  }
});

it("fetch() with a buffered gzip response works (one chunk)", async () => {
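The two concurrency tests above assert on `concurrentDecompressionCount`, which is backed by `DecompressionTask.count`, the `std.atomic.Value(u32)` bumped once per off-thread decompression in the http.zig hunk. A minimal sketch of that stats-counter pattern:

```zig
const std = @import("std");

// Monotonic stats counter: incremented from worker threads, read from the
// main thread. .monotonic ordering suffices because no other memory is
// synchronized through this counter; it is only a tally.
var count = std.atomic.Value(u32).init(0);

fn worker() void {
    _ = count.fetchAdd(1, .monotonic);
}

pub fn main() !void {
    var threads: [4]std.Thread = undefined;
    for (&threads) |*t| t.* = try std.Thread.spawn(.{}, worker, .{});
    for (threads) |t| t.join();
    std.debug.print("decompressions: {d}\n", .{count.load(.monotonic)});
}
```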
@@ -15,7 +103,6 @@ it("fetch() with a buffered gzip response works (one chunk)", async () => {
    port: 0,

    async fetch(req) {
      gcTick(true);
      return new Response(require("fs").readFileSync(gzipped), {
        headers: {
          "Content-Encoding": "gzip",
@@ -24,19 +111,17 @@ it("fetch() with a buffered gzip response works (one chunk)", async () => {
      });
    },
  });
  gcTick(true);

  const res = await fetch(server.url, { verbose: true });
  gcTick(true);

  const arrayBuffer = await res.arrayBuffer();
  const clone = new Buffer(arrayBuffer);
  gcTick(true);

  await (async function () {
    const second = Buffer.from(htmlText);
    gcTick(true);

    expect(second.equals(clone)).toBe(true);
  })();
  gcTick(true);
});

it("fetch() with a redirect that returns a buffered gzip response works (one chunk)", async () => {
@@ -122,104 +207,122 @@ it("fetch() with a gzip response works (one chunk, streamed, with a delay)", asy

it("fetch() with a gzip response works (multiple chunks, TCP server)", async done => {
  const compressed = await Bun.file(gzipped).arrayBuffer();

  async function iterate() {
    var socketToClose!: Socket;
    let pending,
      pendingChunks = [];
    const server = Bun.listen({
      hostname: "localhost",
      port: 0,
      socket: {
        drain(socket) {
          if (pending) {
            while (pendingChunks.length) {
              const chunk = pendingChunks.shift();
              const written = socket.write(chunk);

              if (written < chunk.length) {
                pendingChunks.push(chunk.slice(written));
                return;
              }
            }
            const resolv = pending;
            pending = null;
            resolv();
          }
        },
        async open(socket) {
          socketToClose = socket;

          socket.flush();
          var corked: any[] = [];
          var cork = true;
          let written = 0;
          let pendingChunks = [];
          async function write(chunk: any) {
            let defer = Promise.withResolvers();

            if (cork) {
              corked.push(chunk);
            }

            if (!cork && corked.length) {
              const toWrite = corked.join("");
              const wrote = socket.write(toWrite);
              if (wrote !== toWrite.length) {
                pendingChunks.push(toWrite.slice(wrote));
              }
              corked.length = 0;
            }

            if (!cork) {
              if (pendingChunks.length) {
                pendingChunks.push(chunk);
                pending = defer.resolve;
                await defer.promise;
                defer = Promise.withResolvers();
                pending = defer.resolve;
              }

              const written = socket.write(chunk);
              if (written < chunk.length) {
                console.log("written", written);
                pendingChunks.push(chunk.slice(written));
                pending = defer.resolve;
                await defer.promise;
                defer = Promise.withResolvers();
                pending = defer.resolve;
              }
            }

            const promise = defer.promise;
            if (pendingChunks.length) {
              pending = promise;
              await promise;
            } else {
              pending = null;
            }
          }
          await write("HTTP/1.1 200 OK\r\n");
          await write("Content-Encoding: gzip\r\n");
          await write("Content-Type: text/html; charset=utf-8\r\n");
          await write("Content-Length: " + compressed.byteLength + "\r\n");
          await write("X-WTF: " + "lol".repeat(1000) + "\r\n");
          await write("\r\n");
          for (var i = 100; i < compressed.byteLength; i += 100) {
            cork = false;
            await write(compressed.slice(i - 100, i));
          }
          await write(compressed.slice(i - 100));
          await write("\r\n");

          socket.flush();
        },
        drain(socket) {},
      },
    });
    await 1;

    const res = await fetch(`http://${server.hostname}:${server.port}`);
    const text = (await res.text()).replace(/\r\n/g, "\n");
    expect(text).toEqual(htmlText);
    socketToClose.end();
    server.stop();
  }

  // Run this once to test gzip decompression without concurrency
  {
    await iterate();
    await Bun.sleep(2);
  }

  // Run this 10 times in parallel to test gzip decompression concurrency
  {
    const promises = [];
    for (let i = 0; i < 10; i++) {
      promises.push(iterate());
    }
    await Promise.all(promises);
  }

  done();
});