Compare commits

...

6 Commits

Author SHA1 Message Date
Jarred Sumner
6754c9d9dc Merge branch 'main' into jarred/followup 2024-06-09 00:49:48 -07:00
Jarred Sumner
d0567b0e95 Merge branch 'main' into jarred/followup 2024-06-06 03:43:26 -07:00
dave caruso
8f1004d978 add more test things 2024-06-05 17:01:35 -07:00
Jarred Sumner
0e000a97b4 more 2024-06-05 00:21:42 -07:00
Jarred Sumner
4f4dc2a9eb more 2024-06-05 00:05:00 -07:00
Jarred Sumner
376e3b57f2 Prevent calling into JavaScript during atexit. Emit beforeExit in Worker. Fix worker thread destruction in fetch() 2024-06-04 23:34:14 -07:00
8 changed files with 123 additions and 24 deletions

View File

@@ -425,16 +425,23 @@ extern "C" uint64_t Bun__readOriginTimer(void*);
extern "C" double Bun__readOriginTimerStart(void*);
// https://github.com/nodejs/node/blob/1936160c31afc9780e4365de033789f39b7cbc0c/src/api/hooks.cc#L49
extern "C" void Process__dispatchOnBeforeExit(Zig::GlobalObject* globalObject, uint8_t exitCode)
extern "C" int32_t Process__dispatchOnBeforeExit(Zig::GlobalObject* globalObject, uint8_t exitCode)
{
if (!globalObject->hasProcessObject()) {
return;
return 0;
}
auto* process = jsCast<Process*>(globalObject->processObject());
MarkedArgumentBuffer arguments;
arguments.append(jsNumber(exitCode));
process->wrapped().emit(Identifier::fromString(globalObject->vm(), "beforeExit"_s), arguments);
auto& emitter = process->wrapped();
const Identifier onBeforeExit = Identifier::fromString(globalObject->vm(), "beforeExit"_s);
if (emitter.hasEventListeners(onBeforeExit)) {
emitter.emit(onBeforeExit, arguments);
return 1;
}
return 0;
}
extern "C" void Process__dispatchOnExit(Zig::GlobalObject* globalObject, uint8_t exitCode)

View File

@@ -517,7 +517,7 @@ pub const ExitHandler = struct {
vm.exit_handler.exit_code = code;
}
extern fn Process__dispatchOnBeforeExit(*JSC.JSGlobalObject, code: u8) void;
extern fn Process__dispatchOnBeforeExit(*JSC.JSGlobalObject, code: u8) i32;
extern fn Process__dispatchOnExit(*JSC.JSGlobalObject, code: u8) void;
extern fn Bun__closeAllSQLiteDatabasesForTermination() void;
@@ -530,10 +530,11 @@ pub const ExitHandler = struct {
}
}
pub fn dispatchOnBeforeExit(this: *ExitHandler) void {
/// returns `true` if a 'beforeExit' handler was called. `false` otherwise.
pub fn dispatchOnBeforeExit(this: *ExitHandler) bool {
JSC.markBinding(@src());
const vm = @fieldParentPtr(VirtualMachine, "exit_handler", this);
Process__dispatchOnBeforeExit(vm.global, this.exit_code);
return Process__dispatchOnBeforeExit(vm.global, this.exit_code) != 0;
}
};
@@ -1059,19 +1060,26 @@ pub const VirtualMachine = struct {
}
pub fn onBeforeExit(this: *VirtualMachine) void {
this.exit_handler.dispatchOnBeforeExit();
if (!this.exit_handler.dispatchOnBeforeExit()) {
return;
}
var dispatch = false;
while (true) {
while (!this.isShuttingDown()) {
while (this.isEventLoopAlive()) : (dispatch = true) {
this.tick();
this.eventLoop().autoTickActive();
if (this.isShuttingDown()) {
break;
}
}
if (dispatch) {
this.exit_handler.dispatchOnBeforeExit();
dispatch = false;
if (this.isEventLoopAlive()) continue;
if (this.exit_handler.dispatchOnBeforeExit()) {
if (this.isEventLoopAlive()) continue;
}
}
break;
@@ -1620,6 +1628,10 @@ pub const VirtualMachine = struct {
}
}
pub fn beginShutdown(this: *VirtualMachine) void {
this.is_shutting_down = true;
}
pub fn initWorker(
worker: *WebWorker,
opts: Options,

View File

@@ -401,18 +401,21 @@ pub const WebWorker = struct {
var vm_to_deinit: ?*JSC.VirtualMachine = null;
if (this.vm) |vm| {
this.vm = null;
vm.is_shutting_down = true;
vm.beginShutdown();
vm.onExit();
exit_code = vm.exit_handler.exit_code;
globalObject = vm.global;
vm_to_deinit = vm;
// Prevent attempting to run GC after the VM has been deinitialized.
vm.gc_controller.disabled = true;
}
var arena = this.arena;
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);
this.deinit();
if (vm_to_deinit) |vm| {
this.deinit();
vm.deinit(); // NOTE: deinit here isn't implemented, so freeing workers will leak the vm.
}

View File

@@ -775,13 +775,19 @@ pub const Fetch = struct {
bun.debugAssert(count > 0);
}
pub fn deref(this: *FetchTasklet) void {
pub fn derefWithCount(this: *FetchTasklet) u32 {
const count = this.ref_count.fetchSub(1, .Monotonic);
bun.debugAssert(count > 0);
if (count == 1) {
this.deinit();
}
return count;
}
pub fn deref(this: *FetchTasklet) void {
_ = this.derefWithCount();
}
pub const HTTPRequestBody = union(enum) {
@@ -1507,6 +1513,8 @@ pub const Fetch = struct {
export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void {
log("onResponseFinalize", .{});
const vm = JSC.VirtualMachine.get();
if (this.native_response) |response| {
const body = response.body;
// Three scenarios:
@@ -1520,6 +1528,10 @@ pub const Fetch = struct {
// Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer.
if (body.value != .Locked or this.readable_stream_ref.held.has()) {
// Scenario 1 or 3.
if (vm.isShuttingDown()) {
this.ignoreRemainingResponseBody();
}
return;
}
@@ -1532,6 +1544,10 @@ pub const Fetch = struct {
// Scenario 3.
this.ignoreRemainingResponseBody();
}
} else {
if (vm.isShuttingDown()) {
this.ignoreRemainingResponseBody();
}
}
}
comptime {
@@ -1773,6 +1789,11 @@ pub const Fetch = struct {
task.deref();
return;
}
if (!result.has_more and task.javascript_vm.isShuttingDown()) {
task.deref();
return;
}
} else {
if (success) {
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch bun.outOfMemory();

View File

@@ -430,6 +430,7 @@ pub const Run = struct {
}
const exit_code = this.vm.exit_handler.exit_code;
vm.is_shutting_down = true;
vm.onExit();
if (!JSC.is_bindgen) JSC.napi.fixDeadCodeElimination();

View File

@@ -1225,6 +1225,10 @@ pub const PosixLoop = extern struct {
this.num_polls -= 1;
}
pub fn deinit(this: *PosixLoop) void {
us_loop_free(this);
}
pub fn ref(this: *PosixLoop) void {
log("ref {d} + 1 = {d}", .{ this.num_polls, this.num_polls + 1 });
this.num_polls += 1;
@@ -2787,6 +2791,10 @@ pub const WindowsLoop = extern struct {
return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
}
pub fn deinit(this: *WindowsLoop) void {
us_loop_free(this);
}
extern fn uws_get_loop_with_native(*anyopaque) *WindowsLoop;
pub fn iterationNumber(this: *const WindowsLoop) u64 {

View File

@@ -3,9 +3,18 @@ import { $ } from "bun";
import { join } from "path";
import "harness";
describe("Worker destruction", () => {
const method = ["Bun.connect", "Bun.listen"];
test.each(method)("bun closes cleanly when %s is used in a Worker that is terminating", method => {
describe("Worker", () => {
const method = [
"Bun.connect",
"Bun.listen",
"fetch",
"fetch-early-exit",
"fetch+blob",
"fetch+blob-early-exit",
"readFile",
"readFile-early-exit",
];
test.each(method)("closes cleanly when %s is used while the Worker terminates", method => {
expect([join(import.meta.dir, "worker_thread_check.ts"), method]).toRun();
});
});

View File

@@ -1,28 +1,39 @@
const CONCURRENCY = 10;
const CONCURRENCY = 20;
const RUN_COUNT = 5;
import { Worker, isMainThread, workerData } from "worker_threads";
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
const sleep = Bun.sleep;
if (isMainThread) {
let action = process.argv.at(-1);
if (process.argv.length === 2) {
action = "Bun.connect";
action = "readFile";
}
const body = new Blob(["Hello, world!".repeat(100)]);
let httpCount = 0;
let onHTTPCount = (a: number) => {};
const server = Bun.serve({
port: 0,
fetch() {
return new Response();
onHTTPCount(httpCount++);
return new Response(body);
},
});
let remaining = RUN_COUNT;
while (remaining--) {
const promises = [];
const initialHTTPCount = httpCount;
let httpCountThisRun = 0;
let pendingHTTPCountPromises = [];
onHTTPCount = a => {
setTimeout(() => {
pendingHTTPCountPromises[httpCountThisRun++].resolve();
}, 0);
};
for (let i = 0; i < CONCURRENCY; i++) {
pendingHTTPCountPromises.push(Promise.withResolvers());
const worker = new Worker(import.meta.url, {
workerData: {
action,
@@ -36,7 +47,12 @@ if (isMainThread) {
worker.on("online", () => {
sleep(1)
.then(() => {
return worker.terminate();
// if (action === "fetch+blob") {
// return pendingHTTPCountPromises[i].promise;
// }
})
.then(() => {
worker.terminate();
})
.finally(resolve);
});
@@ -50,6 +66,7 @@ if (isMainThread) {
} else {
Bun.gc(true);
const { action, port } = workerData;
self.addEventListener("message", () => {});
switch (action) {
case "Bun.connect": {
@@ -81,9 +98,30 @@ if (isMainThread) {
break;
}
case "fetch": {
await fetch("http://localhost:" + port);
break;
}
case "fetch-early-exit": {
fetch("http://localhost:" + port);
break;
}
case "fetch+blob": {
const resp = await fetch("http://localhost:" + port);
await resp.blob();
break;
}
case "fetch+blob-early-exit": {
const resp = await fetch("http://localhost:" + port);
await resp.blob();
break;
}
case "readFile": {
await Bun.file(import.meta.path).text();
break;
}
case "readFile-early-exit": {
await Bun.file(import.meta.path).text();
break;
}
}
}