diff --git a/integration/bunjs-only-snippets/ffi.test.fixture.callback.c b/integration/bunjs-only-snippets/ffi.test.fixture.callback.c index 36949e158f..3a557e7d5d 100644 --- a/integration/bunjs-only-snippets/ffi.test.fixture.callback.c +++ b/integration/bunjs-only-snippets/ffi.test.fixture.callback.c @@ -258,6 +258,9 @@ void* JSFunctionCall(void* globalObject, void* callFrame); bool my_callback_function(void* arg0); bool my_callback_function(void* arg0) { +#ifdef INJECT_BEFORE +INJECT_BEFORE; +#endif EncodedJSValue arguments[1] = { PTR_TO_JSVALUE(arg0) }; diff --git a/src/javascript/jsc/bindings/ReadableStreamBuiltins.cpp b/src/javascript/jsc/bindings/ReadableStreamBuiltins.cpp index 3aa51f5b2f..3064544ec9 100644 --- a/src/javascript/jsc/bindings/ReadableStreamBuiltins.cpp +++ b/src/javascript/jsc/bindings/ReadableStreamBuiltins.cpp @@ -119,18 +119,29 @@ const char* const s_readableStreamInitializeReadableStreamCode = const JSC::ConstructAbility s_readableStreamCreateNativeReadableStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamCreateNativeReadableStreamCodeConstructorKind = JSC::ConstructorKind::None; -const int s_readableStreamCreateNativeReadableStreamCodeLength = 2522; +const int s_readableStreamCreateNativeReadableStreamCodeLength = 2355; static const JSC::Intrinsic s_readableStreamCreateNativeReadableStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamCreateNativeReadableStreamCode = "(function (nativeTag, nativeID) {\n" \ - " var cached = @getByIdDirectPrivate(globalThis, \"nativeReadableStreamPrototype\");\n" \ - " if (!cached) {\n" \ - " cached = new @Map();\n" \ - " @putByIdDirectPrivate(globalThis, \"nativeReadableStreamPrototype\", cached);\n" \ - " }\n" \ - " var Prototype = cached.get(nativeID);\n" \ - " if (!Prototype) {\n" \ + " var cached = globalThis[Symbol.for(\"Bun.nativeReadableStreamPrototype\")] ||= new @Map;\n" \ + " var Prototype = cached.@get(nativeID);\n" \ + " if (Prototype === @undefined) {\n" \ " var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for(\"Bun.lazy\")](nativeID);\n" \ + " var closer = [false];\n" \ + "\n" \ + " var handleResult = function handleResult(result, controller) {\n" \ + " if (result && @isPromise(result)) {\n" \ + " result.then((val) => handleResult(val, controller), err => controller.error(err));\n" \ + " } else if (result !== false) {\n" \ + " controller.enqueue(result);\n" \ + " }\n" \ + "\n" \ + " if (closer[0] || result === false) {\n" \ + " new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {});\n" \ + " closer[0] = false;\n" \ + " }\n" \ + " }\n" \ + "\n" \ " Prototype = class NativeReadableStreamSource {\n" \ " constructor(tag) {\n" \ " this.pull = this.pull_.bind(tag);\n" \ @@ -143,44 +154,30 @@ const char* const s_readableStreamCreateNativeReadableStreamCode = " cancel;\n" \ " \n" \ " pull_(controller) {\n" \ + " closer[0] = false;\n" \ " var result;\n" \ + "\n" \ " try {\n" \ - " result = pull(this);\n" \ - " } catch (e) {\n" \ - " controller.error(e);\n" \ - " return;\n" \ - " }\n" \ - " \n" \ - " if (result === false) {\n" \ - " //\n" \ - " new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {});\n" \ - " return;\n" \ + " result = pull(this, closer);\n" \ + " } catch(err) {\n" \ + " return controller.error(err);\n" \ " }\n" \ "\n" \ - " if (@isPromise(result)) {\n" \ - " result.then(controller.enqueue, controller.error);\n" \ - " } else {\n" \ - " controller.enqueue(result);\n" \ - " }\n" \ + " handleResult(result, controller);\n" \ " }\n" \ "\n" \ " start_(controller) {\n" \ " setClose(this, controller.close);\n" \ - " const result = start(this, controller.enqueue, controller.error);\n" \ - " if (result === false) {\n" \ - " //\n" \ - " new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {});\n" \ - " return;\n" \ + " closer[0] = false;\n" \ + " var result;\n" \ + "\n" \ + " try {\n" \ + " result = start(this, closer);\n" \ + " } catch(err) {\n" \ + " return controller.error(err);\n" \ " }\n" \ "\n" \ - "\n" \ - " if (@isPromise(result)) {\n" \ - " result.then(controller.enqueue, controller.error);\n" \ - " } else {\n" \ - " controller.enqueue(result);\n" \ - " }\n" \ - "\n" \ - " return result;\n" \ + " handleResult(result, controller);\n" \ " }\n" \ "\n" \ " cancel_(reason) {\n" \ @@ -189,7 +186,7 @@ const char* const s_readableStreamCreateNativeReadableStreamCode = "\n" \ " static registry = new FinalizationRegistry(deinit);\n" \ " }\n" \ - " cached.set(nativeID, Prototype);\n" \ + " cached.@set(nativeID, Prototype);\n" \ " }\n" \ " \n" \ " var instance = new Prototype(nativeTag);\n" \ diff --git a/src/javascript/jsc/bindings/ZigGlobalObject.cpp b/src/javascript/jsc/bindings/ZigGlobalObject.cpp index 61f94e3f84..6c119ac2b7 100644 --- a/src/javascript/jsc/bindings/ZigGlobalObject.cpp +++ b/src/javascript/jsc/bindings/ZigGlobalObject.cpp @@ -914,6 +914,9 @@ JSC: case 1: { return ByteBlob__JSReadableStreamSource__load(globalObject); } + case 2: { + return FileBlobLoader__JSReadableStreamSource__load(globalObject); + } default: { auto scope = DECLARE_THROW_SCOPE(globalObject->vm()); JSC::throwTypeError(globalObject, scope, "lazyLoad expects a string"_s); @@ -1388,9 +1391,10 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm) putDirectCustomAccessor(vm, static_cast(vm.clientData)->builtinNames().WritableStreamPublicName(), CustomGetterSetter::create(vm, jsServiceWorkerGlobalScope_WritableStreamConstructor, nullptr), JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly); putDirectCustomAccessor(vm, static_cast(vm.clientData)->builtinNames().WritableStreamDefaultControllerPublicName(), CustomGetterSetter::create(vm, jsServiceWorkerGlobalScope_WritableStreamDefaultControllerConstructor, nullptr), JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly); putDirectCustomAccessor(vm, static_cast(vm.clientData)->builtinNames().WritableStreamDefaultWriterPublicName(), CustomGetterSetter::create(vm, jsServiceWorkerGlobalScope_WritableStreamDefaultWriterConstructor, nullptr), JSC::PropertyAttribute::CustomAccessor | JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly); - putDirectCustomAccessor(vm, JSC::Identifier::fromString(vm, "ByteLengthQueuingStrategy"_s), CustomGetterSetter::create(vm, jsServiceWorkerGlobalScope_ByteLengthQueuingStrategyConstructor, nullptr), JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly); putDirectCustomAccessor(vm, JSC::Identifier::fromString(vm, "CountQueuingStrategy"_s), CustomGetterSetter::create(vm, jsServiceWorkerGlobalScope_CountQueuingStrategyConstructor, nullptr), JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly); + + // putDirect(vm, static_cast(vm.clientData)->builtinNames().nativeReadableStreamPrototypePrivateName(), jsUndefined(), JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::DontEnum | 0); } // This is not a publicly exposed API currently. diff --git a/src/javascript/jsc/bindings/bindings.cpp b/src/javascript/jsc/bindings/bindings.cpp index 1dda5c6be3..2d21ec9ccd 100644 --- a/src/javascript/jsc/bindings/bindings.cpp +++ b/src/javascript/jsc/bindings/bindings.cpp @@ -2513,13 +2513,14 @@ bool JSC__VM__isEntered(JSC__VM* arg0) { return (*arg0).isEntered(); } void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1) { (*arg0).setExecutionForbidden(); } -bool JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__ThrowScope* arg2, - const unsigned char* arg3, size_t arg4) +void JSC__VM__throwError(JSC__VM* vm_, JSC__JSGlobalObject* arg1, JSC__JSValue value) { - auto scope = arg2; - auto global = arg1; - const String& message = WTF::String(arg3, arg4); - return JSC::throwException(global, (*scope), createError(global, message)); + JSC::VM& vm = *reinterpret_cast(vm_); + + auto scope = DECLARE_THROW_SCOPE(vm); + JSC::JSObject* error = JSC::JSValue::decode(value).getObject(); + JSC::Exception* exception = JSC::Exception::create(vm, error); + scope.throwException(arg1, exception); } #pragma mark - JSC::ThrowScope diff --git a/src/javascript/jsc/bindings/bindings.zig b/src/javascript/jsc/bindings/bindings.zig index 2e752304d3..94139c1d79 100644 --- a/src/javascript/jsc/bindings/bindings.zig +++ b/src/javascript/jsc/bindings/bindings.zig @@ -3009,15 +3009,11 @@ pub const VM = extern struct { }); } - pub fn throwError(vm: *VM, global_object: *JSGlobalObject, scope: *ThrowScope, message: [*]const u8, len: usize) bool { + pub fn throwError(vm: *VM, global_object: *JSGlobalObject, value: JSValue) void { return cppFn("throwError", .{ vm, - global_object, - scope, - - message, - len, + value, }); } @@ -3036,7 +3032,7 @@ pub const VM = extern struct { vm, }); } - pub const Extern = [_][]const u8{ "doWork", "deferGC", "holdAPILock", "runGC", "generateHeapSnapshot", "isJITEnabled", "deleteAllCode", "create", "deinit", "setExecutionForbidden", "executionForbidden", "isEntered", "throwError", "drainMicrotasks", "whenIdle", "shrinkFootprint", "setExecutionTimeLimit", "clearExecutionTimeLimit" }; + pub const Extern = [_][]const u8{ "throwError", "doWork", "deferGC", "holdAPILock", "runGC", "generateHeapSnapshot", "isJITEnabled", "deleteAllCode", "create", "deinit", "setExecutionForbidden", "executionForbidden", "isEntered", "throwError", "drainMicrotasks", "whenIdle", "shrinkFootprint", "setExecutionTimeLimit", "clearExecutionTimeLimit" }; }; pub const ThrowScope = extern struct { diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index 446d28bb3a..6f4dea3db1 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -90,14 +90,25 @@ function initializeReadableStream(underlyingSource, strategy) @globalPrivate function createNativeReadableStream(nativeTag, nativeID) { - var cached = @getByIdDirectPrivate(globalThis, "nativeReadableStreamPrototype"); - if (!cached) { - cached = new @Map(); - @putByIdDirectPrivate(globalThis, "nativeReadableStreamPrototype", cached); - } - var Prototype = cached.get(nativeID); - if (!Prototype) { + var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; + var Prototype = cached.@get(nativeID); + if (Prototype === @undefined) { var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeID); + var closer = [false]; + + var handleResult = function handleResult(result, controller) { + if (result && @isPromise(result)) { + result.then((val) => handleResult(val, controller), err => controller.error(err)); + } else if (result !== false) { + controller.enqueue(result); + } + + if (closer[0] || result === false) { + new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {}); + closer[0] = false; + } + } + Prototype = class NativeReadableStreamSource { constructor(tag) { this.pull = this.pull_.bind(tag); @@ -110,44 +121,30 @@ function createNativeReadableStream(nativeTag, nativeID) { cancel; pull_(controller) { + closer[0] = false; var result; + try { - result = pull(this); - } catch (e) { - controller.error(e); - return; - } - - if (result === false) { - // close on next tick - new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {}); - return; + result = pull(this, closer); + } catch(err) { + return controller.error(err); } - if (@isPromise(result)) { - result.then(controller.enqueue, controller.error); - } else { - controller.enqueue(result); - } + handleResult(result, controller); } start_(controller) { setClose(this, controller.close); - const result = start(this, controller.enqueue, controller.error); - if (result === false) { - // close on next tick - new @Promise((resolve, reject) => resolve(controller.close())).then(() => {}, () => {}); - return; + closer[0] = false; + var result; + + try { + result = start(this, closer); + } catch(err) { + return controller.error(err); } - - if (@isPromise(result)) { - result.then(controller.enqueue, controller.error); - } else { - controller.enqueue(result); - } - - return result; + handleResult(result, controller); } cancel_(reason) { @@ -156,7 +153,7 @@ function createNativeReadableStream(nativeTag, nativeID) { static registry = new FinalizationRegistry(deinit); } - cached.set(nativeID, Prototype); + cached.@set(nativeID, Prototype); } var instance = new Prototype(nativeTag); diff --git a/src/javascript/jsc/bindings/exports.zig b/src/javascript/jsc/bindings/exports.zig index f346b9f197..8a35883bac 100644 --- a/src/javascript/jsc/bindings/exports.zig +++ b/src/javascript/jsc/bindings/exports.zig @@ -180,7 +180,8 @@ pub const NodeWritableStream = JSC.Node.Writable.State; pub const NodePath = JSC.Node.Path; // Web Streams -pub const BlobReadableStream = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource; +pub const JSReadableStreamBlob = JSC.WebCore.ByteBlobLoader.Source.JSReadableStreamSource; +pub const JSReadableStreamFile = JSC.WebCore.FileBlobLoader.Source.JSReadableStreamSource; pub fn Errorable(comptime Type: type) type { return extern struct { @@ -2505,7 +2506,8 @@ comptime { std.testing.refAllDecls(Bun.Timer); std.testing.refAllDecls(NodeWritableStream); std.testing.refAllDecls(NodePath); - std.testing.refAllDecls(BlobReadableStream); + std.testing.refAllDecls(JSReadableStreamBlob); + std.testing.refAllDecls(JSReadableStreamFile); _ = ZigString__free; _ = ZigString__free_global; } diff --git a/src/javascript/jsc/bindings/headers-cpp.h b/src/javascript/jsc/bindings/headers-cpp.h index 37fd0a7a99..f3fbbc79b8 100644 --- a/src/javascript/jsc/bindings/headers-cpp.h +++ b/src/javascript/jsc/bindings/headers-cpp.h @@ -1,4 +1,4 @@ -//-- AUTOGENERATED FILE -- 1653376853 +//-- AUTOGENERATED FILE -- 1653447610 // clang-format off #pragma once diff --git a/src/javascript/jsc/bindings/headers.h b/src/javascript/jsc/bindings/headers.h index 8adcfbb950..0313a43837 100644 --- a/src/javascript/jsc/bindings/headers.h +++ b/src/javascript/jsc/bindings/headers.h @@ -1,5 +1,5 @@ // clang-format: off -//-- AUTOGENERATED FILE -- 1653376853 +//-- AUTOGENERATED FILE -- 1653447610 #pragma once #include @@ -567,7 +567,8 @@ CPP_DECL JSC__JSValue JSC__VM__runGC(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setExecutionForbidden(JSC__VM* arg0, bool arg1); CPP_DECL void JSC__VM__setExecutionTimeLimit(JSC__VM* arg0, double arg1); CPP_DECL void JSC__VM__shrinkFootprint(JSC__VM* arg0); -CPP_DECL bool JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__ThrowScope* arg2, const unsigned char* arg3, size_t arg4); +CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2); +CPP_DECL void JSC__VM__throwError(JSC__VM* arg0, JSC__JSGlobalObject* arg1, JSC__JSValue JSValue2); CPP_DECL void JSC__VM__whenIdle(JSC__VM* arg0, void (* ArgFn1)()); #pragma mark - JSC::ThrowScope @@ -721,6 +722,12 @@ ZIG_DECL JSC__JSValue ByteBlob__JSReadableStreamSource__load(JSC__JSGlobalObject #ifdef __cplusplus +ZIG_DECL JSC__JSValue FileBlobLoader__JSReadableStreamSource__load(JSC__JSGlobalObject* arg0); + +#endif + +#ifdef __cplusplus + ZIG_DECL void Bun__Process__exit(JSC__JSGlobalObject* arg0, int32_t arg1); ZIG_DECL JSC__JSValue Bun__Process__getArgv(JSC__JSGlobalObject* arg0); ZIG_DECL JSC__JSValue Bun__Process__getCwd(JSC__JSGlobalObject* arg0); diff --git a/src/javascript/jsc/bindings/headers.zig b/src/javascript/jsc/bindings/headers.zig index 3dd158d3ee..341d641095 100644 --- a/src/javascript/jsc/bindings/headers.zig +++ b/src/javascript/jsc/bindings/headers.zig @@ -404,7 +404,7 @@ pub extern fn JSC__VM__runGC(arg0: [*c]JSC__VM, arg1: bool) JSC__JSValue; pub extern fn JSC__VM__setExecutionForbidden(arg0: [*c]JSC__VM, arg1: bool) void; pub extern fn JSC__VM__setExecutionTimeLimit(arg0: [*c]JSC__VM, arg1: f64) void; pub extern fn JSC__VM__shrinkFootprint(arg0: [*c]JSC__VM) void; -pub extern fn JSC__VM__throwError(arg0: [*c]JSC__VM, arg1: [*c]JSC__JSGlobalObject, arg2: [*c]JSC__ThrowScope, arg3: [*c]const u8, arg4: usize) bool; +pub extern fn JSC__VM__throwError(arg0: [*c]JSC__VM, arg1: [*c]JSC__JSGlobalObject, JSValue2: JSC__JSValue) void; pub extern fn JSC__VM__whenIdle(arg0: [*c]JSC__VM, ArgFn1: ?fn (...) callconv(.C) void) void; pub extern fn JSC__ThrowScope__clearException(arg0: [*c]JSC__ThrowScope) void; pub extern fn JSC__ThrowScope__declare(arg0: [*c]JSC__VM, arg1: [*c]u8, arg2: [*c]u8, arg3: usize) bJSC__ThrowScope; diff --git a/src/javascript/jsc/event_loop.zig b/src/javascript/jsc/event_loop.zig index 43f4f9ac9a..b124032d69 100644 --- a/src/javascript/jsc/event_loop.zig +++ b/src/javascript/jsc/event_loop.zig @@ -437,6 +437,7 @@ pub const Poller = struct { /// kqueue() or epoll() /// 0 == unset watch_fd: i32 = 0, + active: u32 = 0, pub const PlatformSpecificFlags = struct {}; @@ -464,6 +465,7 @@ pub const Poller = struct { return JSC.Maybe(void).errnoSys(this.watch_fd, .kqueue).?; } } + std.debug.assert(this.watch_fd != 0); var events_list = std.mem.zeroes([2]kevent64); events_list[0] = switch (flag) { @@ -513,13 +515,18 @@ pub const Poller = struct { switch (rc) { std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(std.c.getErrno(rc)), .kevent).?, - 0 => return JSC.Maybe(void).success, + 0 => { + this.active += 1; + return JSC.Maybe(void).success; + }, 1 => { dispatchKQueueEvent(&events_list[0]); return JSC.Maybe(void).success; }, 2 => { dispatchKQueueEvent(&events_list[0]); + this.active -= 1; + dispatchKQueueEvent(&events_list[1]); return JSC.Maybe(void).success; }, @@ -533,7 +540,7 @@ pub const Poller = struct { const kqueue_events_ = std.mem.zeroes([4]kevent64); pub fn tick(this: *Poller) void { if (comptime Environment.isMac) { - if (this.watch_fd == 0) return; + if (this.active == 0) return; var events_list = kqueue_events_; // ub extern "c" fn kevent64( @@ -566,18 +573,22 @@ pub const Poller = struct { }, 0 => {}, 1 => { + this.active -= 1; dispatchKQueueEvent(&events_list[0]); }, 2 => { + this.active -= 2; dispatchKQueueEvent(&events_list[0]); dispatchKQueueEvent(&events_list[1]); }, 3 => { + this.active -= 3; dispatchKQueueEvent(&events_list[0]); dispatchKQueueEvent(&events_list[1]); dispatchKQueueEvent(&events_list[2]); }, 4 => { + this.active -= 4; dispatchKQueueEvent(&events_list[0]); dispatchKQueueEvent(&events_list[1]); dispatchKQueueEvent(&events_list[2]); diff --git a/src/javascript/jsc/node/syscall.zig b/src/javascript/jsc/node/syscall.zig index ba99cd0638..35baac3c7d 100644 --- a/src/javascript/jsc/node/syscall.zig +++ b/src/javascript/jsc/node/syscall.zig @@ -542,6 +542,12 @@ pub const Error = struct { syscall: Syscall.Tag = @intToEnum(Syscall.Tag, 0), path: []const u8 = "", + pub fn fromCode(errno: os.E, syscall: Syscall.Tag) Error { + return .{ .errno = @truncate(Int, @enumToInt(errno)), .syscall = syscall }; + } + + pub const oom = fromCode(os.E.NOMEM, .read); + pub const retry = Error{ .errno = if (Environment.isLinux) @intCast(Int, @enumToInt(os.E.AGAIN)) diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig index 0f4e06f5ce..5f30d5c681 100644 --- a/src/javascript/jsc/webcore/response.zig +++ b/src/javascript/jsc/webcore/response.zig @@ -1030,7 +1030,7 @@ pub const ReadableStream = struct { pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue { return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id))); } - pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob) JSC.JSValue { + pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue { if (comptime JSC.is_bindgen) unreachable; var store = blob.store orelse { @@ -1042,11 +1042,16 @@ pub const ReadableStream = struct { reader.* = .{ .context = undefined, }; - reader.context.setup(blob); + reader.context.setup(blob, recommended_chunk_size); return reader.toJS(globalThis); }, .file => { - return JSValue.jsUndefined(); + var reader = bun.default_allocator.create(FileBlobLoader.Source) catch unreachable; + reader.* = .{ + .context = undefined, + }; + reader.context.setup(store, recommended_chunk_size); + return reader.toJS(globalThis); }, } } @@ -1669,9 +1674,7 @@ pub const Blob = struct { // return BlobStore__onRead(ctx, slice.ptr, slice.len) and has_more; // }, - .file => |*file| { - - } + } comptime { @@ -2957,12 +2960,22 @@ pub const Blob = struct { ctx: js.JSContextRef, _: js.JSObjectRef, _: js.JSObjectRef, - _: []const js.JSValueRef, - _: js.ExceptionRef, + arguments: []const js.JSValueRef, + exception: js.ExceptionRef, ) JSC.C.JSValueRef { + var recommended_chunk_size: SizeType = 0; + if (arguments.len > 0) { + if (!JSValue.c(arguments[0]).isNumber() and !JSValue.c(arguments[0]).isUndefinedOrNull()) { + JSC.throwInvalidArguments("chunkSize must be a number", .{}, ctx, exception); + return null; + } + + recommended_chunk_size = @intCast(SizeType, @maximum(0, @truncate(i52, JSValue.c(arguments[0]).toInt64()))); + } return ReadableStream.fromBlob( ctx.ptr(), this, + recommended_chunk_size, ).asObjectRef(); } @@ -4953,29 +4966,68 @@ pub const FetchEvent = struct { pub const StreamResult = union(enum) { owned: bun.ByteList, + owned_and_done: bun.ByteList, + temporary_and_done: bun.ByteList, temporary: bun.ByteList, - pending: JSC.Async(StreamResult), + pending: *Pending, err: JSC.Node.Syscall.Error, done: void, + pub const Pending = struct { + frame: anyframe, + result: StreamResult, + used: bool = false, + }; + + fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { + suspend {} + + pending.used = true; + const result: StreamResult = pending.result; + + switch (result) { + .err => |err| { + promise.reject(globalThis, err.toJSC(globalThis)); + }, + .done => { + promise.resolve(globalThis, JSValue.jsBoolean(false)); + }, + else => { + promise.resolve(globalThis, result.toJS(globalThis)); + }, + } + } + + pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { + var frame = bun.default_allocator.create(@Frame(toPromisedWrap)) catch unreachable; + frame.* = async toPromisedWrap(globalThis, promise, pending); + pending.frame = frame; + } + pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue { switch (this.*) { .owned => |list| { return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null); }, + .owned_and_done => |list| { + return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null); + }, .temporary => |temp| { var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len); var slice = array.asArrayBuffer(globalThis).?.slice(); @memcpy(slice.ptr, temp.ptr, temp.len); return array; }, - .pending => |pend| { - if (pend.promise.*) |promise| { - return promise.asValue(globalThis); - } - - pend.promise.* = JSC.JSPromise.create(globalThis); - return pend.promise.*.?.asValue(globalThis); + .temporary_and_done => |temp| { + var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len); + var slice = array.asArrayBuffer(globalThis).?.slice(); + @memcpy(slice.ptr, temp.ptr, temp.len); + return array; + }, + .pending => |pending| { + var promise = JSC.JSPromise.create(globalThis); + toPromised(globalThis, promise, pending); + return promise.asValue(globalThis); }, .err => |err| { @@ -5203,21 +5255,21 @@ pub fn ReadableStreamSource( const ReadableStreamSourceType = @This(); pub fn pull(this: *This) StreamResult { - return onPull(this.context, undefined, false); + return onPull(&this.context); } pub fn start( this: *This, ) StreamResult { - return onStart(&this.context, undefined, false); + return onStart(&this.context); } - pub fn pullFromJS(this: *This, globalThis: *JSGlobalObject) StreamResult { - return onPull(&this.context, globalThis, true); + pub fn pullFromJS(this: *This) StreamResult { + return onPull(&this.context); } - pub fn startFromJS(this: *This, globalThis: *JSGlobalObject) StreamResult { - return onStart(&this.context, globalThis, true); + pub fn startFromJS(this: *This) StreamResult { + return onStart(&this.context); } pub fn cancel(this: *This) void { @@ -5267,11 +5319,33 @@ pub fn ReadableStreamSource( pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); - return this.pullFromJS(globalThis).toJS(globalThis); + return processResult( + globalThis, + callFrame, + this.pullFromJS(), + ); } pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); - return this.startFromJS(globalThis).toJS(globalThis); + return processResult( + globalThis, + callFrame, + this.startFromJS(), + ); + } + + pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue { + switch (result) { + .err => |err| { + globalThis.vm().throwError(globalThis, err.toJSC(globalThis)); + return JSValue.jsUndefined(); + }, + .temporary_and_done, .owned_and_done => { + JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), callFrame.argument(1).asObjectRef(), 0, JSValue.jsBoolean(true).asObjectRef(), null); + return result.toJS(globalThis); + }, + else => return result.toJS(globalThis), + } } pub fn cancel(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); @@ -5350,6 +5424,7 @@ pub const ByteBlobLoader = struct { pub fn setup( this: *ByteBlobLoader, blob: *const Blob, + user_chunk_size: Blob.SizeType, ) void { blob.store.?.ref(); var blobe = blob.*; @@ -5357,18 +5432,17 @@ pub const ByteBlobLoader = struct { this.* = ByteBlobLoader{ .offset = blobe.offset, .store = blobe.store.?, - .chunk_size = @minimum(1024 * 1024 * 2, blobe.size), + .chunk_size = if (user_chunk_size > 0) @minimum(user_chunk_size, blobe.size) else @minimum(1024 * 1024 * 2, blobe.size), .remain = blobe.size, - .done = false, }; } - pub fn onStart(this: *ByteBlobLoader, global: *JSGlobalObject, comptime is_js: bool) StreamResult { - return this.onPull(global, is_js); + pub fn onStart(this: *ByteBlobLoader) StreamResult { + return this.onPull(); } - pub fn onPull(this: *ByteBlobLoader, _: *JSGlobalObject, comptime _: bool) StreamResult { + pub fn onPull(this: *ByteBlobLoader) StreamResult { if (this.done) { return .{ .done = {} }; } @@ -5404,118 +5478,284 @@ pub const ByteBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); }; +pub const FileBlobLoader = struct { + buf: []u8 = &[_]u8{}, + fd: JSC.Node.FileDescriptor = 0, + auto_close: bool = false, + loop: *JSC.EventLoop = undefined, + mode: JSC.Node.Mode = 0, + store: *Blob.Store, + total_read: Blob.SizeType = 0, + callback: anyframe = undefined, + pending: StreamResult.Pending = StreamResult.Pending{ + .frame = undefined, + .used = false, + .result = .{ .done = {} }, + }, + cancelled: bool = false, + user_chunk_size: Blob.SizeType = 0, -// pub const FileStreamReader = struct { -// buf: []u8, -// fd: JSC.Node.FileDescriptor, -// loop: *JSC.EventLoop = undefined, -// mode: JSC.Node.Mode, -// pending: ?*JSC.JSPromise = null, + const FileReader = @This(); + pub const tag = ReadableStream.Tag.File; -// onRead: fn (ctx: *Context, buf: []u8, len: Blob.SizeType) bool -// onError: fn (ctx: *Context, err: JSC.SystemError) void -// onWouldBlock: fn (ctx: *Context,) void -// isCancelled: fn (ctx: *Context) bool + pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void { + store.ref(); + this.* = .{ + .loop = JSC.VirtualMachine.vm.eventLoop(), + .auto_close = store.data.file.pathlike == .path, + .store = store, + .user_chunk_size = chunk_size, + }; + } -// const FileReader = @This(); + pub fn watch(this: *FileReader) void { + _ = JSC.VirtualMachine.vm.poller.watch(this.fd, .read, this, callback); + } -// pub fn init(ctx: *Context, buf: []u8, fd: JSC.Node.FileDescriptor, mode: JSC.Node.Mode) FileReader { -// return .{ -// .buf = buf, -// .fd = fd, -// .ctx = ctx, -// .loop = JSC.VirtualMachine.vm.eventLoop(), -// .mode = mode, -// }; -// } + const default_fifo_chunk_size = 1024; + const default_file_chunk_size = 1024 * 1024 * 2; + pub fn onStart(this: *FileBlobLoader) StreamResult { + var file = &this.store.data.file; + var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + var auto_close = this.auto_close; + defer this.auto_close = auto_close; + var fd = if (!auto_close) + file.pathlike.fd + else switch (JSC.Node.Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) { + .result => |_fd| _fd, + .err => |err| { + return .{ .err = err.withPath(file.pathlike.path.slice()) }; + }, + }; -// pub fn watch(this: *FileReader) void { -// _ = JSC.VirtualMachine.vm.poller.watch(this.fd, .read, this, callback); -// } + if (!auto_close) { + // ensure we have non-blocking IO set + const flags = std.os.fcntl(fd, std.os.F.GETFL, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }; -// pub fn read( -// ctx: *Context, -// path: []const u8, -// buf: []u8, -// fd: JSC.Node.FileDescriptor, -// remaining: usize, -// global: *JSGlobalObject, -// would_block: ?*bool, -// ) bool { -// const rc = // read() for files -// JSC.Node.Syscall.read(fd, buf); + // if we do not, clone the descriptor and set non-blocking + // it is important for us to clone it so we don't cause Weird Things to happen + if ((flags & std.os.O.NONBLOCK) == 0) { + auto_close = true; + fd = @intCast(@TypeOf(fd), std.os.fcntl(fd, std.os.F.DUPFD, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }); + _ = std.os.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) }; + } + } -// switch (rc) { -// .err => |err| { -// const retry = comptime if (Environment.isLinux) -// std.os.E.WOULDBLOCK -// else -// std.os.E.AGAIN; + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return .{ .err = err.withPath(file.pathlike.path.slice()) }; + }, + }; -// switch (err.getErrno()) { -// retry => { -// if (would_block) |blocker| { -// blocker.* = true; -// } else { -// onWouldBlock(ctx); -// } + if (std.os.S.ISDIR(stat.mode)) { + const err = JSC.Node.Syscall.Error.fromCode(.ISDIR, .fstat); + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return .{ .err = err }; + } -// return true; -// }, -// else => {}, -// } -// const sys = if (path.len > 0) err.withPath(path).toSystemError() else err.toSystemError(); + if (std.os.S.ISSOCK(stat.mode)) { + const err = JSC.Node.Syscall.Error.fromCode(.INVAL, .fstat); -// onError(ctx, &sys, global); -// return false; -// }, -// .result => |result| { -// // this handles: -// // - empty file -// // - stream closed for some reason -// if ((result == 0 and remaining == 0) or -// isCancelled(ctx)) -// { -// return false; -// } + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return .{ .err = err }; + } -// return onRead( -// ctx, -// buf, -// result, -// ) and -// // if it's not a regular file, we don't know how much to read so we should continue -// (remaining == std.math.maxInt(usize)) or -// // if it is a regular file, we stop reading when we've read all the data -// !(@intCast(Blob.SizeType, result) >= remaining); -// }, -// } -// } + // if (comptime Environment.isMac) { + // if (std.os.S.ISSOCK(stat.mode)) { + // // darwin doesn't support os.MSG.NOSIGNAL, + // // but instead a socket option to avoid SIGPIPE. + // const _bytes = &std.mem.toBytes(@as(c_int, 1)); + // _ = std.os.darwin.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, _bytes, @intCast(std.os.socklen_t, _bytes.len)); + // } + // } -// pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void { -// var this: *FileReader = bun.cast(*FileReader, task.?); -// var buf = this.buf; -// var blocked = false; -// var remaining = std.math.maxInt(usize); -// if (comptime Environment.isMac) { -// if (std.os.S.ISREG(this.mode)) { -// remaining = @intCast(usize, @maximum(sizeOrOffset, 0)); -// // Returns when the file pointer is not at the end of -// // file. data contains the offset from current position -// // to end of file, and may be negative. -// buf = buf[0..@minimum(remaining, this.buf.len)]; -// } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) { -// buf = buf[0..@minimum(@intCast(usize, @maximum(sizeOrOffset, 0)), this.buf.len)]; -// } -// } + file.seekable = std.os.S.ISREG(stat.mode); + file.mode = @intCast(JSC.Node.Mode, stat.mode); + this.mode = file.mode; -// if (read(this.ctx, "", buf, this.fd, remaining, this.loop.global, &blocked) and blocked) { -// this.watch(); -// return; -// } -// } -// }; + if (file.seekable orelse false) + file.max_size = @intCast(Blob.SizeType, stat.size); + if ((file.seekable orelse false) and file.max_size == 0) { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + return .{ .done = {} }; + } + + this.fd = fd; + this.auto_close = auto_close; + + if (this.allocateBuffer(std.math.maxInt(usize))) |err| { + return .{ .err = err }; + } + + return this.read(this.buf); + } + + fn allocateBuffer(this: *FileBlobLoader, available_to_read: usize) ?JSC.Node.Syscall.Error { + var file = &this.store.data.file; + const chunk_size: usize = if (this.user_chunk_size > 0) + @as(usize, this.user_chunk_size) + else if (file.seekable orelse false) + @as(usize, default_file_chunk_size) + else + @as(usize, default_fifo_chunk_size); + + const this_chunk_size = if (file.max_size > 0) + if (available_to_read != std.math.maxInt(usize)) @minimum(chunk_size, available_to_read) else @minimum(@maximum(this.total_read, file.max_size) - this.total_read, chunk_size) + else + @minimum(available_to_read, chunk_size); + + var buf = bun.default_allocator.alloc( + u8, + this_chunk_size, + ) catch { + this.maybeAutoClose(); + return JSC.Node.Syscall.Error.oom.withPath(if (file.pathlike.path.slice().len > 0) file.pathlike.path.slice() else ""); + }; + + this.buf = buf; + + return null; + } + + pub fn onPull(this: *FileBlobLoader) StreamResult { + if (this.buf.len == 0) { + if (this.allocateBuffer(std.math.maxInt(usize))) |err| { + return .{ .err = err }; + } + } + + return this.read(this.buf); + } + + fn maybeAutoClose(this: *FileBlobLoader) void { + if (this.auto_close) { + _ = JSC.Node.Syscall.close(this.fd); + this.auto_close = false; + } + } + + pub fn read( + this: *FileBlobLoader, + read_buf: []u8, + ) StreamResult { + const rc = + JSC.Node.Syscall.read(this.fd, read_buf); + + switch (rc) { + .err => |err| { + const retry = comptime if (Environment.isLinux) + std.os.E.WOULDBLOCK + else + std.os.E.AGAIN; + + switch (err.getErrno()) { + retry => { + this.watch(); + return .{ + .pending = &this.pending, + }; + }, + else => {}, + } + const sys = if (this.store.data.file.pathlike == .path and this.store.data.file.pathlike.path.slice().len > 0) + err.withPath(this.store.data.file.pathlike.path.slice()) + else + err; + + return .{ .err = sys }; + }, + .result => |result| { + this.total_read += @intCast(Blob.SizeType, result); + const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false) + this.store.data.file.max_size -| this.total_read + else + @as(Blob.SizeType, std.math.maxInt(Blob.SizeType)); + + defer this.buf = &.{}; + + // this handles: + // - empty file + // - stream closed for some reason + if ((result == 0 and remaining == 0)) { + this.maybeAutoClose(); + bun.default_allocator.free(this.buf); + return .{ .done = {} }; + } + + const has_more = remaining > 0; + + if (!has_more) { + defer this.maybeAutoClose(); + return .{ + .owned_and_done = bun.ByteList.init(read_buf[0..result]), + }; + } + + return .{ + .owned = bun.ByteList.init(read_buf[0..result]), + }; + }, + } + } + + pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void { + var this: *FileReader = bun.cast(*FileReader, task.?); + var available_to_read: usize = std.math.maxInt(usize); + if (comptime Environment.isMac) { + if (std.os.S.ISREG(this.mode)) { + // Returns when the file pointer is not at the end of + // file. data contains the offset from current position + // to end of file, and may be negative. + available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); + } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) { + available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0)); + } + } + if (this.cancelled) + return; + + if (this.buf.len == 0) { + if (this.allocateBuffer(available_to_read)) |err| { + this.pending.result = .{ .err = err }; + resume this.pending.frame; + return; + } + } else { + this.buf.len = @minimum(this.buf.len, available_to_read); + } + + this.pending.result = this.read(this.buf); + resume this.pending.frame; + } + + pub fn deinit(this: *FileBlobLoader) void { + if (this.buf.len > 0) { + bun.default_allocator.free(this.buf); + this.buf = &.{}; + } + + this.maybeAutoClose(); + + this.store.deref(); + } + + pub fn onCancel(this: *FileBlobLoader) void { + this.cancelled = true; + } + + pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPull, onCancel, deinit); +}; // pub const BlobFileLoader = struct { // reader: Reader,