Compare commits

...

4 Commits

Author SHA1 Message Date
Ciro Spaciari
5647309650 test 2025-10-08 18:48:11 -07:00
Ciro Spaciari
7d89e9f282 refactor and investigation 2025-10-08 17:51:26 -07:00
Ciro Spaciari
210d620828 test 2025-10-08 00:42:28 -07:00
Ciro Spaciari
3721da2663 wip 2025-10-07 18:48:51 -07:00
4 changed files with 90 additions and 121 deletions

View File

@@ -696,8 +696,10 @@ pub inline fn packageManager(this: *VirtualMachine) *PackageManager {
pub fn garbageCollect(this: *const VirtualMachine, sync: bool) usize {
@branchHint(.cold);
Global.mimalloc_cleanup(false);
if (sync)
if (sync) {
return this.global.vm().runGC(true);
}
this.global.vm().collectAsync();
return this.global.vm().heapSize();

View File

@@ -78,92 +78,50 @@ pub fn unpipeWithoutDeref(this: *@This()) void {
pub fn onData(
this: *@This(),
stream: streams.Result,
stream_result: streams.Result,
allocator: std.mem.Allocator,
) void {
var stream = stream_result;
jsc.markBinding(@src());
if (this.done) {
if (stream.isDone() and (stream == .owned or stream == .owned_and_done)) {
if (stream == .owned) allocator.free(stream.owned.slice());
if (stream == .owned_and_done) allocator.free(stream.owned_and_done.slice());
}
this.has_received_last_chunk = stream.isDone();
log("ByteStream.onData already done... do nothing", .{});
return;
}
bun.assert(!this.has_received_last_chunk or stream == .err);
this.has_received_last_chunk = stream.isDone();
if (this.pipe.ctx) |ctx| {
this.pipe.onPipe.?(ctx, stream, allocator);
if (this.done) {
log("ByteStream.onData already done... do nothing", .{});
stream.deinit(allocator);
return;
}
const chunk = stream.slice();
if (this.pipe.ctx) |ctx| {
this.pipe.onPipe.?(ctx, stream_result, allocator);
return;
}
if (this.buffer_action) |*action| {
if (stream == .err) {
defer {
this.buffer.clearAndFree();
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
this.buffer_action = null;
}
if (stream_result == .err) {
log("ByteStream.onData err action.reject()", .{});
action.reject(this.parent().globalThis, stream.err);
this.buffer.clearAndFree();
this.pending.result.deinit(allocator);
this.pending.result = .{ .done = {} };
this.buffer_action = null;
return;
}
bun.handleOom(this.append(&stream, 0, allocator));
if (this.has_received_last_chunk) {
defer {
this.buffer_action = null;
}
if (this.buffer.capacity == 0 and stream == .done) {
log("ByteStream.onData done and action.fulfill()", .{});
var blob = this.toAnyBlob().?;
action.fulfill(this.parent().globalThis, &blob);
return;
}
if (this.buffer.capacity == 0 and stream == .owned_and_done) {
log("ByteStream.onData owned_and_done and action.fulfill()", .{});
this.buffer = std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(chunk));
var blob = this.toAnyBlob().?;
action.fulfill(this.parent().globalThis, &blob);
return;
}
defer {
if (stream == .owned_and_done or stream == .owned) {
allocator.free(stream.slice());
}
}
log("ByteStream.onData appendSlice and action.fulfill()", .{});
bun.handleOom(this.buffer.appendSlice(chunk));
defer this.buffer_action = null;
log("ByteStream.onData action.fulfill()", .{});
var blob = this.toAnyBlob().?;
action.fulfill(this.parent().globalThis, &blob);
return;
} else {
bun.handleOom(this.buffer.appendSlice(chunk));
if (stream == .owned_and_done or stream == .owned) {
allocator.free(stream.slice());
}
}
return;
}
if (this.pending.state == .pending) {
bun.assert(this.buffer.items.len == 0);
const chunk = stream.slice();
const to_copy = this.pending_buffer[0..@min(chunk.len, this.pending_buffer.len)];
const pending_buffer_len = this.pending_buffer.len;
bun.assert(to_copy.ptr != chunk.ptr);
@@ -172,64 +130,58 @@ pub fn onData(
const is_really_done = this.has_received_last_chunk and to_copy.len <= pending_buffer_len;
if (is_really_done) {
this.done = true;
this.pending.result = pending_result: {
if (is_really_done) {
this.done = true;
if (to_copy.len == 0) {
if (stream == .err) {
this.pending.result = .{
.err = stream.err,
};
} else {
this.pending.result = .{
if (to_copy.len == 0) {
if (stream == .err) {
break :pending_result .{
.err = stream.err,
};
}
break :pending_result .{
.done = {},
};
}
} else {
this.pending.result = .{
break :pending_result .{
.into_array_and_done = .{
.value = this.value(),
.len = @as(Blob.SizeType, @truncate(to_copy.len)),
},
};
}
} else {
this.pending.result = .{
break :pending_result .{
.into_array = .{
.value = this.value(),
.len = @as(Blob.SizeType, @truncate(to_copy.len)),
},
};
}
};
const remaining = chunk[to_copy.len..];
if (remaining.len > 0 and chunk.len > 0)
this.append(stream, to_copy.len, chunk, allocator) catch @panic("Out of memory while copying request body");
log("ByteStream.onData pending.run()", .{});
if (remaining.len > 0) {
bun.handleOom(this.append(&stream, to_copy.len, allocator));
}
this.pending.run();
return;
}
log("ByteStream.onData no action just append", .{});
this.append(stream, 0, chunk, allocator) catch @panic("Out of memory while copying request body");
bun.handleOom(this.append(&stream, 0, allocator));
}
pub fn append(
this: *@This(),
stream: streams.Result,
stream: *streams.Result,
offset: usize,
base_address: []const u8,
allocator: std.mem.Allocator,
) !void {
var stream_ = stream;
const chunk = stream.slice()[offset..];
log("ByteStream.append stream_result={s} offset={d}", .{ @tagName(stream.*), offset });
const slice = stream.slice();
const chunk = slice[offset..];
if (chunk.len == 0) return;
if (this.buffer.capacity == 0) {
switch (stream_) {
switch (stream.*) {
.owned => |*owned| {
this.buffer = owned.moveToListManaged(allocator);
this.offset += offset;
@@ -251,13 +203,13 @@ pub fn append(
return;
}
switch (stream) {
switch (stream.*) {
.temporary_and_done, .temporary => {
try this.buffer.appendSlice(chunk);
},
.owned_and_done, .owned => {
try this.buffer.appendSlice(chunk);
allocator.free(@constCast(base_address));
stream.deinit(allocator);
},
.err => {
if (this.buffer_action != null) {
@@ -270,8 +222,6 @@ pub fn append(
// We don't support the rest of these yet
else => unreachable,
}
return;
}
pub fn setValue(this: *@This(), view: jsc.JSValue) void {
@@ -307,6 +257,8 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
this.buffer.clearAndFree();
this.pending.result.deinit(bun.default_allocator);
this.pending.result = .{ .done = {} };
this.done = true;
return .{
@@ -326,6 +278,10 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
}
if (this.has_received_last_chunk) {
this.buffer.clearAndFree();
this.pending.result.deinit(bun.default_allocator);
this.pending.result = .{ .done = {} };
this.done = true;
return .{
.done = {},
};
@@ -341,15 +297,14 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
pub fn onCancel(this: *@This()) void {
jsc.markBinding(@src());
const view = this.value();
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
this.buffer.clearAndFree();
this.done = true;
this.pending_value.deinit();
if (view != .zero) {
this.pending_buffer = &.{};
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
this.pending_buffer = &.{};
this.pending.result.deinit(bun.default_allocator);
this.pending.result = .{ .done = {} };
if (this.value() != .zero) {
this.pending.run();
}
@@ -367,15 +322,14 @@ pub fn memoryCost(this: *const @This()) usize {
pub fn deinit(this: *@This()) void {
jsc.markBinding(@src());
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
this.buffer.clearAndFree();
this.pending_value.deinit();
this.pending_buffer = &.{};
this.pending.result.deinit(bun.default_allocator);
this.pending.result = .{ .done = {} };
if (!this.done) {
this.done = true;
this.pending_buffer = &.{};
this.pending.result.deinit();
this.pending.result = .{ .done = {} };
if (this.pending.state == .pending and this.pending.future == .promise) {
// We must never run JavaScript inside of a GC finalizer.
this.pending.runOnNextTick();
@@ -405,13 +359,15 @@ pub fn toAnyBlob(this: *@This()) ?Blob.Any {
.capacity = 0,
};
this.done = true;
this.pending.result.deinit();
this.pending.result.deinit(bun.default_allocator);
this.pending.result = .{ .done = {} };
this.parent().is_closed = true;
return .{ .InternalBlob = .{
.bytes = buffer,
.was_string = false,
} };
return .{
.InternalBlob = .{
.bytes = buffer,
.was_string = false,
},
};
}
return null;
@@ -424,7 +380,7 @@ pub fn toBufferedValue(this: *@This(), globalThis: *jsc.JSGlobalObject, action:
if (this.pending.result == .err) {
const err, _ = this.pending.result.err.toJSWeak(globalThis);
this.pending.result.deinit();
this.pending.result.deinit(bun.default_allocator);
this.done = true;
this.buffer.clearAndFree();
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err);

View File

@@ -432,9 +432,13 @@ pub const FetchTasklet = struct {
readable.ptr.Bytes.onData(
.{
.owned_and_done = bun.ByteList.moveFromList(scheduled_response_buffer),
// Investigate why owned_and_done is using more memory than temporary_and_done
// .temporary_and_done = bun.ByteList.fromBorrowedSliceDangerous(chunk),
},
bun.default_allocator,
this.memory_reporter.allocator(),
);
scheduled_response_buffer.deinit(this.memory_reporter.allocator());
}
return;
}

View File

@@ -205,10 +205,10 @@ pub const Result = union(Tag) {
into_array: IntoArray,
into_array_and_done: IntoArray,
pub fn deinit(this: *Result) void {
pub fn deinit(this: *Result, allocator: std.mem.Allocator) void {
switch (this.*) {
.owned => |*owned| owned.clearAndFree(bun.default_allocator),
.owned_and_done => |*owned_and_done| owned_and_done.clearAndFree(bun.default_allocator),
.owned => |*owned| owned.clearAndFree(allocator),
.owned_and_done => |*owned_and_done| owned_and_done.clearAndFree(allocator),
.err => |err| {
if (err == .JSValue) {
err.JSValue.unprotect();
@@ -273,6 +273,16 @@ pub const Result = union(Tag) {
};
}
pub fn allocatedSlice(this: *const Result) []const u8 {
return switch (this.*) {
.owned => |owned| owned.allocatedSlice(),
.owned_and_done => |owned_and_done| owned_and_done.allocatedSlice(),
.temporary_and_done => |temporary_and_done| temporary_and_done.allocatedSlice(),
.temporary => |temporary| temporary.allocatedSlice(),
else => "",
};
}
pub const Writable = union(Result.Tag) {
pending: *Writable.Pending,
@@ -559,15 +569,12 @@ pub const Result = union(Tag) {
pub fn toJS(this: *const Result, globalThis: *JSGlobalObject) bun.JSError!JSValue {
if (jsc.VirtualMachine.get().isShuttingDown()) {
var that = this.*;
that.deinit();
that.deinit(bun.default_allocator);
return .zero;
}
switch (this.*) {
.owned => |list| {
return jsc.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis);
},
.owned_and_done => |list| {
.owned, .owned_and_done => |list| {
return jsc.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis);
},
.temporary => |temp| {