Compare commits

...

1 Commits

Author SHA1 Message Date
Jarred Sumner
68c8377d76 WIP concurrent CommonJS 2023-09-23 05:57:40 -07:00
8 changed files with 210 additions and 50 deletions

View File

@@ -1611,6 +1611,11 @@ pub const PollRef = struct {
this.status = .done;
}
pub fn disableConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
this.unrefConcurrently(vm);
@atomicStore(Status, &this.status, Status.done, .Monotonic);
}
/// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *PollRef, loop: *uws.Loop) void {
if (this.status != .active)
@@ -1645,10 +1650,15 @@ pub const PollRef = struct {
/// From another thread, Prevent a poll from keeping the process alive.
pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
return;
this.status = .inactive;
vm.event_loop_handle.?.unrefConcurrently();
switch (@atomicRmw(Status, &this.status, .Xchg, .inactive, .Monotonic)) {
.active => {
vm.event_loop_handle.?.unrefConcurrently();
},
.inactive => {},
.done => {
@atomicStore(Status, &this.status, .done, .Monotonic);
},
}
}
/// Prevent a poll from keeping the process alive on the next tick.
@@ -1659,14 +1669,6 @@ pub const PollRef = struct {
vm.pending_unref_counter +|= 1;
}
/// From another thread, prevent a poll from keeping the process alive on the next tick.
pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
return;
this.status = .inactive;
_ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic);
}
/// Allow a poll to keep the process alive.
pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
@@ -1677,10 +1679,15 @@ pub const PollRef = struct {
/// Allow a poll to keep the process alive.
pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
return;
this.status = .active;
vm.event_loop_handle.?.refConcurrently();
switch (@atomicRmw(Status, &this.status, .Xchg, .inactive, .Monotonic)) {
.active => {},
.inactive => {
vm.event_loop_handle.?.refConcurrently();
},
.done => {
@atomicStore(Status, &this.status, .done, .Monotonic);
},
}
}
pub fn refConcurrentlyFromEventLoop(this: *PollRef, loop: *JSC.EventLoop) void {

View File

@@ -63,6 +63,13 @@ static JSC::JSInternalPromise* resolvedInternalPromise(JSC::JSGlobalObject* glob
return promise;
}
extern "C" bool ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(Zig::GlobalObject* globalObject, BunString* specifier)
{
JSC::JSValue specifierValue = Bun::toJS(globalObject, *specifier);
return globalObject->requireMap()->has(globalObject, specifierValue) || globalObject->esModuleRegistry()->has(globalObject, specifierValue);
}
// Converts an object from InternalModuleRegistry into { ...obj, default: obj }
static JSC::SyntheticSourceProvider::SyntheticSourceGenerator
generateInternalModuleSourceCode(JSC::JSGlobalObject* globalObject, InternalModuleRegistry::Field moduleId)
@@ -484,8 +491,7 @@ JSValue fetchCommonJSModule(
}
}
auto* loader = globalObject->moduleLoader();
JSMap* registry = jsCast<JSMap*>(loader->getDirect(vm, Identifier::fromString(vm, "registry"_s)));
JSMap* registry = globalObject->esModuleRegistry();
auto hasAlreadyLoadedESMVersionSoWeShouldntTranspileItTwice = [&]() -> bool {
JSValue entry = registry->get(globalObject, specifierValue);

View File

@@ -3070,6 +3070,14 @@ void GlobalObject::finishCreation(VM& vm)
init.set(map);
});
m_esModuleRegistry.initLater(
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSMap>::Initializer& init) {
auto* globalObject = init.owner;
auto* loader = globalObject->moduleLoader();
JSMap* registry = jsCast<JSMap*>(loader->getDirect(init.vm, Identifier::fromString(init.vm, "registry"_s)));
init.set(registry);
});
m_encodeIntoObjectStructure.initLater(
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::Structure>::Initializer& init) {
auto& vm = init.vm;
@@ -3796,6 +3804,7 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor)
thisObject->m_utilInspectStylizeNoColorFunction.visit(visitor);
thisObject->m_lazyReadableStreamPrototypeMap.visit(visitor);
thisObject->m_requireMap.visit(visitor);
thisObject->m_esModuleRegistry.visit(visitor);
thisObject->m_encodeIntoObjectStructure.visit(visitor);
thisObject->m_JSArrayBufferControllerPrototype.visit(visitor);
thisObject->m_JSFileSinkControllerPrototype.visit(visitor);

View File

@@ -211,6 +211,7 @@ public:
JSC::JSMap* readableStreamNativeMap() { return m_lazyReadableStreamPrototypeMap.getInitializedOnMainThread(this); }
JSC::JSMap* requireMap() { return m_requireMap.getInitializedOnMainThread(this); }
JSC::JSMap* esModuleRegistry() { return m_esModuleRegistry.getInitializedOnMainThread(this); }
JSC::Structure* encodeIntoObjectStructure() { return m_encodeIntoObjectStructure.getInitializedOnMainThread(this); }
JSC::Structure* callSiteStructure() const { return m_callSiteStructure.getInitializedOnMainThread(this); }
@@ -478,6 +479,7 @@ public:
LazyProperty<JSGlobalObject, JSFunction> m_emitReadableNextTickFunction;
LazyProperty<JSGlobalObject, JSMap> m_lazyReadableStreamPrototypeMap;
LazyProperty<JSGlobalObject, JSMap> m_requireMap;
LazyProperty<JSGlobalObject, JSMap> m_esModuleRegistry;
LazyProperty<JSGlobalObject, Structure> m_encodeIntoObjectStructure;
LazyProperty<JSGlobalObject, JSObject> m_JSArrayBufferControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_JSFileSinkControllerPrototype;

View File

@@ -343,6 +343,7 @@ const Futimes = JSC.Node.Async.futimes;
const Lchmod = JSC.Node.Async.lchmod;
const Lchown = JSC.Node.Async.lchown;
const Unlink = JSC.Node.Async.unlink;
const TranspilerJob = JSC.RuntimeTranspilerStore.TranspilerJob;
// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
@@ -350,6 +351,7 @@ pub const Task = TaggedPointerUnion(.{
Microtask,
MicrotaskForDefaultGlobalObject,
AsyncTransformTask,
TranspilerJob,
ReadFileTask,
CopyFilePromiseTask,
WriteFileTask,
@@ -756,6 +758,10 @@ pub const EventLoop = struct {
var any: *Lstat = task.get(Lstat).?;
any.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(TranspilerJob))) => {
var job: *TranspilerJob = task.get(TranspilerJob).?;
job.runFromJSThread();
},
@field(Task.Tag, typeBaseName(@typeName(Fstat))) => {
var any: *Fstat = task.get(Fstat).?;
any.runFromJSThread();

View File

@@ -188,12 +188,38 @@ pub const RuntimeTranspilerStore = struct {
store: TranspilerJob.Store,
enabled: bool = true,
sync_transpilation_mutex: bun.Lock = bun.Lock.init(),
pending_transpilations: std.StringHashMap(*TranspilerJob) = .{},
pub const SyncQueue = bun.UnboundedQueue(TranspilerJob, .next);
pub fn init(allocator: std.mem.Allocator) RuntimeTranspilerStore {
return RuntimeTranspilerStore{
.store = TranspilerJob.Store.init(allocator),
};
}
fn createJob(this: *RuntimeTranspilerStore, vm: *JSC.VirtualMachine, globalObject: *JSC.JSGlobalObject, path: Fs.Path, promise: JSC.JSValue, referrer: []const u8) *TranspilerJob {
var owned_path = Fs.Path.init(bun.default_allocator.dupe(u8, path.text) catch unreachable);
var job: *TranspilerJob = this.store.get();
job.* = TranspilerJob{
.path = owned_path,
.globalThis = globalObject,
.referrer = bun.default_allocator.dupe(u8, referrer) catch unreachable,
.vm = vm,
.log = logger.Log.init(bun.default_allocator),
.loader = vm.bundler.options.loader(owned_path.name.ext),
.promise = if (promise != .zero) JSC.Strong.create(promise, globalObject) else .{},
.poll_ref = .{},
.fetcher = TranspilerJob.Fetcher{
.file = {},
},
};
return job;
}
pub fn transpile(
this: *RuntimeTranspilerStore,
vm: *JSC.VirtualMachine,
@@ -201,27 +227,69 @@ pub const RuntimeTranspilerStore = struct {
path: Fs.Path,
referrer: []const u8,
) *anyopaque {
var hash_entry = this.pending_transpilations.getOrPut(path.text) catch @panic("Out of memory");
if (hash_entry.found_existing) {
var job: *TranspilerJob = hash_entry.value_ptr.*;
if (job.generation_number == this.generation_number.load(.Monotonic)) {
var promise = job.promise.get() orelse brk: {
job.promise.set(JSC.JSValue.fromCell(JSC.JSInternalPromise.create(globalObject)), globalObject);
break :brk job.promise.get().?;
};
debug("transpile({s}) - returning existing promise", .{path.text});
job.onComplete(ModuleLoader.AsyncModule.fulfill);
return promise.asCell();
} else {
job.cancelled.store(true, .Monotonic);
debug("transpile({s}) - generation number mismatch ({d} vs {d})", .{ path.text, job.generation_number, this.generation_number.loadUnchecked() });
}
}
debug("transpile({s})", .{path.text});
var job: *TranspilerJob = this.store.get();
var owned_path = Fs.Path.init(bun.default_allocator.dupe(u8, path.text) catch unreachable);
var promise = JSC.JSInternalPromise.create(globalObject);
job.* = TranspilerJob{
.path = owned_path,
.globalThis = globalObject,
.referrer = bun.default_allocator.dupe(u8, referrer) catch unreachable,
.vm = vm,
.log = logger.Log.init(bun.default_allocator),
.loader = vm.bundler.options.loader(owned_path.name.ext),
.promise = JSC.Strong.create(JSC.JSValue.fromCell(promise), globalObject),
.poll_ref = .{},
.fetcher = TranspilerJob.Fetcher{
.file = {},
},
};
var job = this.createJob(vm, globalObject, path, JSC.JSValue.fromCell(promise), referrer);
hash_entry.value_ptr.* = job;
job.schedule();
return promise;
}
pub const RequireQueue = struct {
queue: bun.StringSet = bun.StringSet.init(bun.default_allocator),
pub fn deinit(this: *RequireQueue) void {
this.queue.deinit();
}
extern fn ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(globalObject: *JSC.JSGlobalObject, specifier: *bun.String) bool;
pub fn drain(require_queue: *RequireQueue, path: Fs.Path, vm: *JSC.VirtualMachine, globalThis: *JSC.JSGlobalObject) void {
const source_dir = path.name.dirWithTrailingSlash();
var resolver = &vm.bundler.resolver;
var store = &vm.transpiler_store;
const referrer = path.text;
for (require_queue.keys()) |module_id| {
var result = resolver.resolve(source_dir, module_id, .require) catch continue;
if (result.is_external or result.is_standalone_module) continue;
var current_path = result.path() orelse continue;
if (ModuleLoader__moduleIDExistsInRequireMapOrESMRegistry(globalThis, bun.String.init(current_path.text))) {
continue;
}
var entry = store.pending_transpilations.getOrPut(current_path.text) catch @panic("Out of memory");
if (entry.found_existing) {
continue;
}
var job = store.createJob(vm, globalThis, current_path, .zero, referrer);
entry.value_ptr.* = job;
entry.key_ptr.* = job.path.text;
job.schedule();
}
}
};
pub const TranspilerJob = struct {
path: Fs.Path,
referrer: []const u8,
@@ -236,8 +304,32 @@ pub const RuntimeTranspilerStore = struct {
parse_error: ?anyerror = null,
resolved_source: ResolvedSource = ResolvedSource{},
work_task: JSC.WorkPoolTask = .{ .callback = runFromWorkerThread },
next: ?*TranspilerJob = null,
ref_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(1),
cancelled: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
require_queue: RequireQueue = RequireQueue{},
pub const Status = enum(u32) {
pending,
};
pub const Store = bun.HiveArray(TranspilerJob, 64).Fallback;
pub fn ref(this: *TranspilerJob) void {
std.debug.assert(this.ref_count.fetchAdd(1, .Monotonic) != 0);
}
pub fn unref(this: *TranspilerJob) void {
this.cancelled.store(true, .Monotonic);
this.poll_ref.disableConcurrently(this.vm);
const prev_count = this.ref_count.fetchSub(1, .Monotonic);
const needs_deinit = prev_count == 1;
std.debug.assert(prev_count != 0);
if (needs_deinit) {
this.deinit();
}
}
pub const Fetcher = union(enum) {
virtual_module: bun.String,
@@ -250,29 +342,49 @@ pub const RuntimeTranspilerStore = struct {
}
};
pub fn deinit(this: *TranspilerJob) void {
fn deinit(this: *TranspilerJob) void {
_ = this.vm.transpiler_store.pending_transpilations.remove(this.path.text);
bun.default_allocator.free(this.path.text);
bun.default_allocator.free(this.referrer);
this.poll_ref.disable();
this.fetcher.deinit();
this.loader = options.Loader.file;
this.path = Fs.Path.empty;
this.log.deinit();
this.promise.deinit();
this.require_queue.deinit();
this.globalThis = undefined;
this.resolved_source.source_code.deref();
this.resolved_source.specifier.deref();
this.vm.transpiler_store.store.put(this);
}
threadlocal var ast_memory_store: ?*js_ast.ASTMemoryAllocator = null;
threadlocal var source_code_printer: ?*js_printer.BufferPrinter = null;
pub fn dispatchToMainThread(this: *TranspilerJob) void {
this.vm.eventLoop().enqueueTaskConcurrent(
JSC.ConcurrentTask.fromCallback(this, runFromJSThread),
);
this.vm.eventLoop().enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this)));
}
pub fn runFromJSThread(this: *TranspilerJob) void {
if (this.isCancelled()) {
this.unref();
return;
}
this.onComplete(ModuleLoader.AsyncModule.fulfill);
}
fn drainRequireQueue(this: *TranspilerJob) void {
if (!this.path.isFile()) {
return;
}
this.require_queue.drain(this.path, this.vm, this.globalThis);
this.require_queue.deinit();
this.require_queue = .{};
}
pub fn onComplete(this: *TranspilerJob, comptime fulfill: anytype) void {
var vm = this.vm;
var promise = this.promise.swap();
var globalThis = this.globalThis;
@@ -282,6 +394,7 @@ pub const RuntimeTranspilerStore = struct {
var log = this.log;
this.log = logger.Log.init(bun.default_allocator);
var resolved_source = this.resolved_source;
this.resolved_source = ResolvedSource{};
resolved_source.source_url = specifier.toZigString();
resolved_source.tag = brk: {
@@ -303,18 +416,19 @@ pub const RuntimeTranspilerStore = struct {
};
const parse_error = this.parse_error;
if (!vm.transpiler_store.store.hive.in(this)) {
this.promise.deinit();
if (parse_error == null) {
this.drainRequireQueue();
}
this.deinit();
_ = vm.transpiler_store.store.hive.put(this);
this.unref();
ModuleLoader.AsyncModule.fulfill(globalThis, promise, resolved_source, parse_error, specifier, referrer, &log);
fulfill(globalThis, promise, resolved_source, parse_error, specifier, referrer, &log);
}
pub fn schedule(this: *TranspilerJob) void {
this.poll_ref.ref(this.vm);
this.ref();
JSC.WorkPool.schedule(&this.work_task);
}
@@ -322,15 +436,21 @@ pub const RuntimeTranspilerStore = struct {
@fieldParentPtr(TranspilerJob, "work_task", work_task).run();
}
pub fn run(this: *TranspilerJob) void {
var arena = bun.ArenaAllocator.init(bun.default_allocator);
defer arena.deinit();
inline fn isCancelled(this: *const TranspilerJob) bool {
return this.cancelled.load(.Monotonic) or this.generation_number != this.vm.transpiler_store.generation_number.load(.Monotonic);
}
pub fn run(this: *TranspilerJob) void {
defer this.dispatchToMainThread();
if (this.generation_number != this.vm.transpiler_store.generation_number.load(.Monotonic)) {
this.parse_error = error.TranspilerJobGenerationMismatch;
if (this.isCancelled()) {
// Allow the job to be freed from the main thread
return;
}
var vm = this.vm;
var arena = bun.ArenaAllocator.init(bun.default_allocator);
defer arena.deinit();
if (ast_memory_store == null) {
ast_memory_store = bun.default_allocator.create(js_ast.ASTMemoryAllocator) catch @panic("out of memory!");
@@ -349,7 +469,6 @@ pub const RuntimeTranspilerStore = struct {
const loader = this.loader;
this.log = logger.Log.init(bun.default_allocator);
var vm = this.vm;
var bundler: bun.Bundler = undefined;
bundler = vm.bundler;
var allocator = arena.allocator();
@@ -516,6 +635,13 @@ pub const RuntimeTranspilerStore = struct {
if (strings.eqlComptime(import_record.path.text, "test")) {
import_record.tag = .bun_test;
}
continue;
}
if (import_record.is_top_level_require and import_record.path.isFile()) {
std.debug.assert(import_record.kind == .require);
this.require_queue.queue.insert(import_record.path.text) catch continue;
}
}

View File

@@ -166,6 +166,9 @@ pub const ImportRecord = struct {
/// If true, this import can be removed if it's unused
is_external_without_side_effects: bool = false,
/// Used for async transpilation of CommonJS modules at runtime
is_top_level_require: bool = false,
kind: ImportKind,
tag: Tag = Tag.none,

View File

@@ -11669,6 +11669,7 @@ fn NewParser_(
.kind = kind,
.range = range,
.path = path,
.is_top_level_require = kind == .require and p.current_scope.parent == null,
};
p.import_records.append(record) catch unreachable;
return @as(u32, @intCast(index));