Compare commits

...

5 Commits

Author SHA1 Message Date
Jarred Sumner
d0a5078b66 Merge branch 'main' into jarred/stream-invalid-arg 2024-09-05 21:10:49 -07:00
Jarred Sumner
53706de8d6 Mark as cancelled 2024-09-04 01:38:00 -07:00
Jarred Sumner
6bf542d350 Fix poorly written test 2024-09-04 00:04:51 -07:00
Jarred Sumner
841b617c3d Fix test 2024-09-03 23:56:07 -07:00
Jarred Sumner
5c6cfd2657 Fix crash when calling controller.enqueue with invalid arguments in certain cases 2024-09-02 16:43:47 -07:00
5 changed files with 122 additions and 78 deletions

View File

@@ -1762,12 +1762,10 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
const args = args_list.ptr[0..args_list.len];
if (args.len == 0) {
globalThis.vm().throwError(globalThis, JSC.toTypeError(
.ERR_MISSING_ARGS,
"write() expects a string, ArrayBufferView, or ArrayBuffer",
globalThis.ERR_STREAM_NULL_VALUES(
"Expected a string, ArrayBufferView, or ArrayBuffer. Received undefined",
.{},
globalThis,
));
).throw();
return .undefined;
}
@@ -1776,12 +1774,12 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
defer arg.ensureStillAlive();
if (arg.isEmptyOrUndefinedOrNull()) {
globalThis.vm().throwError(globalThis, JSC.toTypeError(
.ERR_STREAM_NULL_VALUES,
"write() expects a string, ArrayBufferView, or ArrayBuffer",
.{},
globalThis,
));
const name_to_display = arg.jsTypeString(globalThis).toSlice(globalThis, bun.default_allocator);
defer name_to_display.deinit();
globalThis.ERR_STREAM_NULL_VALUES(
"Expected a string, ArrayBufferView, or ArrayBuffer. Received \"{s}\"",
.{name_to_display.slice()},
).throw();
return .undefined;
}
@@ -1795,12 +1793,12 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
}
if (!arg.isString()) {
globalThis.vm().throwError(globalThis, JSC.toTypeError(
.ERR_INVALID_ARG_TYPE,
"write() expects a string, ArrayBufferView, or ArrayBuffer",
.{},
globalThis,
));
globalThis.ERR_INVALID_ARG_TYPE(
"Expected a string, ArrayBufferView, or ArrayBuffer. Received \"{}\"",
.{
arg.jsTypeString(globalThis).getZigString(globalThis),
},
).throw();
return .undefined;
}

View File

@@ -757,15 +757,16 @@ export async function readStreamIntoSink(stream, sink, isNative) {
var didThrow = false;
var started = false;
const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark") || 0;
function onSinkClose(stream, reason) {
if (!didThrow && !didClose && stream && stream.$state !== $streamClosed) {
const cancelResult = $readableStreamCancel(stream, reason);
$isPromise(cancelResult) && $markPromiseAsHandled(cancelResult);
}
}
try {
var reader = stream.getReader();
var many = reader.readMany();
function onSinkClose(stream, reason) {
if (!didThrow && !didClose && stream && stream.$state !== $streamClosed) {
$readableStreamCancel(stream, reason);
}
}
if (many && $isPromise(many)) {
// Some time may pass before this Promise is fulfilled. The sink may
@@ -792,7 +793,6 @@ export async function readStreamIntoSink(stream, sink, isNative) {
for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
sink.write(values[i]);
}
var streamState = $getByIdDirectPrivate(stream, "state");
if (streamState === $streamClosed) {
didClose = true;
@@ -809,16 +809,9 @@ export async function readStreamIntoSink(stream, sink, isNative) {
sink.write(value);
}
} catch (e) {
onSinkClose(stream, e);
didThrow = true;
try {
reader = undefined;
const prom = stream.cancel(e);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch (j) {}
if (sink && !didClose) {
didClose = true;
try {
@@ -1518,8 +1511,7 @@ export function readableStreamCloseIfPossible(stream) {
switch ($getByIdDirectPrivate(stream, "state")) {
case $streamReadable:
case $streamClosing: {
$readableStreamClose(stream);
break;
return $readableStreamClose(stream);
}
}
}

View File

@@ -15,9 +15,14 @@ const server = Bun.serve({
controller.enqueue("world!");
controller.close();
}
throw new Error("Oops");
// Use a base64-encoded error string to ensure the test printing
// source code stack traces is not confused with an error message.
throw new Error(atob("T29w"));
},
cancel(reason) {
console.log("Cancel call");
console.error(reason);
},
cancel(reason) {},
}),
{
status: 402,

View File

@@ -469,64 +469,48 @@ it("request.url should be based on the Host header", async () => {
describe("streaming", () => {
describe("error handler", () => {
it("throw on pull renders headers, does not call error handler", async () => {
let subprocess;
afterAll(() => {
subprocess?.kill();
});
const onMessage = mock(async url => {
const response = await fetch(url);
expect(response.status).toBe(402);
expect(response.headers.get("X-Hey")).toBe("123");
expect(response.text()).resolves.toBe("");
subprocess.kill();
});
subprocess = Bun.spawn({
const onMessage = Promise.withResolvers();
const subprocess = Bun.spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "readable-stream-throws.fixture.js"],
env: bunEnv,
stdout: "ignore",
stdout: "inherit",
stderr: "pipe",
ipc: onMessage,
ipc: url => onMessage.resolve(url),
});
const url = await onMessage.promise;
const response = await fetch(url);
expect(response.status).toBe(402);
expect(response.headers.get("X-Hey")).toBe("123");
expect(await response.text()).toBe("");
let [exitCode, stderr] = await Promise.all([subprocess.exited, new Response(subprocess.stderr).text()]);
expect(exitCode).toBeInteger();
expect(stderr).toContain("error: Oops");
expect(onMessage).toHaveBeenCalled();
expect(stderr).toContain("error: Oop");
});
it("throw on pull after writing should not call the error handler", async () => {
let subprocess;
const onMessage = Promise.withResolvers();
afterAll(() => {
subprocess?.kill();
});
const onMessage = mock(async href => {
const url = new URL("write", href);
const response = await fetch(url);
expect(response.status).toBe(402);
expect(response.headers.get("X-Hey")).toBe("123");
expect(response.text()).resolves.toBe("");
subprocess.kill();
});
subprocess = Bun.spawn({
const subprocess = Bun.spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "readable-stream-throws.fixture.js"],
env: bunEnv,
stdout: "ignore",
stdout: "inherit",
stderr: "pipe",
ipc: onMessage,
ipc: url => onMessage.resolve(url),
});
const url = await onMessage.promise;
const response = await fetch(url);
expect(response.status).toBe(402);
expect(response.headers.get("X-Hey")).toBe("123");
expect(await response.text()).toBe("");
let [exitCode, stderr] = await Promise.all([subprocess.exited, new Response(subprocess.stderr).text()]);
expect(exitCode).toBeInteger();
expect(stderr).toContain("error: Oops");
expect(onMessage).toHaveBeenCalled();
expect(stderr).toContain("error: Oop");
});
});
@@ -1554,7 +1538,7 @@ it("should response with HTTP 413 when request body is larger than maxRequestBod
it("should support promise returned from error", async () => {
const { promise, resolve } = Promise.withResolvers<string>();
const subprocess = Bun.spawn({
await using subprocess = Bun.spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "bun-serve.fixture.js"],
env: bunEnv,
@@ -1565,10 +1549,6 @@ it("should support promise returned from error", async () => {
},
});
afterAll(() => {
subprocess.kill();
});
const url = new URL(await promise);
{

View File

@@ -0,0 +1,69 @@
import { test, expect, mock } from "bun:test";
test("cancel() is called on a ReadableStream which passes invalid arguments to enqueue()", async () => {
var defer = Promise.withResolvers();
var onCancel = Promise.withResolvers();
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(
new ReadableStream({
async pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
await Bun.sleep(10);
defer.resolve();
// Invalid argument
controller.enqueue([new Uint8Array(32)]);
},
cancel(reason) {
onCancel.resolve(reason);
},
}),
);
},
});
const resp = await fetch(server.url);
resp.body;
await defer.promise;
expect(await resp.bytes()).toEqual(new Uint8Array([1, 2, 3]));
server.stop(true);
expect(await onCancel.promise).toBeInstanceOf(TypeError);
});
// This is mostly to test we don't crash in this case.
test("cancel() is NOT called on a ReadableStream with invalid arguments and close called", async () => {
var defer = Promise.withResolvers();
var cancel = mock(() => {});
var endOfPullFunction = mock(() => {});
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(
new ReadableStream({
async pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
await Bun.sleep(10);
defer.resolve();
// Invalid argument
controller.enqueue([new Uint8Array(32)]);
controller.close();
endOfPullFunction();
},
cancel,
}),
);
},
});
const resp = await fetch(server.url);
resp.body;
await defer.promise;
expect(await resp.bytes()).toEqual(new Uint8Array([1, 2, 3]));
server.stop(true);
await Bun.sleep(10);
expect(cancel).not.toHaveBeenCalled();
expect(endOfPullFunction).toHaveBeenCalled();
});