Compare commits

...

5 Commits

Author SHA1 Message Date
Don Isaac
a9666aebec Merge branch 'main' of github.com:oven-sh/bun into don/fix/blob-leak 2025-04-09 11:40:14 -07:00
Don Isaac
8a1c911e7b wip: start using Arc(Source) in Blob 2025-03-31 16:00:33 -07:00
Don Isaac
4ebde315ca let intrusive ref_count field be an atomic value 2025-03-31 14:39:23 -07:00
Don Isaac
f64e252379 support intrusive Rc pointers 2025-03-31 14:32:47 -07:00
Don Isaac
1e3f5b18c6 add Rc and Arc 2025-03-31 14:30:54 -07:00
6 changed files with 361 additions and 40 deletions

View File

@@ -29,6 +29,7 @@ const JSPromise = JSC.JSPromise;
const JSValue = JSC.JSValue;
const JSGlobalObject = JSC.JSGlobalObject;
const NullableAllocator = bun.NullableAllocator;
const Arc = bun.ptr.Arc;
const VirtualMachine = JSC.VirtualMachine;
const Task = JSC.Task;
@@ -83,7 +84,7 @@ pub const Blob = struct {
/// When set, the blob will be freed on finalization callbacks
/// If the blob is contained in Response or Request, this must be null
allocator: ?std.mem.Allocator = null,
store: ?*Store = null,
store: ?Arc(Store) = null,
content_type: string = "",
content_type_allocated: bool = false,
content_type_was_set: bool = false,
@@ -124,7 +125,10 @@ pub const Blob = struct {
}
pub fn hasContentTypeFromUser(this: *const Blob) bool {
return this.content_type_was_set or (this.store != null and (this.store.?.data == .file or this.store.?.data == .s3));
return this.content_type_was_set or if (this.store) |store| switch (store.get().data) {
.s3, .file => true,
else => false,
} else false;
}
pub fn contentTypeOrMimeType(this: *const Blob) ?[]const u8 {
@@ -132,7 +136,7 @@ pub const Blob = struct {
return this.content_type;
}
if (this.store) |store| {
switch (store.data) {
switch (store.get().data) {
.file => |file| {
return file.mime_type.value;
},
@@ -148,7 +152,7 @@ pub const Blob = struct {
pub fn isBunFile(this: *const Blob) bool {
const store = this.store orelse return false;
return store.data == .file;
return store.get().data == .file;
}
pub fn doReadFromS3(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue {
@@ -5244,7 +5248,7 @@ pub const Blob = struct {
return tryCreate(bytes_, allocator_, globalThis, was_string) catch bun.outOfMemory();
}
pub fn initWithStore(store: *Blob.Store, globalThis: *JSGlobalObject) Blob {
pub fn initWithStore(store: Arc(Blob.Store), globalThis: *JSGlobalObject) Blob {
return Blob{
.size = store.size(),
.store = store,
@@ -5274,7 +5278,7 @@ pub const Blob = struct {
}
pub fn detach(this: *Blob) void {
if (this.store != null) this.store.?.deref();
if (this.store) |*s| s.deinit();
this.store = null;
}
@@ -5286,7 +5290,8 @@ pub const Blob = struct {
}
pub fn dupeWithContentType(this: *const Blob, include_content_type: bool) Blob {
if (this.store != null) this.store.?.ref();
// SAFETY: creating a copy of `this`, so the new reference belongs to the new Blob.
if (this.store) |*s| s.dangerouslyIncrementRef();
var duped = this.*;
if (duped.content_type_allocated and duped.allocator != null and !include_content_type) {

View File

@@ -15,6 +15,7 @@ const JSPromise = JSC.JSPromise;
const JSGlobalObject = JSC.JSGlobalObject;
const ZigString = JSC.ZigString;
const libuv = bun.windows.libuv;
const Arc = bun.ptr.Arc;
const log = bun.Output.scoped(.ReadFile, true);
@@ -64,7 +65,7 @@ pub const ReadFileTask = JSC.WorkTask(ReadFile);
pub const ReadFile = struct {
file_store: FileStore,
byte_store: ByteStore = ByteStore{ .allocator = bun.default_allocator },
store: ?*Store = null,
store: ?Arc(Store) = null,
offset: SizeType = 0,
max_length: SizeType = Blob.max_size,
total_size: SizeType = Blob.max_size,
@@ -100,7 +101,7 @@ pub const ReadFile = struct {
pub fn createWithCtx(
_: std.mem.Allocator,
store: *Store,
store: Arc(Store),
onReadFileContext: *anyopaque,
onCompleteCallback: ReadFileOnReadFileCallback,
off: SizeType,
@@ -110,20 +111,20 @@ pub const ReadFile = struct {
@compileError("Do not call ReadFile.createWithCtx on Windows, see ReadFileUV");
const read_file = bun.new(ReadFile, ReadFile{
.file_store = store.data.file,
// TODO: do not store this separately from .store
.file_store = store.get().data.file,
.offset = off,
.max_length = max_len,
.store = store,
.onCompleteCtx = onReadFileContext,
.onCompleteCallback = onCompleteCallback,
});
store.ref();
return read_file;
}
pub fn create(
allocator: std.mem.Allocator,
store: *Store,
store: Arc(Store),
off: SizeType,
max_len: SizeType,
comptime Context: type,
@@ -258,14 +259,14 @@ pub const ReadFile = struct {
const cb = this.onCompleteCallback;
const cb_ctx = this.onCompleteCtx;
if (this.store == null and this.system_error != null) {
const system_error = this.system_error.?;
bun.destroy(this);
cb(cb_ctx, ReadFileResultType{ .err = system_error });
return;
} else if (this.store == null) {
bun.destroy(this);
var store = this.store orelse {
if (this.system_error) |system_error| {
bun.destroy(this);
cb(cb_ctx, ReadFileResultType{ .err = system_error });
return;
}
if (Environment.allow_assert) @panic("assertion failure - store should not be null");
bun.destroy(this);
cb(cb_ctx, ReadFileResultType{
.err = SystemError{
.code = bun.String.static("INTERNAL_ERROR"),
@@ -274,12 +275,11 @@ pub const ReadFile = struct {
},
});
return;
}
};
var store = this.store.?;
const buf = this.buffer.items;
defer store.deref();
defer store.deinit();
const system_error = this.system_error;
const total_size = this.total_size;
bun.destroy(this);
@@ -342,8 +342,9 @@ pub const ReadFile = struct {
};
if (this.store) |store| {
if (store.data == .file) {
store.data.file.last_modified = JSC.toJSTime(stat.mtime().sec, stat.mtime().nsec);
const data = &store.getMut().data;
if (data == .file) {
data.file.last_modified = JSC.toJSTime(stat.mtime().sec, stat.mtime().nsec);
}
}
@@ -547,7 +548,7 @@ pub const ReadFileUV = struct {
loop: *libuv.Loop,
file_store: FileStore,
byte_store: ByteStore = ByteStore{ .allocator = bun.default_allocator },
store: *Store,
store: Arc(Store),
offset: SizeType = 0,
max_length: SizeType = Blob.max_size,
total_size: SizeType = Blob.max_size,
@@ -565,7 +566,7 @@ pub const ReadFileUV = struct {
req: libuv.fs_t = std.mem.zeroes(libuv.fs_t),
pub fn start(loop: *libuv.Loop, store: *Store, off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
pub fn start(loop: *libuv.Loop, store: Arc(Store), off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
log("ReadFileUV.start", .{});
var this = bun.new(ReadFileUV, .{
.loop = loop,
@@ -576,14 +577,13 @@ pub const ReadFileUV = struct {
.on_complete_data = handler,
.on_complete_fn = @ptrCast(&Handler.run),
});
store.ref();
this.getFd(onFileOpen);
}
pub fn finalize(this: *ReadFileUV) void {
log("ReadFileUV.finalize", .{});
defer {
this.store.deref();
this.store.deinit();
this.req.deinit();
bun.destroy(this);
log("ReadFileUV.finalize destroy", .{});
@@ -657,8 +657,9 @@ pub const ReadFileUV = struct {
log("stat: {any}", .{stat});
// keep in sync with resolveSizeAndLastModified
if (this.store.data == .file) {
this.store.data.file.last_modified = JSC.toJSTime(stat.mtime().sec, stat.mtime().nsec);
const data = &this.store.getMut().data;
if (data == .file) {
data.file.last_modified = JSC.toJSTime(stat.mtime().sec, stat.mtime().nsec);
}
if (bun.S.ISDIR(@intCast(stat.mode))) {

View File

@@ -44,6 +44,8 @@ const Syscall = bun.sys;
const uv = bun.windows.libuv;
const AnyBlob = bun.JSC.WebCore.AnyBlob;
const Arc = bun.ptr.Arc;
pub const ReadableStream = struct {
value: JSValue,
ptr: Source,
@@ -339,7 +341,7 @@ pub const ReadableStream = struct {
var store = blob.store orelse {
return ReadableStream.empty(globalThis);
};
switch (store.data) {
switch (store.getMut().data) {
.bytes => {
var reader = ByteBlobLoader.Source.new(
.{
@@ -359,11 +361,10 @@ pub const ReadableStream = struct {
.max_size = if (blob.size != Blob.max_size) blob.size else null,
.lazy = .{
.blob = store,
.blob = store.clone(),
},
},
});
store.ref();
return reader.toReadableStream(globalThis);
},
@@ -373,6 +374,7 @@ pub const ReadableStream = struct {
const proxy = globalThis.bunVM().transpiler.env.getHttpProxy(true, null);
const proxy_url = if (proxy) |p| p.href else null;
// NOTE: contains a reference to blob store field, but store isn't ref'd. This smells like a bug.
return bun.S3.readableStream(credentials, path, blob.offset, if (blob.size != Blob.max_size) blob.size else null, proxy_url, globalThis);
},
}
@@ -387,7 +389,7 @@ pub const ReadableStream = struct {
var store = blob.store orelse {
return ReadableStream.empty(globalThis);
};
switch (store.data) {
switch (store.getMut().data) {
.file => {
var reader = FileReader.Source.new(.{
.globalThis = globalThis,
@@ -395,11 +397,10 @@ pub const ReadableStream = struct {
.event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()),
.start_offset = offset,
.lazy = .{
.blob = store,
.blob = store.clone(),
},
},
});
store.ref();
return reader.toReadableStream(globalThis);
},
@@ -4067,7 +4068,7 @@ pub const FileReader = struct {
pub const Lazy = union(enum) {
none: void,
blob: *Blob.Store,
blob: Arc(Blob.Store),
const OpenedFileBlob = struct {
fd: bun.FileDescriptor,

View File

@@ -3,7 +3,12 @@ pub const Cow = @import("ptr/Cow.zig").Cow;
pub const CowSlice = @import("ptr/CowSlice.zig").CowSlice;
pub const CowSliceZ = @import("ptr/CowSlice.zig").CowSliceZ;
pub const CowString = CowSlice(u8);
pub const NewRefCounted = @import("ptr/ref_count.zig").NewRefCounted;
pub const NewThreadSafeRefCounted = @import("ptr/ref_count.zig").NewThreadSafeRefCounted;
pub const Rc = @import("ptr/rc.zig").Rc;
pub const Arc = @import("ptr/rc.zig").Arc;
pub const RefCounted = @import("ptr/rc.zig").RefCounted;
pub const TaggedPointer = @import("ptr/tagged_pointer.zig").TaggedPointer;
pub const TaggedPointerUnion = @import("ptr/tagged_pointer.zig").TaggedPointerUnion;

308
src/ptr/rc.zig Normal file
View File

@@ -0,0 +1,308 @@
//! Reference counted pointers.
//!
//! These pointers are a safer, non-invasive alternative to `ptr.NewRefCounted`
//! and `ptr.NewThreadSafeRefCounted`. Prefer using pointers in this module in
//! new code.
//!
//! This module's primary main pointer type is `RefCounted`, which is a
//! reference-counted pointer. `Rc` and its thread-safe counterpart, `Arc`,
//! provide shorthands for creating `RefCounted` pointers with sensible
//! defaults.
//!
//! These docs assume basic familiarity with move semantics.
const bun = @import("root").bun;
const std = @import("std");
const atomic = std.atomic;
const AtomicU32 = atomic.Value(u32);
const AtomicOrder = std.builtin.AtomicOrder;
const Allocator = std.mem.Allocator;
const Output = bun.Output;
const meta = bun.meta;
/// A single-threaded reference-counting pointer. 'Rc' stands for 'Reference
/// Counted'. Inspired by Rust's
/// [`Rc<T>`](https://doc.rust-lang.org/std/rc/index.html) type.
///
/// `Rc(T)` provides shared ownership of a `T` allocated on the heap. Invoking
/// `clone` provides a new pointer to the same allocation. When the last `Rc` to
/// an allocation is `deinit`ed, value stored in that allocation (aka the "inner
/// value") is deallocated.
pub fn Rc(T: type) type {
return RefCounted(T, AutoContext(T), false);
}
/// A thread-safe reference-counting pointer. 'Arc' stands for 'Atomic
/// Reference Counted'. Inspired by Rust's
/// [`Arc<T>`](https://doc.rust-lang.org/std/sync/struct.Arc.html) type.
///
/// The type `Arc(T)` provides shared ownership of a value of type `T`,
/// allocated in the heap. Invoking `clone` on `Arc` produces a new `Arc`
/// instance, which points to the same allocation on the heap as the source
/// `Arc`, while increasing a reference count. When the last `Arc` pointer to a
/// given allocation is destroyed, the value stored in that allocation (often
/// referred to as "inner value") is also dropped.
///
/// The `deinit` function used by `Arc(T)` is inferred from `T`. Types that do
/// not need to deinitialize their fields do not need a `deinit` function. If they
/// do, `deinit` should either
/// * take only a `*T` or `T` as a parameter
/// * Take a `*T` or `T` and an Allocator as parameters
pub fn Arc(T: type) type {
return RefCounted(T, AutoContext(T), true);
}
fn AutoContext(T: type) type {
return struct {
pub const debug_name: ?[:0]const u8 = null;
pub const deinit = if (@hasDecl(T, "deinit")) T.deinit else null;
};
}
/// A reference-counted pointer.
///
/// The `RefCounted` type provides shared ownership of a value of type `T`,
/// allocated in the heap. Invoking `clone` on `RefCounted` produces a new
/// `RefCounted` instance, which points to the same allocation on the heap as
/// the source `RefCounted`, while increasing a reference count. When the last
/// `RefCounted` pointer to a given allocation is destroyed, the value stored in
/// that allocation (often referred to as "inner value") is also dropped.
///
/// ## Deinitialization Behavior
///
/// `RefCounted` takes a `Context` type that may define a `deinit` function to
/// deinitialize a `T`.
///
/// ## Invariants
/// The following invariants must be upheld for `RefCounted` to work safely:
/// - Allocations created by the provided `Allocator` must have `'static`
/// lifetimes. This means, e.g., `StackFallback` cannot be used.
pub fn RefCounted(
/// Type of the value to be reference-counted
T: type,
/// Interface pseudo-type:
/// ```zig
/// const Context = struct {
/// pub const debug_name: ?[:0]const u8;
/// pub const deinit: null;
/// pub const deinit: fn (value: *T) void;
/// pub const deinit: fn (value: *T, allocator: Allocator) void;
/// };
/// ```
/// `deinit`'s `value` parameter may be a `*T` or `T`.
Context: type,
/// When `true`, uses atomic operations to modify the reference count.
comptime sync: bool,
) type {
const output_name: [:0]const u8 =
if (!bun.Environment.enable_logs)
""
else if (@hasDecl(Context, "debug_name") and Context.debug_name != null)
Context.debug_name
else
meta.typeBaseName(@typeName(T));
const log = Output.scoped(output_name, true);
const intrusive = @hasField(T, "ref_count");
const Inner = struct {
/// This is always > 0 unless value is just about to be deallocated.
ref_count: if (intrusive) void else if (sync) AtomicU32 else u32 = if (intrusive) {} else if (sync) .init(1) else 1,
allocator: Allocator,
value: T,
const Self = @This();
fn ref(this: *Self) callconv(bun.callconv_inline) u32 {
var ref_count_field = @field(if (intrusive) this.value else this, "ref_count");
if (comptime sync) {
const prev = ref_count_field.fetchAdd(1, AtomicOrder.acquire);
bun.assertWithLocation(prev > 0, @src());
return prev;
} else {
const prev = ref_count_field;
bun.assertWithLocation(prev > 0, @src());
ref_count_field += 1;
return prev;
}
}
fn deref(this: *Self) callconv(bun.callconv_inline) u32 {
var ref_count_field = @field(if (intrusive) this.value else this, "ref_count");
if (comptime sync) {
const prev = ref_count_field.fetchSub(1, AtomicOrder.release);
bun.assertWithLocation(prev > 0, @src());
return prev;
} else {
const prev = ref_count_field;
bun.assertWithLocation(prev > 0, @src());
ref_count_field -= 1;
return prev;
}
}
fn refCount(this: Self) callconv(bun.callconv_inline) u32 {
const ref_count_field = @field(if (intrusive) this.value else this, "ref_count");
return if (comptime sync) ref_count_field.load(.acq_rel) else ref_count_field;
}
};
return struct {
/// Do not initialize, read, or otherwise access this directly.
/// - Use `.get()` or `.getMut()` to dereference the pointer.
/// - use `.clone()` to obtain a new reference to the value.
__ptr: *Inner,
const Self = @This();
/// Construct a new reference-counted pointer. It takes ownership of
/// `value`. `value` is assumed to be fully initialized.
pub fn init(value: T, allocator: Allocator) Allocator.Error!Self {
const inner = try allocator.create(Inner);
inner.* = .{ .value = value, .allocator = allocator };
return Self{ .__ptr = inner };
}
/// Dereference the pointer, obtaining a read-only reference to the
/// stored value. This is not thread-safe.
pub inline fn get(this: Self) *const T {
// should this use @atomicLoad when `sync` for thread safety?
return &this.__ptr.value;
}
/// Dereference the pointer, obtaining a mutable reference to the stored value.
/// This is not thread-safe.
pub inline fn getMut(this: Self) *T {
return &this.__ptr.value;
}
/// Create a clone of this ref-counted pointer.
///
/// This makes another pointer to the same allocation, incrementing the
/// reference count. No allocations occur.
///
/// Caller owns the returned pointer and must call `deinit` when done.
pub fn clone(this: Self) Self {
const prev = this.__ptr.ref();
log("0x{x} ref {d} + 1 = {d}", .{ @intFromPtr(this.__ptr), prev, prev + 1 });
return this;
}
/// Increment the reference count without obtaining a new pointer.
///
/// While this does not return a new pointer, it _does_ create an owned
/// reference. Caller must ensure the new reference's lifecycle is
/// properly managed (that is, its owner calls `.deinit()`), otherwise a
/// memory leak will occur.
pub fn dangerouslyIncrementRef(this: Self) void {
std.mem.doNotOptimizeAway(this.clone());
}
/// Decrement the reference count without deinitializing the pointer.
/// When the reference count hits 0, the value is deinitialized and the
/// allocation is freed.
///
/// ## Safety
/// Decrementing the reference count below 1 while holding a live
/// reference count is illegal behavior. This pointer's owner must die
/// at the same time as this pointer, otherwise it may hold a dangling
/// pointer.
///
/// Caller should set `ensure_alive` to `true` when it assumes the
/// allocation will continue to be alive after this call. This enables
/// runtime safety checks and is useful, for example, when transferring
/// ownership across threads or Zig/c++ boundaries.
pub fn dangerouslyDecrementRef(this: Self, comptime ensure_alive: bool) void {
const prev = this.__ptr.deref();
log("0x{x} deref {d} - 1 = {d}", .{ @intFromPtr(this.__ptr), prev, prev - 1 });
if (prev == 1) {
@branchHint(.unlikely);
if (comptime ensure_alive) bun.assert(false);
// SAFETY: caller ensures that
// 1. owner will die at the same time as this pointer, meaning nobody owns the dangling pointer
// 2. reference count is not decremented below 1, meaning the pointer is not dangling
this.destroy();
}
}
/// Deinitialize the pointer.
///
/// `deinit` decrements the stored allocation's reference count. If that
/// count hits 0, the value gets deinitialized and the allocation is freed.
pub fn deinit(this: *Self) void {
const prev = this.__ptr.deref();
log("0x{x} deref {d} - 1 = {d}", .{ @intFromPtr(this.__ptr), prev, prev - 1 });
if (prev == 1) {
@branchHint(.unlikely);
this.destroy();
}
// pointer cannot be used after .deinit(), even if other references
// to the allocation still exist
this.* = undefined;
}
fn destroy(this: Self) void {
const allocator = this.__ptr.allocator;
var value: T = if (comptime sync) @atomicLoad(T, this.__ptr.value, .acquire) else this.__ptr.value;
if (@hasDecl(Context, "deinit")) {
switch (@TypeOf(Context.deinit)) {
fn (*T) void => Context.deinit(&value),
fn (*T, Allocator) void => Context.deinit(&value, allocator),
fn (T) void => Context.deinit(&value),
fn (T, Allocator) void => Context.deinit(value, allocator),
null => {}, // No deinit function, do nothing
else => @compileError("Invalid type or fn signature for `Context.deinit`: " ++ @typeName(Context.deinit)),
}
}
allocator.destroy(this.__ptr);
this.__ptr.* = undefined;
}
/// Get the current number of references to the stored value.
///
/// You will almost never need to call this directly. Use `clone` and
/// `deinit` instead.
pub fn refCount(this: Self) u32 {
return this.__ptr.refCount();
}
};
}
const expectEqual = std.testing.expectEqual;
test Rc {
const allocator = std.testing.allocator;
const rc = try Rc(u32).init(42, allocator);
defer rc.deinit();
try expectEqual(42, rc.get().*);
try expectEqual(1, rc.refCount());
const rc2 = rc.clone();
try expectEqual(42, rc2.get().*);
try expectEqual(2, rc.refCount());
rc2.deinit();
try expectEqual(1, rc.refCount());
}
test Arc {
const allocator = std.testing.allocator;
const rc = try Arc(u32).init(42, allocator);
defer rc.deinit();
try expectEqual(42, rc.get().*);
try expectEqual(1, rc.refCount());
// Create a new reference to the same value
const rc2 = rc.clone();
try expectEqual(42, rc2.get().*);
try expectEqual(2, rc.refCount());
// `deinit`ing an Arc decrements the reference count
rc2.deinit();
try expectEqual(1, rc.refCount());
}

View File

@@ -3786,9 +3786,10 @@ pub fn handleTemplateValue(
if (template_value.as(JSC.WebCore.Blob)) |blob| {
if (blob.store) |store| {
if (store.data == .file) {
if (store.data.file.pathlike == .path) {
const path = store.data.file.pathlike.path.slice();
const s = store.get();
if (s.data == .file) {
if (s.data.file.pathlike == .path) {
const path = s.data.file.pathlike.path.slice();
if (!try builder.appendUTF8(path, true)) {
return globalThis.throw("Shell script string contains invalid UTF-16", .{});
}