Web Streams API (#176)

* [bun.js] `WritableStream`, `ReadableStream`, `TransformStream`, `WritableStreamDefaultController`, `ReadableStreamDefaultController` & more

* Implement `Blob.stream()`

* Update streams.test.js

* Fix sourcemaps crash

* [TextEncoder] 3x faster in hot loops

* reading almost works

* start to implement native streams

* Implement `Blob.stream()`

* Implement `Bun.file(pathOrFd).stream()`

* Add an extra function

* [fs.readFile] Improve performance

* make jsc bindings a little easier to work with

* fix segfault

* faster async/await + readablestream optimizations

* WebKit updates

* More WebKit updates

* Add releaseWEakrefs binding

* `bun:jsc`

* More streams

* Update streams.test.js

* Update Makefile

* Update mimalloc

* Update WebKit

* Create bun-jsc.test.js

* Faster ReadableStream

* Fix off by one & exceptions

* Handle empty files/blobs

* Update streams.test.js

* Move streams to it's own file

* temp

* impl #1

* take two

* good enough for now

* Implement `readableStreamToArray`, `readableStreamToArrayBuffer`, `concatArrayBuffers`

* jsxOptimizationInlining

* Fix crash

* Add `jsxOptimizationInline` to Bun.Transpiler

* Update Transpiler types

* Update js_ast.zig

* Automatically choose production mode when NODE_ENV="production"

* Update cli.zig

* [jsx] Handle defaultProps when inlining

* Update transpiler.test.js

* uncomment some tests

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Jarred Sumner
2022-06-07 22:32:46 -07:00
committed by GitHub
parent 958fc3d4f5
commit 43de33afc7
247 changed files with 32127 additions and 2037 deletions

View File

@@ -15,7 +15,7 @@ const StoredFileDescriptorType = bun.StoredFileDescriptorType;
const Arena = @import("../../mimalloc_arena.zig").Arena;
const C = bun.C;
const NetworkThread = @import("http").NetworkThread;
const IO = @import("io");
pub fn zigCast(comptime Destination: type, value: anytype) *Destination {
return @ptrCast(*Destination, @alignCast(@alignOf(*Destination), value));
}
@@ -84,21 +84,23 @@ const Config = @import("./config.zig");
const URL = @import("../../url.zig").URL;
const Transpiler = @import("./api/transpiler.zig");
const Bun = JSC.API.Bun;
const EventLoop = JSC.EventLoop;
const ThreadSafeFunction = JSC.napi.ThreadSafeFunction;
pub const GlobalConstructors = [_]type{
WebCore.Blob.Constructor,
WebCore.TextDecoder.Constructor,
WebCore.TextEncoder.Constructor,
// WebCore.TextEncoder.Constructor,
Request.Constructor,
Response.Constructor,
JSC.Cloudflare.HTMLRewriter.Constructor,
};
pub const GlobalClasses = [_]type{
Bun.Class,
EventListenerMixin.addEventListener(VirtualMachine),
BuildError.Class,
ResolveError.Class,
Bun.Class,
Fetch.Class,
js_ast.Macro.JSNode.BunJSXCallbackFunction,
WebCore.Performance.Class,
@@ -109,6 +111,8 @@ pub const GlobalClasses = [_]type{
// The last item in this array becomes "process.env"
Bun.EnvironmentVariables.Class,
};
const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
const Task = JSC.Task;
const Blob = @import("../../blob.zig");
pub const Buffer = MarkedArrayBuffer;
const Lock = @import("../../lock.zig").Lock;
@@ -125,244 +129,6 @@ pub fn OpaqueWrap(comptime Context: type, comptime Function: fn (this: *Context)
const bun_file_import_path = "/node_modules.server.bun";
const FetchTasklet = Fetch.FetchTasklet;
const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
const WorkPool = @import("../../work_pool.zig").WorkPool;
const WorkPoolTask = @import("../../work_pool.zig").Task;
pub fn ConcurrentPromiseTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.event_loop,
.ctx = value,
.allocator = allocator,
.promise = JSValue.createInternalPromise(globalThis),
.globalThis = globalThis,
};
js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
VirtualMachine.vm.active_tasks +|= 1;
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx);
this.onFinish();
}
pub fn runFromJS(this: This) void {
var promise_value = this.promise;
var promise = promise_value.asInternalPromise() orelse {
if (comptime @hasDecl(Context, "deinit")) {
@call(.{}, Context.deinit, .{this.ctx});
}
return;
};
var ctx = this.ctx;
js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
ctx.then(promise);
}
pub fn schedule(this: *This) void {
WorkPool.schedule(&this.task);
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
this.allocator.destroy(this);
}
};
}
pub fn SerialPromiseTask(comptime Context: type) type {
return struct {
const SerialWorkPool = @import("../../work_pool.zig").NewWorkPool(1);
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.event_loop,
.ctx = value,
.allocator = allocator,
.promise = JSValue.createInternalPromise(globalThis),
.globalThis = globalThis,
};
js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
VirtualMachine.vm.active_tasks +|= 1;
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx);
this.onFinish();
}
pub fn runFromJS(this: This) void {
var promise_value = this.promise;
var promise = promise_value.asInternalPromise() orelse {
if (comptime @hasDecl(Context, "deinit")) {
@call(.{}, Context.deinit, .{this.ctx});
}
return;
};
var ctx = this.ctx;
js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
ctx.then(promise, this.globalThis);
}
pub fn schedule(this: *This) void {
SerialWorkPool.schedule(&this.task);
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
this.allocator.destroy(this);
}
};
}
pub fn IOTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: NetworkThread.Task = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.eventLoop(),
.ctx = value,
.allocator = allocator,
.globalThis = globalThis,
};
return this;
}
pub fn runFromThreadPool(task: *NetworkThread.Task) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx, this);
}
pub fn runFromJS(this: This) void {
var ctx = this.ctx;
ctx.then(this.globalThis);
}
pub fn schedule(this: *This) void {
NetworkThread.init() catch return;
NetworkThread.global.pool.schedule(NetworkThread.Batch.from(&this.task));
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.* = undefined;
allocator.destroy(this);
}
};
}
pub fn AsyncNativeCallbackTask(comptime Context: type) type {
return struct {
const This = @This();
ctx: *Context,
task: WorkPoolTask = .{ .callback = runFromThreadPool },
event_loop: *VirtualMachine.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
this.* = .{
.event_loop = VirtualMachine.vm.eventLoop(),
.ctx = value,
.allocator = allocator,
.globalThis = globalThis,
};
return this;
}
pub fn runFromThreadPool(task: *WorkPoolTask) void {
var this = @fieldParentPtr(This, "task", task);
Context.run(this.ctx, this);
}
pub fn runFromJS(this: This) void {
this.ctx.runFromJS(this.globalThis);
}
pub fn schedule(this: *This) void {
WorkPool.get().schedule(WorkPool.schedule(&this.task));
}
pub fn onFinish(this: *This) void {
this.event_loop.enqueueTaskConcurrent(Task.init(this));
}
pub fn deinit(this: *This) void {
var allocator = this.allocator;
this.* = undefined;
allocator.destroy(this);
}
};
}
const CopyFilePromiseTask = WebCore.Blob.Store.CopyFile.CopyFilePromiseTask;
const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
const BunTimerTimeoutTask = Bun.Timer.Timeout.TimeoutTask;
const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask;
const WriteFileTask = WebCore.Blob.Store.WriteFile.WriteFileTask;
const napi_async_work = JSC.napi.napi_async_work;
// const PromiseTask = JSInternalPromise.Completion.PromiseTask;
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
Microtask,
AsyncTransformTask,
BunTimerTimeoutTask,
ReadFileTask,
CopyFilePromiseTask,
WriteFileTask,
AnyTask,
napi_async_work,
ThreadSafeFunction,
// PromiseTask,
// TimeoutTasklet,
});
const SourceMap = @import("../../sourcemap/sourcemap.zig");
const MappingList = SourceMap.Mapping.List;
@@ -480,38 +246,23 @@ pub const SavedSourceMap = struct {
};
const uws = @import("uws");
pub const AnyTask = struct {
ctx: ?*anyopaque,
callback: fn (*anyopaque) void,
pub fn run(this: *AnyTask) void {
@setRuntimeSafety(false);
this.callback(this.ctx.?);
}
pub fn New(comptime Type: type, comptime Callback: anytype) type {
return struct {
pub fn init(ctx: *Type) AnyTask {
return AnyTask{
.callback = wrap,
.ctx = ctx,
};
}
pub fn wrap(this: ?*anyopaque) void {
Callback(@ptrCast(*Type, @alignCast(@alignOf(Type), this.?)));
}
};
}
};
pub export fn Bun__getDefaultGlobal() *JSGlobalObject {
return JSC.VirtualMachine.vm.global;
}
pub export fn Bun__getVM() *JSC.VirtualMachine {
return JSC.VirtualMachine.vm;
}
pub export fn Bun__drainMicrotasks() void {
JSC.VirtualMachine.vm.eventLoop().tick();
}
comptime {
if (!JSC.is_bindgen) {
_ = Bun__getDefaultGlobal;
_ = Bun__getVM;
_ = Bun__drainMicrotasks;
}
}
@@ -575,6 +326,15 @@ pub const VirtualMachine = struct {
response_objects_pool: ?*Response.Pool = null,
rare_data: ?*JSC.RareData = null,
poller: JSC.Poller = JSC.Poller{},
pub fn io(this: *VirtualMachine) *IO {
if (this.io_ == null) {
this.io_ = IO.init(this) catch @panic("Failed to initialize IO");
}
return &this.io_.?;
}
pub inline fn nodeFS(this: *VirtualMachine) *Node.NodeFS {
return this.node_fs orelse brk: {
@@ -606,160 +366,6 @@ pub const VirtualMachine = struct {
}
}
pub const EventLoop = struct {
ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
tasks: Queue = undefined,
concurrent_tasks: Queue = undefined,
concurrent_lock: Lock = Lock.init(),
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
var finished: u32 = 0;
var global = this.global;
var vm_ = this.virtual_machine;
while (this.tasks.readItem()) |task| {
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
finished += 1;
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(JSC.napi.napi_async_work)) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
transform_task.*.runFromJS();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AnyTask)) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
finished += 1;
vm_.active_tasks -|= 1;
},
else => unreachable,
}
}
if (finished > 0) {
_ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
}
return finished;
}
pub fn tickConcurrent(this: *EventLoop) void {
if (this.ready_tasks_count.load(.Monotonic) > 0) {
this.concurrent_lock.lock();
defer this.concurrent_lock.unlock();
const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
// TODO: optimzie
this.tasks.ensureUnusedCapacity(add) catch unreachable;
{
this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
this.concurrent_tasks.discard(this.concurrent_tasks.count);
}
_ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
_ = this.ready_tasks_count.fetchSub(add, .Monotonic);
}
}
// TODO: fix this technical debt
pub fn tick(this: *EventLoop) void {
while (true) {
this.tickConcurrent();
// this.global.vm().doWork();
while (this.tickWithCount() > 0) {}
this.tickConcurrent();
if (this.tickWithCount() == 0) break;
}
}
// TODO: fix this technical debt
pub fn waitForPromise(this: *EventLoop, promise: *JSC.JSInternalPromise) void {
switch (promise.status(this.global.vm())) {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
}
},
else => {},
}
}
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
while (this.pending_tasks_count.load(.Monotonic) > 0) {
this.tick();
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
_ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
this.tasks.writeItem(task) catch unreachable;
}
pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
this.concurrent_lock.lock();
defer this.concurrent_lock.unlock();
this.concurrent_tasks.writeItem(task) catch unreachable;
if (this.virtual_machine.uws_event_loop) |loop| {
loop.nextTick(*EventLoop, this, EventLoop.tick);
}
_ = this.ready_tasks_count.fetchAdd(1, .Monotonic);
}
};
pub inline fn enqueueTask(this: *VirtualMachine, task: Task) void {
this.eventLoop().enqueueTask(task);
}
@@ -1128,6 +734,15 @@ pub const VirtualMachine = struct {
.hash = 0,
};
},
.@"bun:jsc" => {
return ResolvedSource{
.allocator = null,
.source_code = ZigString.init(@embedFile("bun-jsc.exports.js") ++ JSC.Node.fs.constants_string),
.specifier = ZigString.init("bun:jsc"),
.source_url = ZigString.init("bun:jsc"),
.hash = 0,
};
},
.@"node:fs" => {
return ResolvedSource{
.allocator = null,
@@ -1506,12 +1121,17 @@ pub const VirtualMachine = struct {
ret.path = result_path.text;
}
pub fn queueMicrotaskToEventLoop(
_: *JSGlobalObject,
globalObject: *JSGlobalObject,
microtask: *Microtask,
) void {
std.debug.assert(VirtualMachine.vm_loaded);
vm.enqueueTask(Task.init(microtask));
var vm_ = globalObject.bunVM();
if (vm_.global == globalObject) {
vm_.enqueueTask(Task.init(@ptrCast(*JSC.MicrotaskForDefaultGlobalObject, microtask)));
} else {
vm_.enqueueTask(Task.init(microtask));
}
}
pub fn resolveForAPI(res: *ErrorableZigString, global: *JSGlobalObject, specifier: ZigString, source: ZigString) void {
@@ -2914,6 +2534,7 @@ pub const HardcodedModule = enum {
@"node:path",
@"detect-libc",
@"bun:sqlite",
@"bun:jsc",
@"node:module",
pub const Map = bun.ComptimeStringMap(
@@ -2932,12 +2553,14 @@ pub const HardcodedModule = enum {
.{ "bun:sqlite", HardcodedModule.@"bun:sqlite" },
.{ "node:module", HardcodedModule.@"node:module" },
.{ "module", HardcodedModule.@"node:module" },
.{ "bun:jsc", HardcodedModule.@"bun:jsc" },
},
);
pub const LinkerMap = bun.ComptimeStringMap(
string,
.{
.{ "bun:ffi", "bun:ffi" },
.{ "bun:jsc", "bun:jsc" },
.{ "detect-libc", "detect-libc" },
.{ "detect-libc/lib/detect-libc.js", "detect-libc" },
.{ "ffi", "bun:ffi" },