Compare commits

...

5 Commits

Author SHA1 Message Date
Ben Grant
0b4d2bd0ed Make Prisma leak test less strict 2024-11-21 10:49:39 -08:00
Ben Grant
3581b3b091 Try to narrow down segfault 2024-11-21 10:49:27 -08:00
Ben Grant
b1a371c7b2 Revert fix to see if test still crashes 2024-11-20 19:43:00 -08:00
Ben Grant
7478de533e Check for leaks in Prisma test 2024-11-19 19:16:31 -08:00
Ben Grant
3e53f72e6b Fix memory leak in napi_threadsafe_function 2024-11-19 18:22:07 -08:00
3 changed files with 74 additions and 4 deletions

View File

@@ -1434,11 +1434,17 @@ pub const ThreadSafeFunction = struct {
env: napi_env,
finalizer: Finalizer = Finalizer{ .fun = null, .data = null },
// TODO(@190n) change to better type
channel: Queue,
// allocator used to create the slice when the buffer size is fixed
allocator: std.mem.Allocator,
ctx: ?*anyopaque = null,
callback: Callback = undefined,
callback: Callback,
freeing: bool = false,
freed: bool = false,
const ThreadSafeFunctionTask = JSC.AnyTask.New(@This(), call);
pub const Queue = union(enum) {
@@ -1479,6 +1485,17 @@ pub const ThreadSafeFunction = struct {
}
}
pub fn deinit(this: *@This(), allocator: std.mem.Allocator) void {
switch (this.*) {
.sized => |*s| {
// buffer was allocated in init with nonzero size
allocator.free(s.buffer.buf);
s.deinit();
},
.unsized => |*u| u.deinit(),
}
}
pub fn writeItem(this: *@This(), value: ?*anyopaque) !void {
switch (this.*) {
.sized => try this.sized.writeItem(value),
@@ -1508,7 +1525,10 @@ pub const ThreadSafeFunction = struct {
}
};
/// From the JS thread, handle one call to the underlying function that was requested
/// by another thread (this is called by the event loop)
pub fn call(this: *ThreadSafeFunction) void {
if (this.freed) @panic("napi tsfn use after free");
const task = this.channel.tryReadItem() catch null orelse return;
const globalObject = this.env;
@@ -1539,6 +1559,7 @@ pub const ThreadSafeFunction = struct {
}
pub fn enqueue(this: *ThreadSafeFunction, ctx: ?*anyopaque, block: bool) !void {
if (this.freeing) @panic("napi tsfn use after free");
if (block) {
try this.channel.writeItem(ctx);
} else {
@@ -1552,6 +1573,7 @@ pub const ThreadSafeFunction = struct {
pub fn finalize(opaq: *anyopaque) void {
var this = bun.cast(*ThreadSafeFunction, opaq);
this.freed = true;
this.unref();
if (this.finalizer.fun) |fun| {
@@ -1567,6 +1589,7 @@ pub const ThreadSafeFunction = struct {
this.callback.c.js.unprotect();
}
}
this.channel.deinit(this.allocator);
bun.default_allocator.destroy(this);
}
@@ -1605,6 +1628,8 @@ pub const ThreadSafeFunction = struct {
}
if (mode == .abort or this.thread_count == 0) {
if (this.freeing) @panic("napi tsfn double free");
this.freeing = true;
this.event_loop.enqueueTaskConcurrent(JSC.ConcurrentTask.fromCallback(this, finalize));
}
@@ -1657,6 +1682,7 @@ pub export fn napi_create_threadsafe_function(
.thread_count = initial_thread_count,
.poll_ref = Async.KeepAlive.init(),
.tracker = JSC.AsyncTaskTracker.init(vm),
.allocator = bun.default_allocator,
};
function.finalizer = .{ .data = thread_finalize_data, .fun = thread_finalize_cb };

View File

@@ -535,7 +535,7 @@ pub const RwLock = if (@import("builtin").os.tag != .windows and @import("builti
pub fn deinit(self: *RwLock) void {
const safe_rc = switch (@import("builtin").os.tag) {
.dragonfly, .netbsd => std.posix.EAGAIN,
else => 0,
else => std.c.E.SUCCESS,
};
const rc = std.c.pthread_rwlock_destroy(&self.rwlock);
@@ -884,7 +884,7 @@ else if (@import("builtin").link_libc)
pub fn deinit(self: *Mutex) void {
const safe_rc = switch (@import("builtin").os.tag) {
.dragonfly, .netbsd => std.posix.EAGAIN,
else => 0,
else => std.c.E.SUCCESS,
};
const rc = std.c.pthread_mutex_destroy(&self.mutex);
@@ -1078,7 +1078,7 @@ else if (@import("builtin").link_libc)
pub fn deinit(self: *Condvar) void {
const safe_rc = switch (@import("builtin").os.tag) {
.dragonfly, .netbsd => std.posix.EAGAIN,
else => 0,
else => std.c.E.SUCCESS,
};
const rc = std.c.pthread_cond_destroy(&self.cond);

View File

@@ -2,6 +2,7 @@ import { createCanvas } from "@napi-rs/canvas";
import { it as bunIt, test as bunTest, describe, expect } from "bun:test";
import { generate, generateClient } from "./helper.ts";
import type { PrismaClient } from "./prisma/types.d.ts";
import { appendFile } from "node:fs/promises";
function* TestIDGenerator(): Generator<number> {
while (true) {
@@ -74,6 +75,49 @@ async function cleanTestId(prisma: PrismaClient, testId: number) {
});
}
test(
"does not leak",
async (prisma: PrismaClient, _: number) => {
// prisma leak was 8 bytes per query, so 4 million requests would manifest as a 32MB leak
const batchSize = 1000;
const warmupIters = 1_000_000 / batchSize;
const testIters = 4_000_000 / batchSize;
const gcPeriod = 10_000 / batchSize;
let totalIters = 0;
async function runQuery() {
totalIters++;
// GC occasionally to make memory usage more deterministic
if (totalIters % gcPeriod == gcPeriod - 1) {
Bun.gc(true);
const line = `${totalIters},${(process.memoryUsage.rss() / 1024 / 1024) | 0}`;
// console.log(line);
// await appendFile("rss.csv", line + "\n");
}
const queries = [];
for (let i = 0; i < batchSize; i++) {
queries.push(prisma.$queryRaw`SELECT 1`);
}
await Promise.all(queries);
}
// warmup first
for (let i = 0; i < warmupIters; i++) {
await runQuery();
}
// measure memory now
const before = process.memoryUsage.rss();
// run a bunch more iterations to see if memory usage increases
for (let i = 0; i < testIters; i++) {
await runQuery();
}
const after = process.memoryUsage.rss();
const deltaMB = (after - before) / 1024 / 1024;
expect(deltaMB).toBeLessThan(20);
},
240_000,
);
test(
"CRUD basics",
async (prisma: PrismaClient, testId: number) => {