meta: add a --parallel flag to the runner for faster local testing (#21140)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Meghan Denny
2025-07-21 14:17:19 -08:00
committed by GitHub
parent 56bc65932f
commit a4d031a841
3 changed files with 279 additions and 52 deletions

109
scripts/p-limit.mjs Normal file
View File

@@ -0,0 +1,109 @@
/**
* p-limit@6.2.0
* https://github.com/sindresorhus/p-limit
* MIT (c) Sindre Sorhus
*/
import Queue from "./yocto-queue.mjs";
export default function pLimit(concurrency) {
validateConcurrency(concurrency);
const queue = new Queue();
let activeCount = 0;
const resumeNext = () => {
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
// Since `pendingCount` has been decreased by one, increase `activeCount` by one.
activeCount++;
}
};
const next = () => {
activeCount--;
resumeNext();
};
const run = async (function_, resolve, arguments_) => {
const result = (async () => function_(...arguments_))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (function_, resolve, arguments_) => {
// Queue `internalResolve` instead of the `run` function
// to preserve asynchronous context.
new Promise(internalResolve => {
queue.enqueue(internalResolve);
}).then(run.bind(undefined, function_, resolve, arguments_));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// after the `internalResolve` function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency) {
resumeNext();
}
})();
};
const generator = (function_, ...arguments_) =>
new Promise(resolve => {
enqueue(function_, resolve, arguments_);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value() {
queue.clear();
},
},
concurrency: {
get: () => concurrency,
set(newConcurrency) {
validateConcurrency(newConcurrency);
concurrency = newConcurrency;
queueMicrotask(() => {
// eslint-disable-next-line no-unmodified-loop-condition
while (activeCount < concurrency && queue.size > 0) {
resumeNext();
}
});
},
},
});
return generator;
}
export function limitFunction(function_, option) {
const { concurrency } = option;
const limit = pLimit(concurrency);
return (...arguments_) => limit(() => function_(...arguments_));
}
function validateConcurrency(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError("Expected `concurrency` to be a number from 1 and up");
}
}

View File

@@ -28,9 +28,10 @@ import {
writeFileSync,
} from "node:fs";
import { readFile } from "node:fs/promises";
import { userInfo } from "node:os";
import { availableParallelism, userInfo } from "node:os";
import { basename, dirname, extname, join, relative, sep } from "node:path";
import { parseArgs } from "node:util";
import pLimit from "./p-limit.mjs";
import {
getAbi,
getAbiVersion,
@@ -63,6 +64,7 @@ import {
unzip,
uploadArtifact,
} from "./utils.mjs";
let isQuiet = false;
const cwd = import.meta.dirname ? dirname(import.meta.dirname) : process.cwd();
const testsPath = join(cwd, "test");
@@ -153,6 +155,10 @@ const { values: options, positionals: filters } = parseArgs({
type: "boolean",
default: isBuildkite && isLinux,
},
["parallel"]: {
type: "boolean",
default: false,
},
},
});
@@ -341,6 +347,10 @@ async function runTests() {
const failedResults = [];
const maxAttempts = 1 + (parseInt(options["retries"]) || 0);
const parallelism = options["parallel"] ? availableParallelism() : 1;
console.log("parallelism", parallelism);
const limit = pLimit(parallelism);
/**
* @param {string} title
* @param {function} fn
@@ -355,12 +365,15 @@ async function runTests() {
await new Promise(resolve => setTimeout(resolve, 5000 + Math.random() * 10_000));
}
result = await startGroup(
attempt === 1
? `${getAnsi("gray")}[${index}/${total}]${getAnsi("reset")} ${title}`
: `${getAnsi("gray")}[${index}/${total}]${getAnsi("reset")} ${title} ${getAnsi("gray")}[attempt #${attempt}]${getAnsi("reset")}`,
fn,
);
let grouptitle = `${getAnsi("gray")}[${index}/${total}]${getAnsi("reset")} ${title}`;
if (attempt > 1) grouptitle += ` ${getAnsi("gray")}[attempt #${attempt}]${getAnsi("reset")}`;
if (parallelism > 1) {
console.log(grouptitle);
result = await fn();
} else {
result = await startGroup(grouptitle, fn);
}
const { ok, stdoutPreview, error } = result;
if (ok) {
@@ -375,6 +388,7 @@ async function runTests() {
const color = attempt >= maxAttempts ? "red" : "yellow";
const label = `${getAnsi(color)}[${index}/${total}] ${title} - ${error}${getAnsi("reset")}`;
startGroup(label, () => {
if (parallelism > 1) return;
process.stderr.write(stdoutPreview);
});
@@ -434,7 +448,9 @@ async function runTests() {
}
if (!failedResults.length) {
for (const testPath of tests) {
await Promise.all(
tests.map(testPath =>
limit(() => {
const absoluteTestPath = join(testsPath, testPath);
const title = relative(cwd, absoluteTestPath).replaceAll(sep, "/");
if (isNodeTest(testPath)) {
@@ -450,14 +466,18 @@ async function runTests() {
if ((basename(execPath).includes("asan") || !isCI) && shouldValidateExceptions(testPath)) {
env.BUN_JSC_validateExceptionChecks = "1";
}
await runTest(title, async () => {
return runTest(title, async () => {
const { ok, error, stdout } = await spawnBun(execPath, {
cwd: cwd,
args: [subcommand, "--config=" + join(import.meta.dirname, "../bunfig.node-test.toml"), absoluteTestPath],
args: [
subcommand,
"--config=" + join(import.meta.dirname, "../bunfig.node-test.toml"),
absoluteTestPath,
],
timeout: getNodeParallelTestTimeout(title),
env,
stdout: chunk => pipeTestStdout(process.stdout, chunk),
stderr: chunk => pipeTestStdout(process.stderr, chunk),
stdout: parallelism > 1 ? () => {} : chunk => pipeTestStdout(process.stdout, chunk),
stderr: parallelism > 1 ? () => {} : chunk => pipeTestStdout(process.stderr, chunk),
});
const mb = 1024 ** 3;
const stdoutPreview = stdout.slice(0, mb).split("\n").slice(0, 50).join("\n");
@@ -473,9 +493,17 @@ async function runTests() {
};
});
} else {
await runTest(title, async () => spawnBunTest(execPath, join("test", testPath)));
}
return runTest(title, async () =>
spawnBunTest(execPath, join("test", testPath), {
cwd,
stdout: parallelism > 1 ? () => {} : chunk => pipeTestStdout(process.stdout, chunk),
stderr: parallelism > 1 ? () => {} : chunk => pipeTestStdout(process.stderr, chunk),
}),
);
}
}),
),
);
}
if (vendorTests?.length) {
@@ -1059,7 +1087,7 @@ async function spawnBunTest(execPath, testPath, options = { cwd }) {
const env = {
GITHUB_ACTIONS: "true", // always true so annotations are parsed
};
if (basename(execPath).includes("asan") && shouldValidateExceptions(relative(cwd, absPath))) {
if ((basename(execPath).includes("asan") || !isCI) && shouldValidateExceptions(relative(cwd, absPath))) {
env.BUN_JSC_validateExceptionChecks = "1";
}
@@ -1068,8 +1096,8 @@ async function spawnBunTest(execPath, testPath, options = { cwd }) {
cwd: options["cwd"],
timeout: isReallyTest ? timeout : 30_000,
env,
stdout: chunk => pipeTestStdout(process.stdout, chunk),
stderr: chunk => pipeTestStdout(process.stderr, chunk),
stdout: options.stdout,
stderr: options.stderr,
});
const { tests, errors, stdout: stdoutPreview } = parseTestStdout(stdout, testPath);

90
scripts/yocto-queue.mjs Normal file
View File

@@ -0,0 +1,90 @@
/**
* yocto-queue@1.2.1
* https://github.com/sindresorhus/yocto-queue
* MIT (c) Sindre Sorhus
*/
/*
How it works:
`this.#head` is an instance of `Node` which keeps track of its current value and nests another instance of `Node` that keeps the value that comes after it. When a value is provided to `.enqueue()`, the code needs to iterate through `this.#head`, going deeper and deeper to find the last value. However, iterating through every single item is slow. This problem is solved by saving a reference to the last value as `this.#tail` so that it can reference it to add a new value.
*/
class Node {
value;
next;
constructor(value) {
this.value = value;
}
}
export default class Queue {
#head;
#tail;
#size;
constructor() {
this.clear();
}
enqueue(value) {
const node = new Node(value);
if (this.#head) {
this.#tail.next = node;
this.#tail = node;
} else {
this.#head = node;
this.#tail = node;
}
this.#size++;
}
dequeue() {
const current = this.#head;
if (!current) {
return;
}
this.#head = this.#head.next;
this.#size--;
return current.value;
}
peek() {
if (!this.#head) {
return;
}
return this.#head.value;
// TODO: Node.js 18.
// return this.#head?.value;
}
clear() {
this.#head = undefined;
this.#tail = undefined;
this.#size = 0;
}
get size() {
return this.#size;
}
*[Symbol.iterator]() {
let current = this.#head;
while (current) {
yield current.value;
current = current.next;
}
}
*drain() {
while (this.#head) {
yield this.dequeue();
}
}
}