stuff to make it work with mini event loop

This commit is contained in:
Zack Radisic
2024-03-06 16:06:54 -08:00
parent fe8b034284
commit 005be7cc0c
3 changed files with 75 additions and 21 deletions

View File

@@ -359,6 +359,7 @@ const ShellMkdirTask = bun.shell.Interpreter.Builtin.Mkdir.ShellMkdirTask;
const ShellTouchTask = bun.shell.Interpreter.Builtin.Touch.ShellTouchTask;
// const ShellIOReaderAsyncDeinit = bun.shell.Interpreter.IOReader.AsyncDeinit;
const ShellIOReaderAsyncDeinit = bun.shell.Interpreter.AsyncDeinit;
const ShellIOWriterAsyncDeinit = bun.shell.Interpreter.AsyncDeinitWriter;
const TimerReference = JSC.BunTimer.Timeout.TimerReference;
const ProcessWaiterThreadTask = if (Environment.isPosix) bun.spawn.WaiterThread.ProcessQueue.ResultTask else opaque {};
const ProcessMiniEventLoopWaiterThreadTask = if (Environment.isPosix) bun.spawn.WaiterThread.ProcessMiniEventLoopQueue.ResultTask else opaque {};
@@ -373,6 +374,7 @@ pub const Task = TaggedPointerUnion(.{
AnyTask,
ManagedTask,
ShellIOReaderAsyncDeinit,
ShellIOWriterAsyncDeinit,
napi_async_work,
ThreadSafeFunction,
CppTask,
@@ -880,6 +882,11 @@ pub const EventLoop = struct {
while (@field(this, queue_name).readItem()) |task| {
defer counter += 1;
switch (task.tag()) {
@field(Task.Tag, typeBaseName(@typeName(ShellIOWriterAsyncDeinit))) => {
var shell_ls_task: *ShellIOWriterAsyncDeinit = task.get(ShellIOWriterAsyncDeinit).?;
shell_ls_task.runFromMainThread();
// shell_ls_task.deinit();
},
@field(Task.Tag, typeBaseName(@typeName(ShellIOReaderAsyncDeinit))) => {
var shell_ls_task: *ShellIOReaderAsyncDeinit = task.get(ShellIOReaderAsyncDeinit).?;
shell_ls_task.runFromMainThread();
@@ -1691,11 +1698,13 @@ pub const MiniEventLoop = struct {
pipe_read_buffer: ?*PipeReadBuffer = null,
const PipeReadBuffer = [256 * 1024]u8;
pub threadlocal var globalInitialized: bool = false;
pub threadlocal var global: *MiniEventLoop = undefined;
pub const ConcurrentTaskQueue = UnboundedQueue(AnyTaskWithExtraContext, .next);
pub fn initGlobal(env: ?*bun.DotEnv.Loader) *MiniEventLoop {
if (globalInitialized) return global;
const loop = MiniEventLoop.init(bun.default_allocator);
global = bun.default_allocator.create(MiniEventLoop) catch bun.outOfMemory();
global.* = loop;
@@ -1707,6 +1716,7 @@ pub const MiniEventLoop = struct {
loader.* = bun.DotEnv.Loader.init(map, bun.default_allocator);
break :env_loader loader;
};
globalInitialized = true;
return global;
}
@@ -1750,11 +1760,17 @@ pub const MiniEventLoop = struct {
pub fn init(
allocator: std.mem.Allocator,
) MiniEventLoop {
return .{
var mini = MiniEventLoop{
.tasks = Queue.init(allocator),
.allocator = allocator,
.loop = uws.Loop.get(),
};
if (comptime Environment.isWindows) {
mini.loop.uv_loop = bun.windows.libuv.Loop.get();
}
return mini;
}
pub fn deinit(this: *MiniEventLoop) void {

View File

@@ -305,26 +305,25 @@ pub const RunCommand = struct {
}
if (Environment.isWindows and !use_native_shell) {
@panic("TODO: Windows shell support");
// if (!silent) {
// if (Environment.isDebug) {
// Output.prettyError("[bun shell] ", .{});
// }
// Output.prettyErrorln("<r><d><magenta>$<r> <d><b>{s}<r>", .{combined_script});
// Output.flush();
// }
if (!silent) {
if (Environment.isDebug) {
Output.prettyError("[bun shell] ", .{});
}
Output.prettyErrorln("<r><d><magenta>$<r> <d><b>{s}<r>", .{combined_script});
Output.flush();
}
// const mini = bun.JSC.MiniEventLoop.initGlobal(env);
// bun.shell.ShellSubprocessMini.initAndRunFromSource(mini, name, combined_script) catch |err| {
// if (!silent) {
// Output.prettyErrorln("<r><red>error<r>: Failed to run script <b>{s}<r> due to error <b>{s}<r>", .{ name, @errorName(err) });
// }
const mini = bun.JSC.MiniEventLoop.initGlobal(env);
bun.shell.Interpreter.initAndRunFromSource(mini, name, combined_script) catch |err| {
if (!silent) {
Output.prettyErrorln("<r><red>error<r>: Failed to run script <b>{s}<r> due to error <b>{s}<r>", .{ name, @errorName(err) });
}
// Output.flush();
// Global.exit(1);
// };
Output.flush();
Global.exit(1);
};
// return true;
return true;
}
var argv = [_]string{

View File

@@ -1220,7 +1220,7 @@ pub const Interpreter = struct {
};
const script_heap = try arena.allocator().create(ast.Script);
script_heap.* = script;
var interp = switch (ThisInterpreter.init(mini, bun.default_allocator, &arena, script_heap, jsobjs)) {
var interp = switch (ThisInterpreter.init(.{ .mini = mini }, bun.default_allocator, &arena, script_heap, jsobjs)) {
.err => |*e| {
throwShellErr(e, .{ .mini = mini });
return;
@@ -1239,6 +1239,7 @@ pub const Interpreter = struct {
interp.done = &is_done.done;
try interp.run();
mini.tick(&is_done, @as(fn (*anyopaque) bool, IsDone.isDone));
interp.deinit();
}
pub fn run(this: *ThisInterpreter) !void {
@@ -8737,6 +8738,37 @@ pub const Interpreter = struct {
pub const Readers = SmolList(ChildPtr, 4);
};
pub const AsyncDeinitWriter = struct {
task: WorkPoolTask = .{ .callback = &runFromThreadPool },
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(@This(), "task", task);
var iowriter = this.writer();
if (iowriter.evtloop == .js) {
iowriter.evtloop.js.enqueueTaskConcurrent(iowriter.concurrent_task.js.from(this, .manual_deinit));
} else {
iowriter.evtloop.mini.enqueueTaskConcurrent(iowriter.concurrent_task.mini.from(this, "runFromMainThreadMini"));
}
}
pub fn writer(this: *@This()) *IOWriter {
return @fieldParentPtr(IOWriter, "async_deinit", this);
}
pub fn runFromMainThread(this: *@This()) void {
const ioreader = @fieldParentPtr(IOWriter, "async_deinit", this);
ioreader.__deinit();
}
pub fn runFromMainThreadMini(this: *@This(), _: *void) void {
this.runFromMainThread();
}
pub fn schedule(this: *@This()) void {
WorkPool.schedule(&this.task);
}
};
pub const AsyncDeinit = struct {
task: WorkPoolTask = .{ .callback = &runFromThreadPool },
@@ -8780,7 +8812,9 @@ pub const Interpreter = struct {
ref_count: u32 = 1,
err: ?JSC.SystemError = null,
evtloop: JSC.EventLoopHandle,
concurrent_task: JSC.EventLoopTask,
is_writing: if (bun.Environment.isWindows) bool else u0 = if (bun.Environment.isWindows) false else 0,
async_deinit: AsyncDeinitWriter = .{},
pub const DEBUG_REFCOUNT_NAME: []const u8 = "IOWriterRefCount";
@@ -8796,7 +8830,7 @@ pub const Interpreter = struct {
pub const auto_poll = false;
usingnamespace bun.NewRefCounted(@This(), This.deinit);
usingnamespace bun.NewRefCounted(@This(), asyncDeinit);
const This = @This();
pub const WriterImpl = bun.io.BufferedWriter(
This,
@@ -8820,6 +8854,7 @@ pub const Interpreter = struct {
const this = IOWriter.new(.{
.fd = fd,
.evtloop = evtloop,
.concurrent_task = JSC.EventLoopTask.fromEventLoop(evtloop),
});
this.writer.parent = this;
@@ -9083,7 +9118,11 @@ pub const Interpreter = struct {
this.write();
}
pub fn deinit(this: *This) void {
pub fn asyncDeinit(this: *@This()) void {
this.async_deinit.schedule();
}
pub fn __deinit(this: *This) void {
print("IOWriter(0x{x}, fd={}) deinit", .{ @intFromPtr(this), this.fd });
if (bun.Environment.allow_assert) std.debug.assert(this.ref_count == 0);
this.buf.deinit(bun.default_allocator);