Synchronous concurrent test fix (#22928)

```ts
beforeEach(() => {
  console.log("beforeEach");
});
afterEach(() => {
  console.log("afterEach");
});
test.concurrent("test 1", () => {
  console.log("start test 1");
});
test.concurrent("test 2", async () => {
  console.log("start test 2");
});
test.concurrent("test 3", () => {
  console.log("start test 3");
});
```

```
$> bun-before test synchronous-concurrent
beforeEach
beforeEach
beforeEach
start test 1
start test 2
start test 3
afterEach
afterEach
afterEach

$> bun-after test synchronous-concurrent
beforeEach
start test 1
afterEach
beforeEach
start test 2
afterEach
beforeEach
start test 3
afterEach
```

---------

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
pfg
2025-09-25 03:52:18 -07:00
committed by GitHub
parent 6c381b0e03
commit 0ea4ce1bb4
10 changed files with 261 additions and 63 deletions

View File

@@ -182,6 +182,14 @@ comptime {
@export(&externRunCallback3, .{ .name = "Bun__EventLoop__runCallback3" });
}
/// Prefer `runCallbackWithResult` unless you really need to make sure that microtasks are drained.
pub fn runCallbackWithResultAndForcefullyDrainMicrotasks(this: *EventLoop, callback: jsc.JSValue, globalObject: *jsc.JSGlobalObject, thisValue: jsc.JSValue, arguments: []const jsc.JSValue) !jsc.JSValue {
const result = try callback.call(globalObject, thisValue, arguments);
result.ensureStillAlive();
try this.drainMicrotasksWithGlobal(globalObject, globalObject.bunVM().jsc_vm);
return result;
}
pub fn runCallbackWithResult(this: *EventLoop, callback: jsc.JSValue, globalObject: *jsc.JSGlobalObject, thisValue: jsc.JSValue, arguments: []const jsc.JSValue) jsc.JSValue {
this.enter();
defer this.exit();

View File

@@ -137,11 +137,12 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
this.active_scope = new_scope;
group.log("collection:runOne set scope to {s}", .{this.active_scope.base.name orelse "undefined"});
BunTest.runTestCallback(buntest_strong, globalThis, callback.get(), false, .{
.collection = .{
.active_scope = previous_scope,
},
}, .epoch);
if (BunTest.runTestCallback(buntest_strong, globalThis, callback.get(), false, .{
.collection = .{ .active_scope = previous_scope },
}, &.epoch)) |cfg_data| {
// the result is available immediately; queue
buntest.addResult(cfg_data);
}
return .{ .waiting = .{} };
}

View File

@@ -222,10 +222,11 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
defer groupLog.end();
const buntest = buntest_strong.get();
const this = &buntest.execution;
var now = bun.timespec.now();
switch (data) {
.start => {
return try stepGroup(buntest_strong, globalThis, bun.timespec.now());
return try stepGroup(buntest_strong, globalThis, &now);
},
else => {
// determine the active sequence,group
@@ -242,21 +243,20 @@ pub fn step(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject
bun.assert(sequence.active_index < sequence.entries(this).len);
this.advanceSequence(sequence, group);
const now = bun.timespec.now();
const sequence_result = try stepSequence(buntest_strong, globalThis, sequence, group, sequence_index, now);
const sequence_result = try stepSequence(buntest_strong, globalThis, sequence, group, sequence_index, &now);
switch (sequence_result) {
.done => {},
.execute => |exec| return .{ .waiting = .{ .timeout = exec.timeout } },
}
if (group.remaining_incomplete_entries == 0) {
return try stepGroup(buntest_strong, globalThis, now);
return try stepGroup(buntest_strong, globalThis, &now);
}
return .{ .waiting = .{} };
},
}
}
pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, now: bun.timespec) bun.JSError!bun_test.StepResult {
pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, now: *bun.timespec) bun.JSError!bun_test.StepResult {
groupLog.begin(@src());
defer groupLog.end();
const buntest = buntest_strong.get();
@@ -295,7 +295,7 @@ pub fn stepGroup(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalO
}
}
const AdvanceStatus = union(enum) { done, execute: struct { timeout: bun.timespec = .epoch } };
fn stepGroupOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, group: *ConcurrentGroup, now: bun.timespec) !AdvanceStatus {
fn stepGroupOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, group: *ConcurrentGroup, now: *bun.timespec) !AdvanceStatus {
const buntest = buntest_strong.get();
const this = &buntest.execution;
var final_status: AdvanceStatus = .done;
@@ -320,13 +320,13 @@ const AdvanceSequenceStatus = union(enum) {
timeout: bun.timespec = .epoch,
},
};
fn stepSequence(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: bun.timespec) !AdvanceSequenceStatus {
fn stepSequence(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: *bun.timespec) !AdvanceSequenceStatus {
while (true) {
return try stepSequenceOne(buntest_strong, globalThis, sequence, group, sequence_index, now) orelse continue;
}
}
/// returns null if the while loop should continue
fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: bun.timespec) !?AdvanceSequenceStatus {
fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGlobalObject, sequence: *ExecutionSequence, group: *ConcurrentGroup, sequence_index: usize, now: *bun.timespec) !?AdvanceSequenceStatus {
groupLog.begin(@src());
defer groupLog.end();
const buntest = buntest_strong.get();
@@ -337,10 +337,7 @@ fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGloba
bun.debugAssert(false); // sequence is executing with no active entry
return .{ .execute = .{} };
};
if (!active_entry.timespec.eql(&.epoch) and active_entry.timespec.order(&now) == .lt) {
// timed out
sequence.result = if (active_entry == sequence.test_entry) if (active_entry.has_done_parameter) .fail_because_timeout_with_done_callback else .fail_because_timeout else if (active_entry.has_done_parameter) .fail_because_hook_timeout_with_done_callback else .fail_because_hook_timeout;
sequence.maybe_skip = true;
if (active_entry.evaluateTimeout(sequence, now)) {
this.advanceSequence(sequence, group);
return null; // run again
}
@@ -374,7 +371,14 @@ fn stepSequenceOne(buntest_strong: bun_test.BunTestPtr, globalThis: *jsc.JSGloba
};
groupLog.log("runSequence queued callback: {}", .{callback_data});
BunTest.runTestCallback(buntest_strong, globalThis, cb.get(), next_item.has_done_parameter, callback_data, next_item.timespec);
if (BunTest.runTestCallback(buntest_strong, globalThis, cb.get(), next_item.has_done_parameter, callback_data, &next_item.timespec) != null) {
now.* = bun.timespec.now();
_ = next_item.evaluateTimeout(sequence, now);
// the result is available immediately; advance the sequence and run again.
this.advanceSequence(sequence, group);
return null; // run again
}
return .{ .execute = .{ .timeout = next_item.timespec } };
} else {
switch (next_item.base.mode) {

View File

@@ -331,6 +331,7 @@ pub const BunTest = struct {
errdefer group.log("ended in error", .{});
const result, const this_ptr = callframe.argumentsAsArray(2);
if (this_ptr.isEmptyOrUndefinedOrNull()) return;
const refdata: *RefData = this_ptr.asPromisePtr(RefData);
defer refdata.deref();
@@ -472,21 +473,21 @@ pub const BunTest = struct {
}
}
this.updateMinTimeout(globalThis, min_timeout);
this.updateMinTimeout(globalThis, &min_timeout);
}
fn updateMinTimeout(this: *BunTest, globalThis: *jsc.JSGlobalObject, min_timeout: bun.timespec) void {
fn updateMinTimeout(this: *BunTest, globalThis: *jsc.JSGlobalObject, min_timeout: *const bun.timespec) void {
group.begin(@src());
defer group.end();
// only set the timer if the new timeout is sooner than the current timeout. this unfortunately means that we can't unset an unnecessary timer.
group.log("-> timeout: {} {}, {s}", .{ min_timeout, this.timer.next, @tagName(min_timeout.orderIgnoreEpoch(this.timer.next)) });
group.log("-> timeout: {} {}, {s}", .{ min_timeout.*, this.timer.next, @tagName(min_timeout.orderIgnoreEpoch(this.timer.next)) });
if (min_timeout.orderIgnoreEpoch(this.timer.next) == .lt) {
group.log("-> setting timer to {}", .{min_timeout});
group.log("-> setting timer to {}", .{min_timeout.*});
if (!this.timer.next.eql(&.epoch)) {
group.log("-> removing existing timer", .{});
globalThis.bunVM().timer.remove(&this.timer);
}
this.timer.next = min_timeout;
this.timer.next = min_timeout.*;
if (!this.timer.next.eql(&.epoch)) {
group.log("-> inserting timer", .{});
globalThis.bunVM().timer.insert(&this.timer);
@@ -534,48 +535,55 @@ pub const BunTest = struct {
}
}
fn drain(globalThis: *jsc.JSGlobalObject) void {
const bun_vm = globalThis.bunVM();
bun_vm.drainMicrotasks();
var count = bun_vm.unhandled_error_counter;
bun_vm.global.handleRejectedPromises();
while (bun_vm.unhandled_error_counter > count) {
count = bun_vm.unhandled_error_counter;
bun_vm.drainMicrotasks();
bun_vm.global.handleRejectedPromises();
}
}
/// if sync, the result is queued and appended later
pub fn runTestCallback(this_strong: BunTestPtr, globalThis: *jsc.JSGlobalObject, cfg_callback: jsc.JSValue, cfg_done_parameter: bool, cfg_data: BunTest.RefDataValue, timeout: bun.timespec) void {
/// if sync, the result is returned. if async, null is returned.
pub fn runTestCallback(this_strong: BunTestPtr, globalThis: *jsc.JSGlobalObject, cfg_callback: jsc.JSValue, cfg_done_parameter: bool, cfg_data: BunTest.RefDataValue, timeout: *const bun.timespec) ?RefDataValue {
group.begin(@src());
defer group.end();
const this = this_strong.get();
const vm = globalThis.bunVM();
var done_arg: ?jsc.JSValue = null;
// Don't use ?jsc.JSValue to make it harder for the conservative stack
// scanner to miss it.
var done_arg: jsc.JSValue = .zero;
var done_callback: jsc.JSValue = .zero;
var done_callback: ?jsc.JSValue = null;
if (cfg_done_parameter) {
group.log("callTestCallback -> appending done callback param: data {}", .{cfg_data});
done_callback = DoneCallback.createUnbound(globalThis);
done_arg = DoneCallback.bind(done_callback.?, globalThis) catch |e| blk: {
done_arg = DoneCallback.bind(done_callback, globalThis) catch |e| blk: {
this.onUncaughtException(globalThis, globalThis.takeException(e), false, cfg_data);
break :blk jsc.JSValue.js_undefined; // failed to bind done callback
break :blk .zero; // failed to bind done callback
};
}
this.updateMinTimeout(globalThis, timeout);
const result: ?jsc.JSValue = cfg_callback.call(globalThis, .js_undefined, if (done_arg) |done| &.{done} else &.{}) catch blk: {
const result: jsc.JSValue = vm.eventLoop().runCallbackWithResultAndForcefullyDrainMicrotasks(cfg_callback, globalThis, .js_undefined, if (done_arg != .zero) &.{done_arg} else &.{}) catch blk: {
globalThis.clearTerminationException();
this.onUncaughtException(globalThis, globalThis.tryTakeException(), false, cfg_data);
group.log("callTestCallback -> error", .{});
break :blk null;
break :blk .zero;
};
done_callback.ensureStillAlive();
// Drain unhandled promise rejections.
while (true) {
// Prevent the user's Promise rejection from going into the uncaught promise rejection queue.
if (result != .zero)
if (result.asPromise()) |promise|
if (promise.status(globalThis.vm()) == .rejected)
promise.setHandled(globalThis.vm());
const prev_unhandled_count = vm.unhandled_error_counter;
globalThis.handleRejectedPromises();
if (vm.unhandled_error_counter == prev_unhandled_count)
break;
}
var dcb_ref: ?*RefData = null;
if (done_callback) |dcb| {
if (DoneCallback.fromJS(dcb)) |dcb_data| {
if (dcb_data.called or result == null) {
if (done_callback != .zero and result != .zero) {
if (DoneCallback.fromJS(done_callback)) |dcb_data| {
if (dcb_data.called) {
// done callback already called or the callback errored; add result immediately
} else {
dcb_ref = ref(this_strong, cfg_data);
@@ -584,25 +592,43 @@ pub const BunTest = struct {
} else bun.debugAssert(false); // this should be unreachable, we create DoneCallback above
}
if (result != null and result.?.asPromise() != null) {
group.log("callTestCallback -> promise: data {}", .{cfg_data});
const this_ref: *RefData = if (dcb_ref) |dcb_ref_value| dcb_ref_value.dupe() else ref(this_strong, cfg_data);
result.?.then(globalThis, this_ref, bunTestThen, bunTestCatch);
drain(globalThis);
return;
if (result != .zero) {
if (result.asPromise()) |promise| {
defer result.ensureStillAlive(); // because sometimes we use promise without result
group.log("callTestCallback -> promise: data {}", .{cfg_data});
switch (promise.status(globalThis.vm())) {
.pending => {
// not immediately resolved; register 'then' to handle the result when it becomes available
const this_ref: *RefData = if (dcb_ref) |dcb_ref_value| dcb_ref_value.dupe() else ref(this_strong, cfg_data);
result.then(globalThis, this_ref, bunTestThen, bunTestCatch);
return null;
},
.fulfilled => {
// Do not register a then callback when it's already fulfilled.
return cfg_data;
},
.rejected => {
const value = promise.result(globalThis.vm());
this.onUncaughtException(globalThis, value, true, cfg_data);
// We previously marked it as handled above.
return cfg_data;
},
}
}
}
if (dcb_ref) |_| {
// completed asynchronously
group.log("callTestCallback -> wait for done callback", .{});
drain(globalThis);
return;
return null;
}
group.log("callTestCallback -> sync", .{});
drain(globalThis);
this.addResult(cfg_data);
return;
return cfg_data;
}
/// called from the uncaught exception handler, or if a test callback rejects or throws an error
@@ -843,6 +869,26 @@ pub const ExecutionEntry = struct {
}
return entry;
}
pub fn evaluateTimeout(this: *ExecutionEntry, sequence: *Execution.ExecutionSequence, now: *const bun.timespec) bool {
if (!this.timespec.eql(&.epoch) and this.timespec.order(now) == .lt) {
// timed out
sequence.result = if (this == sequence.test_entry)
if (this.has_done_parameter)
.fail_because_timeout_with_done_callback
else
.fail_because_timeout
else if (this.has_done_parameter)
.fail_because_hook_timeout_with_done_callback
else
.fail_because_hook_timeout;
sequence.maybe_skip = true;
return true;
}
return false;
}
pub fn destroy(this: *ExecutionEntry, gpa: std.mem.Allocator) void {
if (this.callback) |*c| c.deinit();
this.base.deinit(gpa);

View File

@@ -0,0 +1,15 @@
beforeEach(() => {
console.log("beforeEach");
});
afterEach(() => {
console.log("afterEach");
});
test.concurrent("test 1", () => {
console.log("start test 1");
});
test.concurrent("test 2", () => {
console.log("start test 2");
});
test.concurrent("test 3", () => {
console.log("start test 3");
});

View File

@@ -0,0 +1,77 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, normalizeBunSnapshot } from "harness";
test("concurrent immediate", async () => {
const result = await Bun.spawn({
cmd: [bunExe(), "test", import.meta.dir + "/concurrent_immediate.fixture.ts"],
stdout: "pipe",
stderr: "pipe",
env: bunEnv,
});
const exitCode = await result.exited;
const stdout = await result.stdout.text();
const stderr = await result.stderr.text();
expect(exitCode).toBe(0);
expect(normalizeBunSnapshot(stdout)).toMatchInlineSnapshot(`
"bun test <version> (<revision>)
beforeEach
start test 1
afterEach
beforeEach
start test 2
afterEach
beforeEach
start test 3
afterEach"
`);
const result2 = await Bun.spawn({
cmd: [bunExe(), "test", import.meta.dir + "/concurrent_immediate_promise.fixture.ts"],
stdout: "pipe",
stderr: "pipe",
env: bunEnv,
});
const exitCode2 = await result2.exited;
const stdout2 = await result2.stdout.text();
const stderr2 = await result2.stderr.text();
expect(exitCode2).toBe(0);
expect(normalizeBunSnapshot(stdout2)).toBe(normalizeBunSnapshot(stdout));
expect(normalizeBunSnapshot(stderr2).replaceAll("_promise.", ".")).toBe(normalizeBunSnapshot(stderr));
});
function filterImportantLines(stderr: string) {
return normalizeBunSnapshot(stderr)
.split("\n")
.filter(l => l.startsWith("(pass)") || l.startsWith("(fail)") || l.startsWith("error:"))
.join("\n");
}
test("concurrent immediate error", async () => {
const result = await Bun.spawn({
cmd: [bunExe(), "test", import.meta.dir + "/concurrent_immediate_error.fixture.ts"],
stdout: "pipe",
stderr: "pipe",
env: bunEnv,
});
const exitCode = await result.exited;
const stdout = await result.stdout.text();
const stderr = await result.stderr.text();
expect(exitCode).toBe(1);
expect(filterImportantLines(stderr)).toMatchInlineSnapshot(`
"(pass) test 1
error: test 2 error
(fail) test 2
(pass) test 3"
`);
const result2 = await Bun.spawn({
cmd: [bunExe(), "test", import.meta.dir + "/concurrent_immediate_error_promise.fixture.ts"],
stdout: "pipe",
stderr: "pipe",
env: bunEnv,
});
const exitCode2 = await result2.exited;
const stdout2 = await result2.stdout.text();
const stderr2 = await result2.stderr.text();
expect(filterImportantLines(stderr2)).toBe(filterImportantLines(stderr));
});

View File

@@ -0,0 +1,15 @@
beforeEach(() => {
console.log("beforeEach");
});
afterEach(() => {
console.log("afterEach");
});
test.concurrent("test 1", () => {
console.log("start test 1");
});
test.concurrent("test 2", () => {
throw new Error("test 2 error");
});
test.concurrent("test 3", () => {
console.log("start test 3");
});

View File

@@ -0,0 +1,15 @@
beforeEach(async () => {
console.log("beforeEach");
});
afterEach(async () => {
console.log("afterEach");
});
test.concurrent("test 1", async () => {
console.log("start test 1");
});
test.concurrent("test 2", async () => {
throw new Error("test 2 error");
});
test.concurrent("test 3", async () => {
console.log("start test 3");
});

View File

@@ -0,0 +1,15 @@
beforeEach(async () => {
console.log("beforeEach");
});
afterEach(async () => {
console.log("afterEach");
});
test.concurrent("test 1", async () => {
console.log("start test 1");
});
test.concurrent("test 2", async () => {
console.log("start test 2");
});
test.concurrent("test 3", async () => {
console.log("start test 3");
});

View File

@@ -13,6 +13,7 @@ import fs, {
fdatasync,
fdatasyncSync,
fstatSync,
ftruncateSync,
lstatSync,
mkdirSync,
mkdtemp,
@@ -2714,14 +2715,15 @@ it("fstat on a large file", () => {
try {
dest = `${tmpdir()}/fs.test.ts/${Math.trunc(Math.random() * 10000000000).toString(32)}.stat.txt`;
mkdirSync(dirname(dest), { recursive: true });
const bigBuffer = new Uint8Array(1024 * 1024 * 1024);
fd = openSync(dest, "w");
let offset = 0;
while (offset < 5 * 1024 * 1024 * 1024) {
offset += writeSync(fd, bigBuffer, 0, bigBuffer.length, offset);
}
// Instead of writing the actual bytes, we can use ftruncate to make a
// hole-y file and extend it to the desired size This should generally avoid
// the ENOSPC issue and avoid timeouts.
ftruncateSync(fd, 5 * 1024 * 1024 * 1024);
fdatasyncSync(fd);
expect(fstatSync(fd).size).toEqual(offset);
const stats = fstatSync(fd);
expect(stats.size).toEqual(5 * 1024 * 1024 * 1024);
} catch (error) {
// TODO: Once `fs.statfsSync` is implemented, make sure that the buffer size
// is small enough not to cause: ENOSPC: No space left on device.