From b0f951f306bef90a72443b41cf9ff8d380d03433 Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:40:44 -0800 Subject: [PATCH] Fix macOS test failure --- src/bun.js/api/streams.classes.ts | 6 ++- src/bun.js/webcore/streams.zig | 60 ++++++++++++++++++++++ src/js/builtins/ReadableStreamInternals.ts | 8 +++ src/js/node/stream.js | 5 ++ test/js/bun/io/bun-write.test.js | 4 +- test/js/bun/io/timed-stderr-output.js | 2 +- 6 files changed, 82 insertions(+), 3 deletions(-) diff --git a/src/bun.js/api/streams.classes.ts b/src/bun.js/api/streams.classes.ts index 45280b9608..707a03e2ea 100644 --- a/src/bun.js/api/streams.classes.ts +++ b/src/bun.js/api/streams.classes.ts @@ -24,6 +24,10 @@ function source(name) { getter: "getOnCloseFromJS", setter: "setOnCloseFromJS", }, + onDrain: { + getter: "getOnDrainFromJS", + setter: "setOnDrainFromJS", + }, cancel: { fn: "cancelFromJS", length: 1, @@ -37,7 +41,7 @@ function source(name) { }, }, klass: {}, - values: ["pendingPromise", "onCloseCallback"], + values: ["pendingPromise", "onCloseCallback", "onDrainCallback"], }); } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index eb99644c25..b8f8a919ce 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2702,6 +2702,8 @@ pub fn ReadableStreamSource( pub const updateRefFromJS = JSReadableStreamSource.updateRef; pub const setOnCloseFromJS = JSReadableStreamSource.setOnCloseFromJS; pub const getOnCloseFromJS = JSReadableStreamSource.getOnCloseFromJS; + pub const setOnDrainFromJS = JSReadableStreamSource.setOnDrainFromJS; + pub const getOnDrainFromJS = JSReadableStreamSource.getOnDrainFromJS; pub const finalize = JSReadableStreamSource.finalize; pub const construct = JSReadableStreamSource.construct; pub const getIsClosedFromJS = JSReadableStreamSource.isClosed; @@ -2802,6 +2804,24 @@ pub fn ReadableStreamSource( return true; } + pub fn setOnDrainFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject, value: JSC.JSValue) callconv(.C) bool { + JSC.markBinding(@src()); + this.globalThis = globalObject; + + if (value.isUndefined()) { + ReadableStreamSourceType.onDrainCallbackSetCached(this.this_jsvalue, globalObject, .undefined); + return true; + } + + if (!value.isCallable(globalObject.vm())) { + globalObject.throwInvalidArgumentType("ReadableStreamSource", "onDrain", "function"); + return false; + } + const cb = value.withAsyncContextIfNeeded(globalObject); + ReadableStreamSourceType.onDrainCallbackSetCached(this.this_jsvalue, globalObject, cb); + return true; + } + pub fn getOnCloseFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { _ = globalObject; // autofix @@ -2810,6 +2830,18 @@ pub fn ReadableStreamSource( return this.close_jsvalue.get() orelse .undefined; } + pub fn getOnDrainFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + _ = globalObject; // autofix + + JSC.markBinding(@src()); + + if (ReadableStreamSourceType.onDrainCallbackGetCached(this.this_jsvalue)) |val| { + return val; + } + + return .undefined; + } + pub fn updateRef(this: *ReadableStreamSourceType, globalObject: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { JSC.markBinding(@src()); this.this_jsvalue = callFrame.this(); @@ -3688,7 +3720,35 @@ pub const FileReader = struct { if (!this.isPulling()) { this.consumeReaderBuffer(); if (this.pending.state == .pending) { + if (this.buffered.items.len > 0) + this.pending.result = .{ .owned_and_done = bun.ByteList.fromList(this.buffered) }; + this.buffered = .{}; this.pending.run(); + } else if (this.buffered.items.len > 0) { + const this_value = this.parent().this_jsvalue; + const globalThis = this.parent().globalThis; + if (this_value != .zero) { + if (Source.onDrainCallbackGetCached(this_value)) |cb| { + const buffered = this.buffered; + this.buffered = .{}; + this.parent().incrementCount(); + defer _ = this.parent().decrementCount(); + this.eventLoop().js.runCallback( + cb, + globalThis, + .undefined, + &.{ + JSC.ArrayBuffer.fromBytes( + buffered.items, + .Uint8Array, + ).toJS( + globalThis, + null, + ), + }, + ); + } + } } } diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index 0c13a39e87..b1c22ce02d 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1679,6 +1679,14 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { } handle.onClose = this.#onClose.bind(this); + handle.onDrain = this.#onDrain.bind(this); + } + + #onDrain(chunk) { + var controller = this.#controller; + if (controller) { + controller.enqueue(chunk); + } } #controller; diff --git a/src/js/node/stream.js b/src/js/node/stream.js index cd41944a47..1f1d034248 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -5251,12 +5251,17 @@ function createNativeStreamReadable(nativeType, Readable) { this.#remainingChunk = undefined; this.#pendingRead = false; ptr.onClose = this.#onClose.bind(this); + ptr.onDrain = this.#onDrain.bind(this); } #onClose() { this.push(null); } + #onDrain(chunk) { + this.push(chunk); + } + // maxToRead is by default the highWaterMark passed from the Readable.read call to this fn // However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read // which may be significantly less than the actual highWaterMark diff --git a/test/js/bun/io/bun-write.test.js b/test/js/bun/io/bun-write.test.js index 954a5cd167..7ab28c887c 100644 --- a/test/js/bun/io/bun-write.test.js +++ b/test/js/bun/io/bun-write.test.js @@ -485,12 +485,14 @@ test("timed output should work", async () => { const producer = Bun.spawn([bunExe(), "run", producer_file], { stderr: "pipe", + stdout: "inherit", + stdin: "inherit", }); let text = ""; for await (const chunk of producer.stderr) { text += [...chunk].map(x => String.fromCharCode(x)).join(""); - await new Promise(r => setTimeout(r, 1000)); + await Bun.sleep(1000); } expect(text).toBe("0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15\n16\n17\n18\n19\n20\n21\n22\n23\n24\n25\n"); }); diff --git a/test/js/bun/io/timed-stderr-output.js b/test/js/bun/io/timed-stderr-output.js index fd13eb2585..3a3e9892f8 100644 --- a/test/js/bun/io/timed-stderr-output.js +++ b/test/js/bun/io/timed-stderr-output.js @@ -1,4 +1,4 @@ for (let i = 0; i <= 25; i++) { await Bun.write(Bun.stderr, i + "\n"); - await new Promise(r => setTimeout(r, 100)); + await Bun.sleep(100); }