mirror of
https://github.com/oven-sh/bun
synced 2026-02-10 02:48:50 +00:00
570 lines
15 KiB
JavaScript
570 lines
15 KiB
JavaScript
import { describe, expect, it, jest } from "bun:test";
|
|
import { bunEnv, bunExe, isGlibcVersionAtLeast, isMacOS, tmpdirSync } from "harness";
|
|
import { createReadStream, mkdirSync, writeFileSync } from "node:fs";
|
|
import { tmpdir } from "node:os";
|
|
import { Duplex, PassThrough, Readable, Stream, Transform, Writable } from "node:stream";
|
|
import { join } from "path";
|
|
|
|
describe("Readable", () => {
|
|
it("should be able to be created without _construct method defined", done => {
|
|
const readable = new Readable({
|
|
read() {
|
|
this.push("Hello World!\n");
|
|
this.push(null);
|
|
},
|
|
});
|
|
expect(readable instanceof Readable).toBe(true);
|
|
let data = "";
|
|
readable.on("data", chunk => {
|
|
data += chunk.toString();
|
|
});
|
|
readable.on("end", () => {
|
|
expect(data).toBe("Hello World!\n");
|
|
done();
|
|
});
|
|
});
|
|
|
|
it("should be able to be piped via .pipe", done => {
|
|
const readable = new Readable({
|
|
read() {
|
|
this.push("Hello World!");
|
|
this.push(null);
|
|
},
|
|
});
|
|
|
|
const writable = new Writable({
|
|
write(chunk, encoding, callback) {
|
|
expect(chunk.toString()).toBe("Hello World!");
|
|
callback();
|
|
done();
|
|
},
|
|
});
|
|
|
|
readable.pipe(writable);
|
|
});
|
|
|
|
it("should be able to be piped via .pipe, issue #3607", done => {
|
|
const path = `${tmpdir()}/${Date.now()}.testReadStreamEmptyFile.txt`;
|
|
writeFileSync(path, "");
|
|
const stream = createReadStream(path);
|
|
stream.on("error", err => {
|
|
done(err);
|
|
});
|
|
|
|
let called = false;
|
|
const writable = new Writable({
|
|
write(chunk, encoding, callback) {
|
|
called = true;
|
|
callback();
|
|
},
|
|
});
|
|
writable.on("finish", () => {
|
|
try {
|
|
expect(called).toBeFalse();
|
|
} catch (err) {
|
|
return done(err);
|
|
}
|
|
done();
|
|
});
|
|
|
|
stream.pipe(writable);
|
|
});
|
|
|
|
it("should be able to be piped via .pipe, issue #3668", done => {
|
|
const path = `${tmpdir()}/${Date.now()}.testReadStream.txt`;
|
|
writeFileSync(path, "12345");
|
|
const stream = createReadStream(path, { start: 0, end: 4 });
|
|
|
|
const writable = new Writable({
|
|
write(chunk, encoding, callback) {
|
|
try {
|
|
expect(chunk.toString()).toBe("12345");
|
|
} catch (err) {
|
|
done(err);
|
|
return;
|
|
}
|
|
callback();
|
|
done();
|
|
},
|
|
});
|
|
|
|
stream.on("error", err => {
|
|
done(err);
|
|
});
|
|
|
|
stream.pipe(writable);
|
|
});
|
|
|
|
it("should be able to be piped via .pipe, both start and end are 0", done => {
|
|
const path = `${tmpdir()}/${Date.now()}.testReadStream2.txt`;
|
|
writeFileSync(path, "12345");
|
|
const stream = createReadStream(path, { start: 0, end: 0 });
|
|
|
|
const writable = new Writable({
|
|
write(chunk, encoding, callback) {
|
|
try {
|
|
// Both start and end are inclusive and start counting at 0.
|
|
expect(chunk.toString()).toBe("1");
|
|
} catch (err) {
|
|
done(err);
|
|
return;
|
|
}
|
|
callback();
|
|
done();
|
|
},
|
|
});
|
|
|
|
stream.on("error", err => {
|
|
done(err);
|
|
});
|
|
|
|
stream.pipe(writable);
|
|
});
|
|
|
|
it("should be able to be piped via .pipe with a large file", done => {
|
|
const data = Buffer.allocUnsafe(768 * 1024)
|
|
.fill("B")
|
|
.toString();
|
|
const length = data.length;
|
|
const path = `${tmpdir()}/${Date.now()}.testReadStreamLargeFile.txt`;
|
|
writeFileSync(path, data);
|
|
const stream = createReadStream(path, { start: 0, end: length - 1 });
|
|
|
|
let res = "";
|
|
let count = 0;
|
|
const writable = new Writable({
|
|
write(chunk, encoding, callback) {
|
|
count += 1;
|
|
res += chunk;
|
|
callback();
|
|
},
|
|
});
|
|
writable.on("finish", () => {
|
|
try {
|
|
expect(res).toEqual(data);
|
|
expect(count).toBeGreaterThan(1);
|
|
} catch (err) {
|
|
return done(err);
|
|
}
|
|
done();
|
|
});
|
|
stream.on("error", err => {
|
|
done(err);
|
|
});
|
|
stream.pipe(writable);
|
|
});
|
|
|
|
it.todo("should have the correct fields in _events", () => {
|
|
const s = Readable({});
|
|
expect(s._events).toHaveProperty("close");
|
|
expect(s._events).toHaveProperty("error");
|
|
expect(s._events).toHaveProperty("prefinish");
|
|
expect(s._events).toHaveProperty("finish");
|
|
expect(s._events).toHaveProperty("drain");
|
|
});
|
|
});
|
|
|
|
describe("createReadStream", () => {
|
|
it("should allow the options argument to be omitted", done => {
|
|
const testData = "Hello world";
|
|
const path = join(tmpdir(), `${Date.now()}-testNoOptions.txt`);
|
|
writeFileSync(path, testData);
|
|
const stream = createReadStream(path);
|
|
|
|
let data = "";
|
|
stream.on("data", chunk => {
|
|
data += chunk.toString();
|
|
});
|
|
stream.on("end", () => {
|
|
expect(data).toBe(testData);
|
|
done();
|
|
});
|
|
});
|
|
|
|
it("should interpret the option argument as encoding if it's a string", done => {
|
|
const testData = "Hello world";
|
|
const path = join(tmpdir(), `${Date.now()}-testEncodingArgument.txt`);
|
|
writeFileSync(path, testData);
|
|
const stream = createReadStream(path);
|
|
|
|
let data = "";
|
|
stream.on("data", chunk => {
|
|
data += chunk.toString("base64");
|
|
});
|
|
stream.on("end", () => {
|
|
expect(data).toBe(btoa(testData));
|
|
done();
|
|
});
|
|
});
|
|
|
|
it("should emit readable on end", () => {
|
|
expect([join(import.meta.dir, "emit-readable-on-end.js")]).toRun();
|
|
});
|
|
});
|
|
|
|
describe("Writable", () => {
|
|
it.todo("should have the correct fields in _events", () => {
|
|
const s = Writable({});
|
|
expect(s._events).toHaveProperty("close");
|
|
expect(s._events).toHaveProperty("error");
|
|
expect(s._events).toHaveProperty("prefinish");
|
|
expect(s._events).toHaveProperty("finish");
|
|
expect(s._events).toHaveProperty("drain");
|
|
});
|
|
});
|
|
|
|
describe("Duplex", () => {
|
|
it("should allow subclasses to be derived via .call() on class", () => {
|
|
function Subclass(opts) {
|
|
if (!(this instanceof Subclass)) return new Subclass(opts);
|
|
Duplex.call(this, opts);
|
|
}
|
|
|
|
Object.setPrototypeOf(Subclass.prototype, Duplex.prototype);
|
|
Object.setPrototypeOf(Subclass, Duplex);
|
|
|
|
const subclass = new Subclass();
|
|
expect(subclass instanceof Duplex).toBe(true);
|
|
});
|
|
|
|
it.todo("should have the correct fields in _events", () => {
|
|
const s = Duplex({});
|
|
expect(s._events).toHaveProperty("close");
|
|
expect(s._events).toHaveProperty("error");
|
|
expect(s._events).toHaveProperty("prefinish");
|
|
expect(s._events).toHaveProperty("finish");
|
|
expect(s._events).toHaveProperty("drain");
|
|
expect(s._events).toHaveProperty("data");
|
|
expect(s._events).toHaveProperty("end");
|
|
expect(s._events).toHaveProperty("readable");
|
|
});
|
|
});
|
|
|
|
describe("Transform", () => {
|
|
it("should allow subclasses to be derived via .call() on class", () => {
|
|
function Subclass(opts) {
|
|
if (!(this instanceof Subclass)) return new Subclass(opts);
|
|
Transform.call(this, opts);
|
|
}
|
|
|
|
Object.setPrototypeOf(Subclass.prototype, Transform.prototype);
|
|
Object.setPrototypeOf(Subclass, Transform);
|
|
|
|
const subclass = new Subclass();
|
|
expect(subclass instanceof Transform).toBe(true);
|
|
});
|
|
|
|
it.todo("should have the correct fields in _events", () => {
|
|
const s = Transform({});
|
|
expect(s._events).toHaveProperty("close");
|
|
expect(s._events).toHaveProperty("error");
|
|
expect(s._events).toHaveProperty("prefinish");
|
|
expect(s._events).toHaveProperty("finish");
|
|
expect(s._events).toHaveProperty("drain");
|
|
expect(s._events).toHaveProperty("data");
|
|
expect(s._events).toHaveProperty("end");
|
|
expect(s._events).toHaveProperty("readable");
|
|
});
|
|
});
|
|
|
|
describe("PassThrough", () => {
|
|
it("should allow subclasses to be derived via .call() on class", () => {
|
|
function Subclass(opts) {
|
|
if (!(this instanceof Subclass)) return new Subclass(opts);
|
|
PassThrough.call(this, opts);
|
|
}
|
|
|
|
Object.setPrototypeOf(Subclass.prototype, PassThrough.prototype);
|
|
Object.setPrototypeOf(Subclass, PassThrough);
|
|
|
|
const subclass = new Subclass();
|
|
expect(subclass instanceof PassThrough).toBe(true);
|
|
});
|
|
|
|
it.todo("should have the correct fields in _events", () => {
|
|
const s = PassThrough({});
|
|
expect(s._events).toHaveProperty("close");
|
|
expect(s._events).toHaveProperty("error");
|
|
expect(s._events).toHaveProperty("prefinish");
|
|
expect(s._events).toHaveProperty("finish");
|
|
expect(s._events).toHaveProperty("drain");
|
|
expect(s._events).toHaveProperty("data");
|
|
expect(s._events).toHaveProperty("end");
|
|
expect(s._events).toHaveProperty("readable");
|
|
});
|
|
});
|
|
|
|
const processStdInTest = `
|
|
const { Transform } = require("node:stream");
|
|
|
|
let totalChunkSize = 0;
|
|
const transform = new Transform({
|
|
transform(chunk, _encoding, callback) {
|
|
totalChunkSize += chunk.length;
|
|
callback(null, "");
|
|
},
|
|
});
|
|
|
|
process.stdin.pipe(transform).pipe(process.stdout);
|
|
process.stdin.on("end", () => console.log(totalChunkSize));
|
|
`;
|
|
describe("process.stdin", () => {
|
|
it("should pipe correctly", async () => {
|
|
const dir = join(tmpdir(), "process-stdin-test");
|
|
mkdirSync(dir, { recursive: true });
|
|
writeFileSync(join(dir, "process-stdin-test.js"), processStdInTest, {});
|
|
|
|
// A sufficiently large input to make at least four chunks
|
|
const ARRAY_SIZE = 8_388_628;
|
|
const typedArray = new Uint8Array(ARRAY_SIZE).fill(97);
|
|
|
|
const { stdout, exited, stdin } = Bun.spawn({
|
|
cmd: [bunExe(), "process-stdin-test.js"],
|
|
cwd: dir,
|
|
env: bunEnv,
|
|
stdin: "pipe",
|
|
stdout: "pipe",
|
|
stderr: "inherit",
|
|
});
|
|
|
|
stdin.write(typedArray);
|
|
await stdin.end();
|
|
|
|
expect(await exited).toBe(0);
|
|
expect(await new Response(stdout).text()).toBe(`${ARRAY_SIZE}\n`);
|
|
});
|
|
});
|
|
|
|
it.if(isMacOS || isGlibcVersionAtLeast("2.36.0"))("TTY streams", () => {
|
|
const { stdout, stderr, exitCode } = Bun.spawnSync({
|
|
cmd: [bunExe(), "test", join(import.meta.dir, "tty-streams.fixture.js")],
|
|
env: bunEnv,
|
|
stdio: ["ignore", "pipe", "pipe"],
|
|
});
|
|
|
|
expect(stdout.toString()).toEqual(expect.stringContaining("bun test v1."));
|
|
try {
|
|
expect(stderr.toString()).toContain("0 fail");
|
|
} catch (error) {
|
|
throw new Error(stderr.toString());
|
|
}
|
|
expect(exitCode).toBe(0);
|
|
});
|
|
|
|
it("Readable.toWeb", async () => {
|
|
const readable = new Readable({
|
|
read() {
|
|
this.push("Hello ");
|
|
this.push("World!\n");
|
|
this.push(null);
|
|
},
|
|
});
|
|
|
|
const webReadable = Readable.toWeb(readable);
|
|
expect(webReadable).toBeInstanceOf(ReadableStream);
|
|
|
|
const result = await new Response(webReadable).text();
|
|
expect(result).toBe("Hello World!\n");
|
|
});
|
|
|
|
it("Readable.fromWeb", async () => {
|
|
const readable = Readable.fromWeb(
|
|
new ReadableStream({
|
|
start(controller) {
|
|
controller.enqueue("Hello ");
|
|
controller.enqueue("World!\n");
|
|
controller.close();
|
|
},
|
|
}),
|
|
);
|
|
expect(readable).toBeInstanceOf(Readable);
|
|
|
|
const chunks = [];
|
|
for await (const chunk of readable) {
|
|
chunks.push(chunk);
|
|
}
|
|
expect(Buffer.concat(chunks).toString()).toBe("Hello World!\n");
|
|
});
|
|
|
|
it("#9242.5 Stream has constructor", () => {
|
|
const s = new Stream({});
|
|
expect(s.constructor).toBe(Stream);
|
|
});
|
|
it("#9242.6 Readable has constructor", () => {
|
|
const r = new Readable({});
|
|
expect(r.constructor).toBe(Readable);
|
|
});
|
|
it("#9242.7 Writable has constructor", () => {
|
|
const w = new Writable({});
|
|
expect(w.constructor).toBe(Writable);
|
|
});
|
|
it("#9242.8 Duplex has constructor", () => {
|
|
const d = new Duplex({});
|
|
expect(d.constructor).toBe(Duplex);
|
|
});
|
|
it("#9242.9 Transform has constructor", () => {
|
|
const t = new Transform({});
|
|
expect(t.constructor).toBe(Transform);
|
|
});
|
|
it("#9242.10 PassThrough has constructor", () => {
|
|
const pt = new PassThrough({});
|
|
expect(pt.constructor).toBe(PassThrough);
|
|
});
|
|
|
|
it("should send Readable events in the right order", async () => {
|
|
const package_dir = tmpdirSync();
|
|
const fixture_path = join(package_dir, "fixture.js");
|
|
|
|
await Bun.write(
|
|
fixture_path,
|
|
String.raw`
|
|
function patchEmitter(emitter, prefix) {
|
|
var oldEmit = emitter.emit;
|
|
|
|
emitter.emit = function () {
|
|
console.log([prefix, arguments[0]]);
|
|
oldEmit.apply(emitter, arguments);
|
|
};
|
|
}
|
|
|
|
const stream = require("node:stream");
|
|
|
|
const readable = new stream.Readable({
|
|
read() {
|
|
this.push("Hello ");
|
|
this.push("World!\n");
|
|
this.push(null);
|
|
},
|
|
});
|
|
patchEmitter(readable, "readable");
|
|
|
|
const webReadable = stream.Readable.toWeb(readable);
|
|
|
|
const result = await new Response(webReadable).text();
|
|
console.log([1, result]);
|
|
`,
|
|
);
|
|
|
|
const { stdout, stderr } = Bun.spawn({
|
|
cmd: [bunExe(), "run", fixture_path],
|
|
stdout: "pipe",
|
|
stdin: "ignore",
|
|
stderr: "pipe",
|
|
env: bunEnv,
|
|
});
|
|
const err = await new Response(stderr).text();
|
|
expect(err).toBeEmpty();
|
|
const out = await new Response(stdout).text();
|
|
expect(out.split("\n")).toEqual([
|
|
`[ "readable", "pause" ]`,
|
|
`[ "readable", "resume" ]`,
|
|
`[ "readable", "data" ]`,
|
|
`[ "readable", "data" ]`,
|
|
`[ "readable", "readable" ]`,
|
|
`[ "readable", "end" ]`,
|
|
`[ "readable", "close" ]`,
|
|
`[ 1, "Hello World!\\n" ]`,
|
|
``,
|
|
]);
|
|
});
|
|
|
|
it("emits newListener event _before_ adding the listener", () => {
|
|
const cb = jest.fn(event => {
|
|
expect(stream.listenerCount(event)).toBe(0);
|
|
});
|
|
const stream = new Stream();
|
|
stream.on("newListener", cb);
|
|
stream.on("foo", () => {});
|
|
expect(cb).toHaveBeenCalled();
|
|
});
|
|
|
|
it("reports error", () => {
|
|
expect(() => {
|
|
const dup = new Duplex({
|
|
read() {
|
|
this.push("Hello World!\n");
|
|
this.push(null);
|
|
},
|
|
write(chunk, encoding, callback) {
|
|
callback(new Error("test"));
|
|
},
|
|
});
|
|
|
|
dup.emit("error", new Error("test"));
|
|
}).toThrow("test");
|
|
});
|
|
|
|
it("should correctly call removed listeners", () => {
|
|
const s = new Stream();
|
|
let l2Called = false;
|
|
const l1 = () => {
|
|
s.removeListener("x", l2);
|
|
};
|
|
const l2 = () => {
|
|
l2Called = true;
|
|
};
|
|
s.on("x", l1);
|
|
s.on("x", l2);
|
|
|
|
s.emit("x");
|
|
expect(l2Called).toBeTrue();
|
|
});
|
|
|
|
it("should emit prefinish on current tick", done => {
|
|
class UpperCaseTransform extends Transform {
|
|
_transform(chunk, encoding, callback) {
|
|
this.push(chunk.toString().toUpperCase());
|
|
callback();
|
|
}
|
|
}
|
|
|
|
const upperCaseTransform = new UpperCaseTransform();
|
|
|
|
let prefinishCalled = false;
|
|
upperCaseTransform.on("prefinish", () => {
|
|
prefinishCalled = true;
|
|
});
|
|
|
|
let finishCalled = false;
|
|
upperCaseTransform.on("finish", () => {
|
|
finishCalled = true;
|
|
});
|
|
|
|
upperCaseTransform.end("hi");
|
|
|
|
expect(prefinishCalled).toBeTrue();
|
|
|
|
const res = upperCaseTransform.read();
|
|
expect(res.toString()).toBe("HI");
|
|
|
|
expect(finishCalled).toBeFalse();
|
|
|
|
process.nextTick(() => {
|
|
expect(finishCalled).toBeTrue();
|
|
done();
|
|
});
|
|
});
|
|
|
|
for (const size of [0x10, 0xffff, 0x10000, 0x1f000, 0x20000, 0x20010, 0x7ffff, 0x80000, 0xa0000, 0xa0010]) {
|
|
it(`should emit 'readable' with null data and 'close' exactly once each, 0x${size.toString(16)} bytes`, async () => {
|
|
const path = `${tmpdir()}/${Date.now()}.readable_and_close.txt`;
|
|
writeFileSync(path, new Uint8Array(size));
|
|
const stream = createReadStream(path);
|
|
const close_resolvers = Promise.withResolvers();
|
|
const readable_resolvers = Promise.withResolvers();
|
|
|
|
stream.on("close", () => {
|
|
close_resolvers.resolve();
|
|
});
|
|
|
|
stream.on("readable", () => {
|
|
const data = stream.read();
|
|
if (data === null) {
|
|
readable_resolvers.resolve();
|
|
}
|
|
});
|
|
|
|
await Promise.all([close_resolvers.promise, readable_resolvers.promise]);
|
|
});
|
|
}
|