Fix macOS test failure

This commit is contained in:
Jarred Sumner
2024-02-18 14:40:44 -08:00
parent 4090e199a1
commit b0f951f306
6 changed files with 82 additions and 3 deletions

View File

@@ -24,6 +24,10 @@ function source(name) {
getter: "getOnCloseFromJS",
setter: "setOnCloseFromJS",
},
onDrain: {
getter: "getOnDrainFromJS",
setter: "setOnDrainFromJS",
},
cancel: {
fn: "cancelFromJS",
length: 1,
@@ -37,7 +41,7 @@ function source(name) {
},
},
klass: {},
values: ["pendingPromise", "onCloseCallback"],
values: ["pendingPromise", "onCloseCallback", "onDrainCallback"],
});
}

View File

@@ -2702,6 +2702,8 @@ pub fn ReadableStreamSource(
pub const updateRefFromJS = JSReadableStreamSource.updateRef;
pub const setOnCloseFromJS = JSReadableStreamSource.setOnCloseFromJS;
pub const getOnCloseFromJS = JSReadableStreamSource.getOnCloseFromJS;
pub const setOnDrainFromJS = JSReadableStreamSource.setOnDrainFromJS;
pub const getOnDrainFromJS = JSReadableStreamSource.getOnDrainFromJS;
pub const finalize = JSReadableStreamSource.finalize;
pub const construct = JSReadableStreamSource.construct;
pub const getIsClosedFromJS = JSReadableStreamSource.isClosed;
@@ -2802,6 +2804,24 @@ pub fn ReadableStreamSource(
return true;
}
pub fn setOnDrainFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject, value: JSC.JSValue) callconv(.C) bool {
JSC.markBinding(@src());
this.globalThis = globalObject;
if (value.isUndefined()) {
ReadableStreamSourceType.onDrainCallbackSetCached(this.this_jsvalue, globalObject, .undefined);
return true;
}
if (!value.isCallable(globalObject.vm())) {
globalObject.throwInvalidArgumentType("ReadableStreamSource", "onDrain", "function");
return false;
}
const cb = value.withAsyncContextIfNeeded(globalObject);
ReadableStreamSourceType.onDrainCallbackSetCached(this.this_jsvalue, globalObject, cb);
return true;
}
pub fn getOnCloseFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue {
_ = globalObject; // autofix
@@ -2810,6 +2830,18 @@ pub fn ReadableStreamSource(
return this.close_jsvalue.get() orelse .undefined;
}
pub fn getOnDrainFromJS(this: *ReadableStreamSourceType, globalObject: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue {
_ = globalObject; // autofix
JSC.markBinding(@src());
if (ReadableStreamSourceType.onDrainCallbackGetCached(this.this_jsvalue)) |val| {
return val;
}
return .undefined;
}
pub fn updateRef(this: *ReadableStreamSourceType, globalObject: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
JSC.markBinding(@src());
this.this_jsvalue = callFrame.this();
@@ -3688,7 +3720,35 @@ pub const FileReader = struct {
if (!this.isPulling()) {
this.consumeReaderBuffer();
if (this.pending.state == .pending) {
if (this.buffered.items.len > 0)
this.pending.result = .{ .owned_and_done = bun.ByteList.fromList(this.buffered) };
this.buffered = .{};
this.pending.run();
} else if (this.buffered.items.len > 0) {
const this_value = this.parent().this_jsvalue;
const globalThis = this.parent().globalThis;
if (this_value != .zero) {
if (Source.onDrainCallbackGetCached(this_value)) |cb| {
const buffered = this.buffered;
this.buffered = .{};
this.parent().incrementCount();
defer _ = this.parent().decrementCount();
this.eventLoop().js.runCallback(
cb,
globalThis,
.undefined,
&.{
JSC.ArrayBuffer.fromBytes(
buffered.items,
.Uint8Array,
).toJS(
globalThis,
null,
),
},
);
}
}
}
}

View File

@@ -1679,6 +1679,14 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
}
handle.onClose = this.#onClose.bind(this);
handle.onDrain = this.#onDrain.bind(this);
}
#onDrain(chunk) {
var controller = this.#controller;
if (controller) {
controller.enqueue(chunk);
}
}
#controller;

View File

@@ -5251,12 +5251,17 @@ function createNativeStreamReadable(nativeType, Readable) {
this.#remainingChunk = undefined;
this.#pendingRead = false;
ptr.onClose = this.#onClose.bind(this);
ptr.onDrain = this.#onDrain.bind(this);
}
#onClose() {
this.push(null);
}
#onDrain(chunk) {
this.push(chunk);
}
// maxToRead is by default the highWaterMark passed from the Readable.read call to this fn
// However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read
// which may be significantly less than the actual highWaterMark

View File

@@ -485,12 +485,14 @@ test("timed output should work", async () => {
const producer = Bun.spawn([bunExe(), "run", producer_file], {
stderr: "pipe",
stdout: "inherit",
stdin: "inherit",
});
let text = "";
for await (const chunk of producer.stderr) {
text += [...chunk].map(x => String.fromCharCode(x)).join("");
await new Promise(r => setTimeout(r, 1000));
await Bun.sleep(1000);
}
expect(text).toBe("0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15\n16\n17\n18\n19\n20\n21\n22\n23\n24\n25\n");
});

View File

@@ -1,4 +1,4 @@
for (let i = 0; i <= 25; i++) {
await Bun.write(Bun.stderr, i + "\n");
await new Promise(r => setTimeout(r, 100));
await Bun.sleep(100);
}