fix(fetch) always use readable stream if it is available (#4503)

* always use readable stream if it is available

* use bun sleep

* fix tests

* rm uws dep
This commit is contained in:
Ciro Spaciari
2023-09-05 19:22:09 -03:00
committed by GitHub
parent d268097ded
commit 6e50dd210f
2 changed files with 82 additions and 27 deletions

View File

@@ -792,6 +792,36 @@ pub const Fetch = struct {
return;
}
if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
// clean for reuse later
this.scheduled_response_buffer.reset();
} else {
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}
return;
}
}
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
@@ -854,33 +884,6 @@ pub const Fetch = struct {
old.resolve(&response.body.value, this.global_this);
}
}
} else if (this.readable_stream_ref.get()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
var scheduled_response_buffer = this.scheduled_response_buffer.list;
const chunk = scheduled_response_buffer.items;
if (this.result.has_more) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
// clean for reuse later
this.scheduled_response_buffer.reset();
} else {
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}
}
}
}
}

View File

@@ -4,6 +4,17 @@ import { join } from "path";
import { describe, expect, it } from "bun:test";
import { gcTick } from "harness";
import zlib from "zlib";
import http from "http";
import { createReadStream } from "fs";
import { pipeline } from "stream";
import type { AddressInfo } from "net";
const files = [
join(import.meta.dir, "fixture.html"),
join(import.meta.dir, "fixture.png"),
join(import.meta.dir, "fixture.png.gz"),
];
const fixtures = {
"fixture": readFileSync(join(import.meta.dir, "fixture.html")),
"fixture.png": readFileSync(join(import.meta.dir, "fixture.png")),
@@ -51,6 +62,47 @@ describe("fetch() with streaming", () => {
}
});
for (let file of files) {
it("stream can handle response.body + await response.something() #4500", async () => {
let server: ReturnType<typeof http.createServer> | null = null;
try {
const errorHandler = (err: any) => expect(err).toBeUndefined();
server = http
.createServer(function (req, res) {
res.writeHead(200, { "Content-Type": "text/plain" });
pipeline(createReadStream(file), res, errorHandler);
})
.listen(0);
const address = server.address() as AddressInfo;
const url = `http://${address.address}:${address.port}`;
async function getRequestLen(url: string) {
const response = await fetch(url);
const hasBody = response.body;
if (hasBody) {
const res = await response.blob();
return res.size;
}
return 0;
}
for (let i = 0; i < 10; i++) {
let len = await getRequestLen(url);
if (len <= 0) {
throw new Error("Request length is 0");
}
await Bun.sleep(50);
}
expect(true).toBe(true);
} finally {
server?.close();
}
});
}
it("stream still works after response get out of scope", async () => {
let server: Server | null = null;
try {