Files
bun.sh/test/js/bun/http/async-iterator-stream.test.ts
Jarred Sumner 0b549321e9 Start using test.concurrent in our tests (#22823)
### What does this PR do?

### How did you verify your code works?

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Claude Bot <claude-bot@bun.sh>
2025-09-22 05:30:34 -07:00

222 lines
5.9 KiB
TypeScript

import { spawn } from "bun";
import { describe, expect, mock, test } from "bun:test";
import { bunEnv, bunExe } from "harness";
describe.concurrent("Streaming body via", () => {
test("async generator function", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(async function* yo() {
yield "Hello, ";
await Bun.sleep(30);
yield Buffer.from("world!");
return "!";
});
},
});
const res = await fetch(`${server.url}/`);
const chunks = [];
for await (const chunk of res.body) {
chunks.push(chunk);
}
expect(Buffer.concat(chunks).toString()).toBe("Hello, world!!");
expect(chunks).toHaveLength(2);
});
test("async generator function throws an error but continues to send the headers", async () => {
const onMessage = mock(async url => {
const response = await fetch(url);
expect(response.headers.get("X-Hey")).toBe("123");
subprocess?.kill();
});
await using subprocess = spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "async-iterator-throws.fixture.js"],
env: bunEnv,
ipc: onMessage,
stdout: "inherit",
stderr: "pipe",
});
let [exitCode, stderr] = await Promise.all([subprocess.exited, subprocess.stderr.text()]);
expect(exitCode).toBeInteger();
expect(stderr).toContain("error: Oops");
expect(onMessage).toHaveBeenCalledTimes(1);
});
test("async generator aborted doesn't crash", async () => {
var aborter = new AbortController();
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(
async function* yo() {
queueMicrotask(() => aborter.abort());
yield "123";
await Bun.sleep(0);
},
{
headers: {
"X-Hey": "123",
},
},
);
},
});
try {
const res = await fetch(`${server.url}/`, { signal: aborter.signal });
} catch (e) {
expect(e).toBeInstanceOf(DOMException);
expect(e.name).toBe("AbortError");
}
});
test("[Symbol.asyncIterator]", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response({
async *[Symbol.asyncIterator]() {
var controller = yield "my string goes here\n";
var controller2 = yield Buffer.from("my buffer goes here\n");
await Bun.sleep(30);
yield Buffer.from("end!\n");
if (controller !== controller2 || typeof controller.sinkId !== "number") {
throw new Error("Controller mismatch");
}
return "!";
},
});
},
});
const res = await fetch(`${server.url}/`);
const chunks = [];
for await (const chunk of res.body) {
chunks.push(chunk);
}
expect(Buffer.concat(chunks).toString()).toBe("my string goes here\nmy buffer goes here\nend!\n!");
expect(chunks).toHaveLength(2);
});
test("[Symbol.asyncIterator] with a custom iterator", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
var hasRun = false;
return new Response({
[Symbol.asyncIterator]() {
return {
async next() {
await Bun.sleep(30);
if (hasRun) {
return { value: Buffer.from("world!"), done: true };
}
hasRun = true;
return { value: "Hello, ", done: false };
},
};
},
});
},
});
const res = await fetch(server.url);
const chunks = [];
for await (const chunk of res.body) {
chunks.push(chunk);
}
expect(Buffer.concat(chunks).toString()).toBe("Hello, world!");
// TODO:
// expect(chunks).toHaveLength(2);
});
test("yield", async () => {
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
expect(await response.text()).toBe("hello");
});
const callbacks = [
{
fn: async function* () {
yield '"Hello, ';
yield Buffer.from('world! #1"');
return;
},
expected: '"Hello, world! #1"',
},
{
fn: async function* () {
yield '"Hello, ';
await Bun.sleep(30);
yield Buffer.from('world! #2"');
return;
},
expected: '"Hello, world! #2"',
},
{
fn: async function* () {
yield '"Hello, ';
await 42;
yield Buffer.from('world! #3"');
return;
},
expected: '"Hello, world! #3"',
},
{
fn: async function* () {
yield '"Hello, ';
await 42;
return Buffer.from('world! #4"');
},
expected: '"Hello, world! #4"',
},
];
for (let { fn, expected } of callbacks) {
describe(expected, () => {
for (let bodyInit of [fn, { [Symbol.asyncIterator]: fn }] as const) {
for (let [label, constructFn] of [
["Response", () => new Response(bodyInit)],
["Request", () => new Request({ "url": "https://example.com", body: bodyInit })],
]) {
for (let method of ["arrayBuffer", "bytes", "text"]) {
test(`${label}(${method})`, async () => {
const result = await constructFn()[method]();
expect(Buffer.from(result)).toEqual(Buffer.from(expected));
});
}
test(`${label}(json)`, async () => {
const result = await constructFn().json();
expect(result).toEqual(JSON.parse(expected));
});
test(`${label}(blob)`, async () => {
const result = await constructFn().blob();
expect(await result.arrayBuffer()).toEqual(await new Blob([expected]).arrayBuffer());
});
}
}
});
}
});