Compare commits

...

6 Commits

Author SHA1 Message Date
Jarred Sumner
6ba9b0a27a Merge branch 'main' into jarred/fetchy 2024-10-31 13:31:24 -07:00
Jarred Sumner
12dee5c720 Merge branch 'main' into jarred/fetchy 2024-10-29 18:57:07 -07:00
Jarred Sumner
9e454a43ad See if this fixes the test failure 2024-10-24 13:37:09 -07:00
Jarred Sumner
8cec29c95a Update http.zig 2024-10-24 00:12:57 -07:00
Jarred Sumner
b941dd6f0b Update fetch-gzip.test.ts 2024-10-23 22:42:51 -07:00
Jarred Sumner
f4e0948055 Multi-threaded fetch() body decompression 2024-10-23 22:20:12 -07:00
10 changed files with 482 additions and 143 deletions

View File

@@ -124,6 +124,18 @@ pub const Features = struct {
return Formatter{};
}
const JSC = bun.JSC;
/// Builds a null-prototype JS object snapshot of the Features counters.
/// Every integer declaration on `Features` becomes a numeric property,
/// plus the global concurrent-decompression counter from the HTTP client.
pub fn toJS(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSC.JSValue {
    const object = JSC.JSValue.createEmptyObjectWithNullPrototype(globalThis);
    // Declarations are comptime-known, so this loop unrolls at compile time.
    inline for (comptime std.meta.declarations(Features)) |decl| {
        // Only integer counters are exported; functions/types are skipped.
        if (@typeInfo(@TypeOf(@field(Features, decl.name))) == .Int) {
            object.put(globalThis, decl.name, JSC.JSValue.jsNumber(@field(Features, decl.name)));
        }
    }
    // How many response bodies were decompressed off the HTTP thread (see DecompressionTask.count).
    object.put(globalThis, "concurrentDecompressionCount", JSC.JSValue.jsNumber(bun.http.HTTPClientResult.DecompressionTask.count.load(.monotonic)));
    return object;
}
pub const Formatter = struct {
pub fn format(_: Formatter, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
const fields = comptime brk: {
@@ -353,3 +365,5 @@ pub const GenerateHeader = struct {
}
};
};
pub const createInternalStatsObject = Features.toJS;

View File

@@ -4105,7 +4105,7 @@ static void populateStackFramePosition(const JSC::StackFrame* stackFrame, BunStr
// It is key to not clone this data because source code strings are large.
// Usage of toStringView (non-owning) is safe as we ref the provider.
provider->ref();
ASSERT(*referenced_source_provider == nullptr);
// ASSERT(*referenced_source_provider == nullptr);
*referenced_source_provider = provider;
source_lines[0] = Bun::toStringView(sourceString.substring(lineStart, lineEnd - lineStart));
source_line_numbers[0] = location.line();

View File

@@ -1000,7 +1000,7 @@ pub const Fetch = struct {
const success = this.result.isSuccess();
const globalThis = this.global_this;
// reset the buffer if we are streaming or if we are not waiting for buffering anymore
var buffer_reset = true;
var buffer_reset = !this.result.decompression_task.pending;
defer {
if (buffer_reset) {
this.scheduled_response_buffer.reset();
@@ -1038,7 +1038,7 @@ pub const Fetch = struct {
}
if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr == .Bytes) {
if (readable.ptr == .Bytes and !this.result.decompression_task.pending) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
const scheduled_response_buffer = this.scheduled_response_buffer.list;
@@ -1078,7 +1078,7 @@ pub const Fetch = struct {
if (this.getCurrentResponse()) |response| {
var body = &response.body;
if (body.value == .Locked) {
if (body.value == .Locked and !this.result.decompression_task.pending) {
if (body.value.Locked.readable.get()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
@@ -1149,7 +1149,7 @@ pub const Fetch = struct {
log("onProgressUpdate", .{});
this.mutex.lock();
this.has_schedule_callback.store(false, .monotonic);
const is_done = !this.result.has_more;
const is_done = this.result.hasCompleteResponseBody();
const vm = this.javascript_vm;
// vm is shutting down we cannot touch JS
@@ -1564,7 +1564,7 @@ pub const Fetch = struct {
// at this point we always should have metadata
const metadata = this.metadata.?;
const http_response = metadata.response;
this.is_waiting_body = this.result.has_more;
this.is_waiting_body = !this.result.hasCompleteResponseBody();
return Response{
.url = bun.String.createAtomIfPossible(metadata.url),
.redirected = this.result.redirected,
@@ -1816,10 +1816,84 @@ pub const Fetch = struct {
return node;
}
/// Runs on a WorkPool thread: decompresses the buffered response body off
/// the JS thread, then re-schedules the tasklet on the main thread.
/// Takes ownership of (and frees) both the raw response buffer and the
/// pending DecompressionTask's compressed body.
fn onDecompressFromThreadPool(task: *bun.ThreadPool.Task) void {
    // Recover the owning FetchTasklet by walking up the embedded-field chain.
    const input_decompression_task: *http.HTTPClientResult.DecompressionTask = @fieldParentPtr("task", task);
    const client_result: *http.HTTPClientResult = @fieldParentPtr("decompression_task", input_decompression_task);
    var this: *FetchTasklet = @fieldParentPtr("result", client_result);
    var response_buffer: MutableString = undefined;
    defer response_buffer.deinit();
    // Avoid potential data races by moving this to the stack
    var decompression_task: http.HTTPClientResult.DecompressionTask = brk: {
        this.mutex.lock();
        defer this.mutex.unlock();
        // Steal the tasklet's response buffer; leave an empty one in its place.
        response_buffer = this.response_buffer;
        this.response_buffer = .{
            .allocator = bun.default_allocator,
            .list = .{
                .items = &.{},
                .capacity = 0,
            },
        };
        const old = this.result.decompression_task;
        this.result.decompression_task = .{
            .compressed_body = MutableString{ .allocator = bun.default_allocator, .list = .{} },
            // keep it marked as pending so the other thread, if it receives an
            // update simultaneously, doesn't think we had an error
            // (NOTE(review): original comment was truncated here; wording assumed)
            .pending = true,
            .encoding = .identity,
        };
        break :brk old;
    };
    defer decompression_task.compressed_body.deinit();
    var fail: ?anyerror = null;
    // Do not lock the mutex on this thread while decompressing!
    decompression_task.decompress(&response_buffer) catch |err| {
        // if decompression fails, we mark it as an error
        fail = err;
    };
    this.mutex.lock();
    const has_only_one_ref = this.ref_count.load(.monotonic) == 1;
    // If we hold the last reference, deref() below frees `this` (including the
    // mutex), so skip the unlock in that case to avoid a use-after-free.
    // (defers run LIFO: deref() executes before this conditional unlock.)
    defer if (!has_only_one_ref) this.mutex.unlock();
    defer this.deref();
    _ = this.scheduled_response_buffer.write(response_buffer.list.items) catch bun.outOfMemory();
    this.result.fail = fail;
    this.result.decompression_task.pending = false;
    scheduleMainThreadTask(this);
}
/// Enqueues the tasklet onto the JS event loop, unless the JS side has
/// already finalized it or a callback is already pending.
fn scheduleMainThreadTask(task: *FetchTasklet) void {
    if (task.ref_count.load(.monotonic) == 1) {
        // If the JS side already rejected the Promise, or it was otherwise already finalized.
        // We can skip enqueuing and rely on the caller to deinit immediately.
        log("deinit early due to only one ref", .{});
        return;
    }
    // cmpxchgStrong yields the previous value only on failure; `true` there
    // means a callback is already scheduled, so don't enqueue a duplicate.
    if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
        if (has_schedule_callback) {
            return;
        }
    }
    task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
pub fn callback(task: *FetchTasklet, async_http: *http.AsyncHTTP, result: http.HTTPClientResult) void {
var is_done = !result.has_more;
task.mutex.lock();
defer task.mutex.unlock();
const is_done = !result.has_more;
const has_only_one_ref = task.ref_count.load(.monotonic) == 1;
var should_unlock_mutex = !has_only_one_ref;
// Prevent a use-after-free of this mutex if the deref below frees the task
defer if (should_unlock_mutex) task.mutex.unlock();
// we are done with the http client so we can deref our side
defer if (is_done) task.deref();
@@ -1857,6 +1931,16 @@ pub const Fetch = struct {
if (task.ignore_data) {
task.response_buffer.reset();
// If we ignored the response body and it is compressed, immediately free it before we start decompressing it
if (task.result.decompression_task.pending) {
var compressed_body = &task.result.decompression_task.compressed_body;
compressed_body.deinit();
task.result.decompression_task.pending = false;
// if we have more data, we should not have had a decompression task.
bun.debugAssert(!result.has_more);
}
if (task.scheduled_response_buffer.list.capacity > 0) {
task.scheduled_response_buffer.deinit();
task.scheduled_response_buffer = .{
@@ -1873,19 +1957,26 @@ pub const Fetch = struct {
}
} else {
if (success) {
if (task.result.decompression_task.pending) {
bun.debugAssert(task.result.decompression_task.compressed_body.list.items.len > 0);
task.result.decompression_task.task = .{ .callback = &onDecompressFromThreadPool };
is_done = false;
should_unlock_mutex = false;
// Ensure we don't block the other thread on this mutex.
task.mutex.unlock();
JSC.WorkPool.schedule(&task.result.decompression_task.task);
return;
}
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch bun.outOfMemory();
}
// reset for reuse
task.response_buffer.reset();
}
if (task.has_schedule_callback.cmpxchgStrong(false, true, .acquire, .monotonic)) |has_schedule_callback| {
if (has_schedule_callback) {
return;
}
}
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
scheduleMainThreadTask(task);
}
};

View File

@@ -1711,13 +1711,9 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
if (comptime !@hasField(SinkType, "signal"))
return;
const ptr = this.sink.signal.ptr;
if (this.sink.signal.isDead())
return;
this.sink.signal.clear();
const value = @as(JSValue, @enumFromInt(@as(JSC.JSValueReprInt, @bitCast(@intFromPtr(ptr)))));
value.unprotect();
detachPtr(value);
}
pub fn detachPtr(ptr: JSValue) callconv(.C) void {

View File

@@ -44,6 +44,7 @@ pub const Run = struct {
pub fn bootStandalone(ctx: Command.Context, entry_path: string, graph: bun.StandaloneModuleGraph) !void {
JSC.markBinding(@src());
bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
bun.JSC.initialize(false);
bun.Analytics.Features.standalone_executable += 1;
@@ -183,7 +184,7 @@ pub const Run = struct {
}
bun.JSC.initialize(ctx.runtime_options.eval.eval_and_print);
bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
js_ast.Expr.Data.Store.create();
js_ast.Stmt.Data.Store.create();
var arena = try Arena.init();

View File

@@ -741,6 +741,7 @@ pub const TestCommand = struct {
break :brk loader;
};
bun.JSC.initialize(false);
bun.http.allow_multithreaded_decompression = !ctx.runtime_options.smol;
HTTPThread.init(&.{});
var snapshot_file_buf = std.ArrayList(u8).init(ctx.allocator);

View File

@@ -40,8 +40,8 @@ const uws = bun.uws;
pub const MimeType = @import("./http/mime_type.zig");
pub const URLPath = @import("./http/url_path.zig");
// This becomes Arena.allocator
pub var default_allocator: std.mem.Allocator = undefined;
var default_arena: Arena = undefined;
const default_allocator = bun.default_allocator;
pub var http_thread: HTTPThread = undefined;
const HiveArray = @import("./hive_array.zig").HiveArray;
const Batch = bun.ThreadPool.Batch;
@@ -53,6 +53,7 @@ var socket_async_http_abort_tracker = std.AutoArrayHashMap(u32, uws.InternalSock
var async_http_id: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
const MAX_REDIRECT_URL_LENGTH = 128 * 1024;
var custom_ssl_context_map = std.AutoArrayHashMap(*SSLConfig, *NewHTTPContext(true)).init(bun.default_allocator);
pub var allow_multithreaded_decompression: bool = false;
pub var max_http_header_size: usize = 16 * 1024;
comptime {
@@ -68,7 +69,7 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
// this doesn't need to be stack memory because it is immediately cloned after use
var shared_response_headers_buf: [256]picohttp.Header = undefined;
var requests_this_tick: usize = 0;
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
pub const Signals = struct {
@@ -1100,8 +1101,6 @@ pub const HTTPThread = struct {
pub fn onStart(opts: InitOpts) void {
Output.Source.configureNamedThread("HTTP Client");
default_arena = Arena.init() catch unreachable;
default_allocator = default_arena.allocator();
const loop = bun.JSC.MiniEventLoop.initGlobal(null);
@@ -1257,6 +1256,7 @@ pub const HTTPThread = struct {
this.loop.loop.inc();
this.loop.loop.tick();
this.loop.loop.dec();
requests_this_tick = 0;
// this.loop.run();
if (comptime Environment.isDebug) {
@@ -1543,7 +1543,7 @@ pub inline fn getAllocator() std.mem.Allocator {
}
pub inline fn cleanup(force: bool) void {
default_arena.gc(force);
_ = force; // autofix
}
pub const Headers = @import("./http/headers.zig");
@@ -1805,8 +1805,10 @@ pub const InternalState = struct {
received_last_chunk: bool = false,
did_set_content_encoding: bool = false,
is_redirect_pending: bool = false,
is_libdeflate_fast_path_disabled: bool = false,
has_streamed_response_body: bool = false,
resend_request_body_on_redirect: bool = false,
is_libdeflate_fast_path_disabled: bool = true,
should_defer_decompression_to_another_thread: bool = false,
};
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
@@ -1877,8 +1879,9 @@ pub const InternalState = struct {
return this.flags.received_last_chunk;
}
fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool) !void {
defer this.compressed_body.reset();
fn decompressBytes(this: *InternalState, buffer: []const u8, body_out_str: *MutableString, is_final_chunk: bool, owns_buffer: bool) !void {
var should_reset_compressed_body = true;
defer if (should_reset_compressed_body) this.compressed_body.reset();
var gzip_timer: std.time.Timer = undefined;
if (extremely_verbose)
@@ -1888,7 +1891,7 @@ pub const InternalState = struct {
if (FeatureFlags.isLibdeflateEnabled()) {
// Fast-path: use libdeflate
if (is_final_chunk and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
if (is_final_chunk and !this.flags.has_streamed_response_body and !this.flags.is_libdeflate_fast_path_disabled and this.encoding.canUseLibDeflate() and this.isDone()) libdeflate: {
this.flags.is_libdeflate_fast_path_disabled = true;
log("Decompressing {d} bytes with libdeflate\n", .{buffer.len});
@@ -1929,6 +1932,28 @@ pub const InternalState = struct {
}
}
const should_defer_decompression_to_another_thread = still_needs_to_decompress and
is_final_chunk and
// This is only supported when not streaming the response body.
!this.flags.has_streamed_response_body and
!this.flags.should_defer_decompression_to_another_thread and
owns_buffer and
allow_multithreaded_decompression and
// It needs to be a large enough buffer to make it worthwhile.
buffer.len > 1024 * 2 and
// If this is the only active request, there's really no point in paying the cost of concurrency.
(requests_this_tick > 1 or AsyncHTTP.active_requests_count.load(.monotonic) > 1);
if (should_defer_decompression_to_another_thread) {
log("Deferring decompression of {d} bytes to another thread\n", .{buffer.len});
if (!this.compressed_body.owns(buffer)) {
this.compressed_body.appendSlice(buffer) catch bun.outOfMemory();
}
this.flags.should_defer_decompression_to_another_thread = true;
should_reset_compressed_body = false;
return;
}
// Slow path, or brotli: use the .decompressor
if (still_needs_to_decompress) {
log("Decompressing {d} bytes\n", .{buffer.len});
@@ -1952,18 +1977,22 @@ pub const InternalState = struct {
this.gzip_elapsed = gzip_timer.read();
}
fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool) !void {
try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk);
fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString, is_final_chunk: bool, owns_buffer: bool) !void {
try this.decompressBytes(buffer.list.items, body_out_str, is_final_chunk, owns_buffer);
}
pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool) !bool {
pub fn processBodyBuffer(this: *InternalState, buffer: MutableString, is_final_chunk: bool, owns_buffer: bool) !bool {
if (this.flags.is_redirect_pending) return false;
var body_out_str = this.body_out_str.?;
switch (this.encoding) {
Encoding.brotli, Encoding.gzip, Encoding.deflate => {
try this.decompress(buffer, body_out_str, is_final_chunk);
bun.debugAssert(!this.flags.should_defer_decompression_to_another_thread);
try this.decompress(buffer, body_out_str, is_final_chunk, owns_buffer);
if (this.flags.should_defer_decompression_to_another_thread) {
return true;
}
},
else => {
if (!body_out_str.owns(buffer.list.items)) {
@@ -2859,6 +2888,8 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
this.fail(error.ConnectionClosed);
return;
}
requests_this_tick +|= 1;
}
const Task = ThreadPool.Task;
@@ -3507,12 +3538,13 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
}
return;
}
const out_str = this.state.body_out_str.?;
const body = out_str.*;
const result = this.toResult();
const is_done = !result.has_more;
log("progressUpdate {}", .{is_done});
log("progressUpdate is_done={}", .{is_done});
const callback = this.result_callback;
@@ -3527,6 +3559,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
this.connected_url.getPortAuto(),
);
} else if (!socket.isClosed()) {
// closeSocket detaches the pointer, so this will not close the socket
NewHTTPContext(is_ssl).closeSocket(socket);
}
@@ -3545,7 +3578,6 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
if (print_every_i % print_every == 0) {
Output.prettyln("Heap stats for HTTP thread\n", .{});
Output.flush();
default_arena.dumpThreadStats();
print_every_i = 0;
}
}
@@ -3553,6 +3585,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
}
pub const HTTPClientResult = struct {
decompression_task: DecompressionTask = .{},
body: ?*MutableString = null,
has_more: bool = false,
redirected: bool = false,
@@ -3569,6 +3602,37 @@ pub const HTTPClientResult = struct {
body_size: BodySize = .unknown,
certificate_info: ?CertificateInfo = null,
/// True once the client reports no more data AND no off-thread
/// decompression of the body is still pending.
pub fn hasCompleteResponseBody(this: *const HTTPClientResult) bool {
    return !this.has_more and !this.decompression_task.pending;
}
/// Carries a compressed response body so it can be decompressed on a
/// worker thread instead of the HTTP thread.
pub const DecompressionTask = struct {
    // Owned compressed bytes; allocator is assigned when the task is created.
    compressed_body: MutableString = .{
        .allocator = undefined,
        .list = .{},
    },
    encoding: Encoding = .identity,
    // true while a compressed body is waiting to be (or being) decompressed.
    pending: bool = false,
    task: bun.ThreadPool.Task = .{ .callback = &runFromThreadPool },

    // Global count of completed concurrent decompressions (read by the
    // analytics/internal-stats object for tests).
    pub var count = std.atomic.Value(u32).init(0);

    /// Decompresses `compressed_body` into `out` using a fresh Decompressor.
    /// Asserts the encoding is actually a compressed one.
    pub fn decompress(this: *DecompressionTask, out: *MutableString) !void {
        bun.assert(this.encoding.isCompressed());
        var decompressor = Decompressor{ .none = {} };
        defer decompressor.deinit();
        try decompressor.updateBuffers(this.encoding, this.compressed_body.list.items, out);
        try decompressor.readAll(true);
        log("decompress({s}, {d} bytes) = {d} bytes", .{ @tagName(this.encoding), this.compressed_body.list.items.len, out.list.items.len });
        _ = count.fetchAdd(1, .monotonic);
    }

    /// Placeholder callback: schedulers replace `task.callback` with their
    /// own handler before scheduling, so this must never actually run.
    pub fn runFromThreadPool(task: *bun.ThreadPool.Task) void {
        _ = task; // autofix
        @panic("This should never be called");
    }
};
pub fn abortReason(this: *const HTTPClientResult) ?JSC.CommonAbortReason {
if (this.isTimeout()) {
return .Timeout;
@@ -3635,6 +3699,27 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
else
.{ .unknown = {} };
const decompression_task: HTTPClientResult.DecompressionTask = brk: {
if (!this.state.flags.should_defer_decompression_to_another_thread) {
break :brk HTTPClientResult.DecompressionTask{
.pending = false,
.compressed_body = MutableString{ .allocator = bun.default_allocator, .list = .{} },
.encoding = .identity,
};
}
const compressed_body = this.state.compressed_body;
this.state.compressed_body = .{
.allocator = bun.default_allocator,
.list = .{},
};
this.state.flags.should_defer_decompression_to_another_thread = false;
break :brk HTTPClientResult.DecompressionTask{
.pending = true,
.compressed_body = compressed_body,
.encoding = this.state.encoding,
};
};
var certificate_info: ?CertificateInfo = null;
if (this.state.certificate_info) |info| {
// transfer owner ship of the certificate info here
@@ -3652,6 +3737,7 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
.has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
.body_size = body_size,
.certificate_info = null,
.decompression_task = decompression_task,
};
}
return HTTPClientResult{
@@ -3663,6 +3749,7 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
.has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
.body_size = body_size,
.certificate_info = certificate_info,
.decompression_task = decompression_task,
};
}
@@ -3699,7 +3786,7 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
if (this.state.flags.is_redirect_pending) return;
if (this.state.encoding.isCompressed()) {
try this.state.decompressBytes(incoming_data, this.state.body_out_str.?, true);
try this.state.decompressBytes(incoming_data, this.state.body_out_str.?, true, !this.state.flags.has_streamed_response_body);
} else {
try this.state.getBodyBuffer().appendSliceExact(incoming_data);
}
@@ -3748,11 +3835,11 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
const is_done = content_length != null and this.state.total_body_received >= content_length.?;
if (is_done or this.signals.get(.body_streaming) or content_length == null) {
const is_final_chunk = is_done;
const processed = try this.state.processBodyBuffer(buffer.*, is_final_chunk);
const processed = try this.state.processBodyBuffer(buffer.*, is_final_chunk, is_final_chunk and !this.state.flags.has_streamed_response_body);
// We can only use the libdeflate fast path when we are not streaming
// If we ever call processBodyBuffer again, it cannot go through the fast path.
this.state.flags.is_libdeflate_fast_path_disabled = true;
this.state.flags.has_streamed_response_body = true;
if (this.progress_node) |progress| {
progress.activate();
@@ -3813,8 +3900,8 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
// streaming chunks
if (this.signals.get(.body_streaming)) {
// If we're streaming, we cannot use the libdeflate fast path
this.state.flags.is_libdeflate_fast_path_disabled = true;
return try this.state.processBodyBuffer(buffer, false);
this.state.flags.has_streamed_response_body = true;
return try this.state.processBodyBuffer(buffer, false, false);
}
return false;
@@ -3825,6 +3912,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
_ = try this.state.processBodyBuffer(
buffer,
true,
!this.state.flags.has_streamed_response_body,
);
if (this.progress_node) |progress| {
@@ -3894,9 +3982,9 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
// streaming chunks
if (this.signals.get(.body_streaming)) {
// If we're streaming, we cannot use the libdeflate fast path
this.state.flags.is_libdeflate_fast_path_disabled = true;
this.state.flags.has_streamed_response_body = true;
return try this.state.processBodyBuffer(body_buffer.*, true);
return try this.state.processBodyBuffer(body_buffer.*, true, false);
}
return false;

View File

@@ -267,13 +267,55 @@ const NetworkTask = struct {
};
pub const DedupeMap = std.HashMap(u64, DedupeMapEntry, IdentityContext(u64), 80);
pub fn notify(this: *NetworkTask, async_http: *AsyncHTTP, _: anytype) void {
/// One-shot thread-pool job that finishes decompressing a registry
/// response off the HTTP thread, then completes the owning NetworkTask.
const NetworkTaskDecompressor = struct {
    task: bun.ThreadPool.Task = .{ .callback = &runFromThreadPool },
    // Snapshot of the AsyncHTTP taken on the HTTP thread; its response_buffer
    // receives the decompressed bytes.
    temporary_http: AsyncHTTP = undefined,
    http_client_result: HTTP.HTTPClientResult,
    network_task: *NetworkTask,

    pub usingnamespace bun.New(@This());

    /// Frees the owned compressed body, then this object itself.
    pub fn deinit(this: *NetworkTaskDecompressor) void {
        this.http_client_result.decompression_task.compressed_body.deinit();
        this.destroy();
    }

    pub fn runFromThreadPool(task: *bun.ThreadPool.Task) void {
        const this: *NetworkTaskDecompressor = @fieldParentPtr("task", task);
        const network_task = this.network_task;
        defer this.deinit();
        // On failure, record the error and clear the response so the
        // completion path sees a failed request rather than partial data.
        this.http_client_result.decompression_task.decompress(this.temporary_http.response_buffer) catch |err| {
            this.temporary_http.err = err;
            this.temporary_http.response = null;
        };
        network_task.onHTTPNetworkTaskComplete(&this.temporary_http);
    }
};
/// Publishes the finished AsyncHTTP state back into its `real` counterpart,
/// queues this task for the package manager, and wakes it.
pub fn onHTTPNetworkTaskComplete(this: *NetworkTask, async_http: *AsyncHTTP) void {
    // Wake the package manager only after the result has been published.
    defer this.package_manager.wake();
    const real = async_http.real.?;
    real.* = async_http.*;
    real.response_buffer = async_http.response_buffer;
    this.package_manager.async_network_task_queue.push(this);
}
/// HTTP-thread completion callback. If the response body still needs
/// decompression, defer it to the install thread pool; otherwise complete
/// the task immediately on this thread.
pub fn notify(this: *NetworkTask, async_http: *AsyncHTTP, result: HTTP.HTTPClientResult) void {
    if (result.decompression_task.pending) {
        // Publish the current state before handing a copy to the decompressor,
        // which will call onHTTPNetworkTaskComplete when finished.
        async_http.real.?.* = async_http.*;
        async_http.real.?.response_buffer = async_http.response_buffer;
        const network_task_decompressor = NetworkTaskDecompressor.new(.{
            .network_task = this,
            .temporary_http = async_http.*,
            .http_client_result = result,
        });
        this.package_manager.thread_pool.schedule(ThreadPool.Batch.from(&network_task_decompressor.task));
    } else {
        this.onHTTPNetworkTaskComplete(async_http);
    }
}
pub const Authorization = enum {
no_authorization,
allow_authorization,
@@ -8763,6 +8805,7 @@ pub const PackageManager = struct {
break :brk default_max_simultaneous_requests_for_bun_install;
}, .monotonic);
bun.http.allow_multithreaded_decompression = true;
HTTP.HTTPThread.init(&.{
.ca = ca,
.abs_ca_file_name = abs_ca_file_name,

View File

@@ -141,3 +141,5 @@ export const isModuleResolveFilenameSlowPathEnabled: () => boolean = $newCppFunc
"jsFunctionIsModuleResolveFilenameSlowPathEnabled",
0,
);
export const createInternalStatsObject = $newZigFunction("analytics_thread.zig", "createInternalStatsObject", 1);

View File

@@ -1,13 +1,101 @@
import { Socket } from "bun";
import { beforeAll, expect, it } from "bun:test";
import { gcTick } from "harness";
import { createInternalStatsObject } from "bun:internal-for-testing";
import path from "path";
const gzipped = path.join(import.meta.dir, "fixture.html.gz");
let gzippedBlob: Blob;
const html = path.join(import.meta.dir, "fixture.html");
let htmlText: string;
beforeAll(async () => {
htmlText = (await Bun.file(html).text()).replace(/\r\n/g, "\n");
gzippedBlob = new Blob([await Bun.file(gzipped).arrayBuffer()]);
});
// Fires 10 rounds of 40 parallel fetches of a gzipped fixture and verifies
// every body decompresses to the expected HTML, then checks the internal
// concurrent-decompression counter actually advanced (i.e. the off-thread
// decompression path was exercised, not just the inline one).
it("many concurrent decompression tasks that read to completion", async () => {
  using server = Bun.serve({
    port: 0,
    async fetch(req) {
      // Yield once so responses overlap across requests.
      await Bun.sleep(0);
      return new Response(gzippedBlob, {
        headers: {
          "Content-Encoding": "gzip",
          "Content-Type": "text/html; charset=utf-8",
        },
      });
    },
  });
  const initialStats = createInternalStatsObject();
  for (let j = 0; j < 10; j++) {
    const count = 40;
    const promises = [];
    for (let i = 0; i < count; i++) {
      promises.push(fetch(server.url).then(res => res.text()));
    }
    const results = await Promise.all(promises);
    for (const text of results) {
      expect(text).toBe(htmlText);
    }
  }
  const finalStats = createInternalStatsObject();
  console.log(finalStats);
  expect(finalStats.concurrentDecompressionCount).toBeGreaterThan(
    // Not all will end up being concurrently decompressed, and that's okay.
    initialStats.concurrentDecompressionCount + 10 * 20,
  );
});
// Same concurrency shape as the test above, but the fetch results are
// deliberately dropped without reading the body. Combined with Bun.gc this
// exercises the early-free path for pending decompression tasks whose
// response body is ignored; the test passes if nothing crashes or hangs.
it("many concurrent decompression tasks that ignore the body", async () => {
  const arrayBuffer = await gzippedBlob.arrayBuffer();
  let promises: Promise<void>[] = [];
  let allPromises = Promise.withResolvers();
  using server = Bun.serve({
    port: 0,
    async fetch(req) {
      await Bun.sleep(0);
      return new Response(
        new ReadableStream({
          type: "direct",
          async pull(controller) {
            // Track each streamed response so the round can be awaited below.
            const defer = Promise.withResolvers();
            promises.push(defer.promise);
            await Bun.sleep(1);
            await controller.write(arrayBuffer);
            await controller.end();
            defer.resolve();
            if (promises.length === 40) {
              allPromises.resolve();
            }
          },
        }),
        {
          headers: {
            "Content-Encoding": "gzip",
            "Content-Type": "text/html; charset=utf-8",
          },
        },
      );
    },
  });
  for (let j = 0; j < 10; j++) {
    const count = 40;
    promises = [];
    allPromises = Promise.withResolvers();
    // Wrapped in a function so the dropped Response objects leave scope
    // and become collectable before Bun.gc below.
    function wrap() {
      for (let i = 0; i < count; i++) {
        fetch(server.url).then(r => {});
      }
    }
    wrap();
    Bun.gc(true);
    await allPromises.promise;
    await Promise.all(promises);
  }
});
it("fetch() with a buffered gzip response works (one chunk)", async () => {
@@ -15,7 +103,6 @@ it("fetch() with a buffered gzip response works (one chunk)", async () => {
port: 0,
async fetch(req) {
gcTick(true);
return new Response(require("fs").readFileSync(gzipped), {
headers: {
"Content-Encoding": "gzip",
@@ -24,19 +111,17 @@ it("fetch() with a buffered gzip response works (one chunk)", async () => {
});
},
});
gcTick(true);
const res = await fetch(server.url, { verbose: true });
gcTick(true);
const arrayBuffer = await res.arrayBuffer();
const clone = new Buffer(arrayBuffer);
gcTick(true);
await (async function () {
const second = Buffer.from(htmlText);
gcTick(true);
expect(second.equals(clone)).toBe(true);
})();
gcTick(true);
});
it("fetch() with a redirect that returns a buffered gzip response works (one chunk)", async () => {
@@ -122,104 +207,122 @@ it("fetch() with a gzip response works (one chunk, streamed, with a delay)", asy
it("fetch() with a gzip response works (multiple chunks, TCP server)", async done => {
const compressed = await Bun.file(gzipped).arrayBuffer();
var socketToClose!: Socket;
let pending,
pendingChunks = [];
const server = Bun.listen({
hostname: "localhost",
port: 0,
socket: {
drain(socket) {
if (pending) {
while (pendingChunks.length) {
const chunk = pendingChunks.shift();
const written = socket.write(chunk);
async function iterate() {
var socketToClose!: Socket;
let pending,
pendingChunks = [];
const server = Bun.listen({
hostname: "localhost",
port: 0,
socket: {
drain(socket) {
if (pending) {
while (pendingChunks.length) {
const chunk = pendingChunks.shift();
const written = socket.write(chunk);
if (written < chunk.length) {
pendingChunks.push(chunk.slice(written));
return;
if (written < chunk.length) {
pendingChunks.push(chunk.slice(written));
return;
}
}
}
const resolv = pending;
pending = null;
resolv();
}
},
async open(socket) {
socketToClose = socket;
var corked: any[] = [];
var cork = true;
let written = 0;
let pendingChunks = [];
async function write(chunk: any) {
let defer = Promise.withResolvers();
if (cork) {
corked.push(chunk);
}
if (!cork && corked.length) {
const toWrite = corked.join("");
const wrote = socket.write(toWrite);
if (wrote !== toWrite.length) {
pendingChunks.push(toWrite.slice(wrote));
}
corked.length = 0;
}
if (!cork) {
if (pendingChunks.length) {
pendingChunks.push(chunk);
pending = defer.resolve;
await defer.promise;
defer = Promise.withResolvers();
pending = defer.resolve;
}
const written = socket.write(chunk);
if (written < chunk.length) {
console.log("written", written);
pendingChunks.push(chunk.slice(written));
pending = defer.resolve;
await defer.promise;
defer = Promise.withResolvers();
pending = defer.resolve;
}
}
const promise = defer.promise;
if (pendingChunks.length) {
pending = promise;
await promise;
} else {
const resolv = pending;
pending = null;
resolv();
}
}
await write("HTTP/1.1 200 OK\r\n");
await write("Content-Encoding: gzip\r\n");
await write("Content-Type: text/html; charset=utf-8\r\n");
await write("Content-Length: " + compressed.byteLength + "\r\n");
await write("X-WTF: " + "lol".repeat(1000) + "\r\n");
await write("\r\n");
for (var i = 100; i < compressed.byteLength; i += 100) {
cork = false;
await write(compressed.slice(i - 100, i));
}
await write(compressed.slice(i - 100));
await write("\r\n");
},
async open(socket) {
socketToClose = socket;
socket.flush();
var corked: any[] = [];
var cork = true;
let written = 0;
let pendingChunks = [];
async function write(chunk: any) {
let defer = Promise.withResolvers();
if (cork) {
corked.push(chunk);
}
if (!cork && corked.length) {
const toWrite = corked.join("");
const wrote = socket.write(toWrite);
if (wrote !== toWrite.length) {
pendingChunks.push(toWrite.slice(wrote));
}
corked.length = 0;
}
if (!cork) {
if (pendingChunks.length) {
pendingChunks.push(chunk);
pending = defer.resolve;
await defer.promise;
defer = Promise.withResolvers();
pending = defer.resolve;
}
const written = socket.write(chunk);
if (written < chunk.length) {
console.log("written", written);
pendingChunks.push(chunk.slice(written));
pending = defer.resolve;
await defer.promise;
defer = Promise.withResolvers();
pending = defer.resolve;
}
}
const promise = defer.promise;
if (pendingChunks.length) {
pending = promise;
await promise;
} else {
pending = null;
}
}
await write("HTTP/1.1 200 OK\r\n");
await write("Content-Encoding: gzip\r\n");
await write("Content-Type: text/html; charset=utf-8\r\n");
await write("Content-Length: " + compressed.byteLength + "\r\n");
await write("X-WTF: " + "lol".repeat(1000) + "\r\n");
await write("\r\n");
for (var i = 100; i < compressed.byteLength; i += 100) {
cork = false;
await write(compressed.slice(i - 100, i));
}
await write(compressed.slice(i - 100));
await write("\r\n");
socket.flush();
},
drain(socket) {},
},
drain(socket) {},
},
});
await 1;
});
await 1;
const res = await fetch(`http://${server.hostname}:${server.port}`);
const text = (await res.text()).replace(/\r\n/g, "\n");
expect(text).toEqual(htmlText);
socketToClose.end();
server.stop();
}
// Run this once to test gzip decompression without concurrency
{
await iterate();
Bun.sleep(2);
}
// Run this 10 times in parallel to test gzip decompression concurrency
{
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(iterate());
}
await Promise.all(promises);
}
const res = await fetch(`http://${server.hostname}:${server.port}`);
const text = (await res.text()).replace(/\r\n/g, "\n");
expect(text).toEqual(htmlText);
socketToClose.end();
server.stop();
done();
});