start to implement native streams

This commit is contained in:
Jarred Sumner
2022-05-23 07:18:33 -07:00
parent c362729186
commit 7f3bc2b9e6
32 changed files with 1853 additions and 1255 deletions

View File

@@ -84,6 +84,7 @@ const Config = @import("./config.zig");
const URL = @import("../../url.zig").URL;
const Transpiler = @import("./api/transpiler.zig");
const Bun = JSC.API.Bun;
const EventLoop = JSC.EventLoop;
const ThreadSafeFunction = JSC.napi.ThreadSafeFunction;
pub const GlobalConstructors = [_]type{
WebCore.Blob.Constructor,
@@ -109,6 +110,8 @@ pub const GlobalClasses = [_]type{
// The last item in this array becomes "process.env"
Bun.EnvironmentVariables.Class,
};
const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
const Task = JSC.Task;
const Blob = @import("../../blob.zig");
pub const Buffer = MarkedArrayBuffer;
const Lock = @import("../../lock.zig").Lock;
@@ -125,244 +128,6 @@ pub fn OpaqueWrap(comptime Context: type, comptime Function: fn (this: *Context)
const bun_file_import_path = "/node_modules.server.bun";
const FetchTasklet = Fetch.FetchTasklet;
const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
const WorkPool = @import("../../work_pool.zig").WorkPool;
const WorkPoolTask = @import("../../work_pool.zig").Task;
pub fn ConcurrentPromiseTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.event_loop,
.ctx = value,
.allocator = allocator,
.promise = JSValue.createInternalPromise(globalThis),
.globalThis = globalThis,
};
js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
VirtualMachine.vm.active_tasks +|= 1;
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx);
this.onFinish();
}
pub fn runFromJS(this: This) void {
var promise_value = this.promise;
var promise = promise_value.asInternalPromise() orelse {
if (comptime @hasDecl(Context, "deinit")) {
@call(.{}, Context.deinit, .{this.ctx});
}
return;
};
var ctx = this.ctx;
js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
ctx.then(promise);
}
pub fn schedule(this: *This) void {
WorkPool.schedule(&this.task);
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
this.allocator.destroy(this);
}
};
}
pub fn SerialPromiseTask(comptime Context: type) type {
return struct {
const SerialWorkPool = @import("../../work_pool.zig").NewWorkPool(1);
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.event_loop,
.ctx = value,
.allocator = allocator,
.promise = JSValue.createInternalPromise(globalThis),
.globalThis = globalThis,
};
js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
VirtualMachine.vm.active_tasks +|= 1;
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx);
this.onFinish();
}
pub fn runFromJS(this: This) void {
var promise_value = this.promise;
var promise = promise_value.asInternalPromise() orelse {
if (comptime @hasDecl(Context, "deinit")) {
@call(.{}, Context.deinit, .{this.ctx});
}
return;
};
var ctx = this.ctx;
js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
ctx.then(promise, this.globalThis);
}
pub fn schedule(this: *This) void {
SerialWorkPool.schedule(&this.task);
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
this.allocator.destroy(this);
}
};
}
pub fn IOTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: NetworkThread.Task = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.eventLoop(),
.ctx = value,
.allocator = allocator,
.globalThis = globalThis,
};
return this;
}
pub fn runFromThreadPool(task: *NetworkThread.Task) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx, this);
}
pub fn runFromJS(this: This) void {
var ctx = this.ctx;
ctx.then(this.globalThis);
}
pub fn schedule(this: *This) void {
NetworkThread.init() catch return;
NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task));
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.* = undefined;
allocator.destroy(this);
}
};
}
pub fn AsyncNativeCallbackTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.eventLoop(),
.ctx = value,
.allocator = allocator,
.globalThis = globalThis,
};
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx, this);
}
pub fn runFromJS(this: This) void {
this.ctx.runFromJS(this.globalThis);
}
pub fn schedule(this: *This) void {
WorkPool.get().schedule(WorkPool.schedule(&this.task));
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.* = undefined;
allocator.destroy(this);
}
};
}
const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask;
const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
const BunTimerTimeoutTask = Bun.Timer.Timeout.TimeoutTask;
const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask;
const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask;
const napi_async_work = JSC.napi.napi_async_work;
// const PromiseTask = JSInternalPromise.Completion.PromiseTask;
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
Microtask,
AsyncTransformTask,
BunTimerTimeoutTask,
ReadFileTask,
CopyFilePromiseTask,
WriteFileTask,
AnyTask,
napi_async_work,
ThreadSafeFunction,
// PromiseTask,
// TimeoutTasklet,
});
const SourceMap = @import("../../sourcemap/sourcemap.zig");
const MappingList = SourceMap.Mapping.List;
@@ -480,31 +245,6 @@ pub const SavedSourceMap = struct {
};
const uws = @import("uws");
pub const AnyTask = struct {
ctx: ?*anyopaque,
callback: fn (*anyopaque) void,
pub fn run(this: *AnyTask) void {
@setRuntimeSafety(false);
this.callback(this.ctx.?);
}
pub fn New(comptime Type: type, comptime Callback: anytype) type {
return struct {
pub fn init(ctx: *Type) AnyTask {
return AnyTask{
.callback = wrap,
.ctx = ctx,
};
}
pub fn wrap(this: ?*anyopaque) void {
Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?)));
}
};
}
};
pub export fn Bun__getDefaultGlobal() *JSGlobalObject {
return JSC.VirtualMachine.vm.global;
}
@@ -575,7 +315,7 @@ pub const VirtualMachine = struct {
response_objects_pool: ?*Response.Pool = null,
rare_data: ?*JSC.RareData = null,
poller: JSC.WebCore.Poller = JSC.WebCore.Poller{},
poller: JSC.Poller = JSC.Poller{},
pub fn io(this: *VirtualMachine) *IO {
if (this.io_ == null) {
@@ -615,162 +355,6 @@ pub const VirtualMachine = struct {
}
}
pub const EventLoop = struct {
ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
tasks: Queue = undefined,
concurrent_tasks: Queue = undefined,
concurrent_lock: Lock = Lock.init(),
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
var finished: u32 = 0;
var global = this.global;
var vm_ = this.virtual_machine;
while (this.tasks.readItem()) |task| {
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
finished += 1;
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(JSC.napi.napi_async_work)) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
transform_task.*.runFromJS();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AnyTask)) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
finished += 1;
vm_.active_tasks -|= 1;
},
else => unreachable,
}
}
if (finished > 0) {
_ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
}
return finished;
}
pub fn tickConcurrent(this: *EventLoop) void {
if (this.ready_tasks_count.load(.Monotonic) > 0) {
this.concurrent_lock.lock();
defer this.concurrent_lock.unlock();
const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
// TODO: optimzie
this.tasks.ensureUnusedCapacity(add) catch unreachable;
{
this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
this.concurrent_tasks.discard(this.concurrent_tasks.count);
}
_ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
_ = this.ready_tasks_count.fetchSub(add, .Monotonic);
}
}
// TODO: fix this technical debt
pub fn tick(this: *EventLoop) void {
var poller = &this.virtual_machine.poller;
while (true) {
this.tickConcurrent();
// this.global.vm().doWork();
while (this.tickWithCount() > 0) {}
poller.tick();
this.tickConcurrent();
if (this.tickWithCount() == 0) break;
}
}
// TODO: fix this technical debt
pub fn waitForPromise(this: *EventLoop, promise: *JSC.JSInternalPromise) void {
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
}
},
else => {},
}
}
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
while (this.pending_tasks_count.load(.Monotonic) > 0) {
this.tick();
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
_ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
this.tasks.writeItem(task) catch unreachable;
}
pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
this.concurrent_lock.lock();
defer this.concurrent_lock.unlock();
this.concurrent_tasks.writeItem(task) catch unreachable;
if (this.virtual_machine.uws_event_loop) |loop| {
loop.nextTick(*EventLoop, this, EventLoop.tick);
}
_ = this.ready_tasks_count.fetchAdd(1, .Monotonic);
}
};
pub inline fn enqueueTask(this: *VirtualMachine, task: Task) void {
this.eventLoop().enqueueTask(task);
}