fix(node-fetch): use stream.Readable instead of web streams (#4394)

* fix blobFrom

* fix(node-fetch): use stream.Readable instead of web streams

* uncomment

* comment why
This commit is contained in:
dave caruso
2023-08-29 19:45:16 -07:00
committed by GitHub
parent 3f4bc625ff
commit a846852818
10 changed files with 209 additions and 106 deletions

View File

@@ -84,7 +84,7 @@ globalThis.requireTransformer = (specifier: string, from: string) => {
const found = moduleList.indexOf(path.relative(BASE, relativeMatch));
if (found === -1) {
throw new Error(
`Builtin Bundler: "${specifier}" cannot be imported here because it doesn't get a module ID. Only files in "src/js" besides "src/js/builtins" can be used here.`,
`Builtin Bundler: "${specifier}" cannot be imported here because it doesn't get a module ID. Only files in "src/js" besides "src/js/builtins" can be used here. Note that the 'node:' or 'bun:' prefix is required here. `,
);
}
return codegenRequireId(`${found}/*${path.relative(BASE, relativeMatch)}*/`);

View File

@@ -3327,7 +3327,7 @@ var require_readable = __commonJS({
streamReadable.pause();
const cleanup = finished(streamReadable, error => {
const cleanup = eos(streamReadable, error => {
if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
const err = new AbortError(undefined, { cause: error });
error = err;

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,5 @@
const bunFetch = Bun.fetch;
const fetch = (...args) => bunFetch(...args);
const fetch = (...args: Parameters<typeof bunFetch>) => bunFetch(...args);
fetch.default = fetch;
fetch.fetch = fetch;
export default fetch;

View File

@@ -1,64 +0,0 @@
const { Headers, Request, Response, Blob, File = Blob, FormData } = globalThis;
const realFetch = Bun.fetch;
function fetch(...args) {
// require("node-fetch") returns the default export which means we need to
// repeat the ESM exports onto it.
//
// We don't want to copy that onto the global fetch object, so we wrap it.
return realFetch(...args);
}
class AbortError extends DOMException {
constructor(message) {
super(message, "AbortError");
}
}
class FetchBaseError extends Error {
constructor(message, type) {
super(message);
this.type = type;
}
}
class FetchError extends FetchBaseError {
constructor(message, type, systemError) {
super(message, type);
this.code = systemError?.code;
}
}
function blobFrom(path, options) {
return Promise.resolve(Bun.file(data));
}
function blobFromSync(path, options) {
return Bun.file(data);
}
var fileFrom = blobFrom;
var fileFromSync = blobFromSync;
function isRedirect(code) {
return code === 301 || code === 302 || code === 303 || code === 307 || code === 308;
}
export default Object.assign(fetch, {
AbortError,
Blob,
FetchBaseError,
FetchError,
File,
FormData,
Headers,
Request,
Response,
blobFrom,
blobFromSync,
fileFrom,
fileFromSync,
isRedirect,
fetch,
default: fetch,
});

97
src/js/thirdparty/node-fetch.ts vendored Normal file
View File

@@ -0,0 +1,97 @@
import type * as s from "stream";
const { Headers, Request, Response: WebResponse, Blob, File = Blob, FormData } = globalThis as any;
const nativeFetch = Bun.fetch;
const { Readable } = require("node:stream");
class Response extends WebResponse {
_body: any;
get body() {
return this._body ?? (this._body = Readable.fromWeb(super.body));
}
}
/**
* `node-fetch` works like the browser-fetch API, except it's a little more strict on some features,
* and uses node streams instead of web streams.
*
* It's overall a positive on speed to override the implementation, since most people will use something
* like `.json()` or `.text()`, which is faster in Bun's native fetch, vs `node-fetch` going
* through `node:http`, a node stream, then processing the data.
*/
async function fetch(url: any, init?: RequestInit & { body?: any }) {
// input node stream -> web stream
let body: s.Readable | undefined = init?.body;
if (body) {
const chunks: any = [];
if (body instanceof Readable) {
// TODO: Bun fetch() doesn't support ReadableStream at all.
for await (const chunk of body) {
chunks.push(chunk);
}
init = { ...init, body: new Blob(chunks) };
}
}
const response = await nativeFetch(url, init);
Object.setPrototypeOf(response, Response.prototype);
return response;
}
class AbortError extends DOMException {
constructor(message) {
super(message, "AbortError");
}
}
class FetchBaseError extends Error {
type: string;
constructor(message, type) {
super(message);
this.type = type;
}
}
class FetchError extends FetchBaseError {
constructor(message, type, systemError) {
super(message, type);
this.code = systemError?.code;
}
}
function blobFrom(path, options) {
return Promise.resolve(Bun.file(path, options));
}
function blobFromSync(path, options) {
return Bun.file(path, options);
}
var fileFrom = blobFrom;
var fileFromSync = blobFromSync;
function isRedirect(code) {
return code === 301 || code === 302 || code === 303 || code === 307 || code === 308;
}
export default Object.assign(fetch, {
AbortError,
Blob,
FetchBaseError,
FetchError,
File,
FormData,
Headers,
Request,
Response,
blobFrom,
blobFromSync,
fileFrom,
fileFromSync,
isRedirect,
fetch,
default: fetch,
});

View File

@@ -1,33 +0,0 @@
import fetch2, { fetch, Response, Request, Headers } from "node-fetch";
import * as iso from "isomorphic-fetch";
import * as vercelFetch from "@vercel/fetch";
import { test, expect } from "bun:test";
test("node-fetch", () => {
expect(Response).toBe(globalThis.Response);
expect(Request).toBe(globalThis.Request);
expect(Headers).toBe(globalThis.Headers);
});
for (const [impl, name] of [
[fetch, "node-fetch.fetch"],
[fetch2, "node-fetch.default"],
[fetch2.default, "node-fetch.default.default"],
[iso.fetch, "isomorphic-fetch.fetch"],
[iso.default.fetch, "isomorphic-fetch.default.fetch"],
[iso.default, "isomorphic-fetch.default"],
[vercelFetch.default(fetch), "@vercel/fetch.default"],
]) {
test(name + " fetches", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
server.stop();
return new Response();
},
});
expect(await impl("http://" + server.hostname + ":" + server.port)).toBeInstanceOf(Response);
server.stop(true);
});
}

View File

@@ -0,0 +1,68 @@
import fetch2, { fetch, Response, Request, Headers } from "node-fetch";
import * as iso from "isomorphic-fetch";
import * as vercelFetch from "@vercel/fetch";
import * as stream from "stream";
import { test, expect } from "bun:test";
test("node-fetch", () => {
expect(Response.prototype).toBeInstanceOf(globalThis.Response);
expect(Request).toBe(globalThis.Request);
expect(Headers).toBe(globalThis.Headers);
expect(fetch2.default).toBe(fetch2);
expect(fetch2.Response).toBe(Response);
});
for (const [impl, name] of [
[fetch, "node-fetch.fetch"],
[fetch2, "node-fetch.default"],
[fetch2.default, "node-fetch.default.default"],
[iso.fetch, "isomorphic-fetch.fetch"],
[iso.default.fetch, "isomorphic-fetch.default.fetch"],
[iso.default, "isomorphic-fetch.default"],
[vercelFetch.default(fetch), "@vercel/fetch.default"],
]) {
test(name + " fetches", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
server.stop();
return new Response("it works");
},
});
expect(await impl("http://" + server.hostname + ":" + server.port)).toBeInstanceOf(globalThis.Response);
server.stop(true);
});
}
test("node-fetch uses node streams instead of web streams", async () => {
const server = Bun.serve({
port: 0,
async fetch(req, server) {
const body = await req.text();
expect(body).toBe("the input text");
return new Response("hello world");
},
});
try {
const result = await fetch2("http://" + server.hostname + ":" + server.port, {
body: new stream.Readable({
read() {
this.push("the input text");
this.push(null);
},
}),
method: "POST",
});
expect(result.body).toBeInstanceOf(stream.Readable);
expect(result.body === result.body).toBe(true); // cached lazy getter
const chunks = [];
for await (const chunk of result.body) {
chunks.push(chunk);
}
expect(Buffer.concat(chunks).toString()).toBe("hello world");
} finally {
server.stop(true);
}
});

View File

@@ -314,3 +314,38 @@ it("TTY streams", () => {
expect(stderr.toString()).toContain("0 fail");
expect(exitCode).toBe(0);
});
it("Readable.toWeb", async () => {
const readable = new Readable({
read() {
this.push("Hello ");
this.push("World!\n");
this.push(null);
},
});
const webReadable = Readable.toWeb(readable);
expect(webReadable).toBeInstanceOf(ReadableStream);
const result = await new Response(webReadable).text();
expect(result).toBe("Hello World!\n");
});
it("Readable.fromWeb", async () => {
const readable = Readable.fromWeb(
new ReadableStream({
start(controller) {
controller.enqueue("Hello ");
controller.enqueue("World!\n");
controller.close();
},
}),
);
expect(readable).toBeInstanceOf(Readable);
const chunks = [];
for await (const chunk of readable) {
chunks.push(chunk);
}
expect(Buffer.concat(chunks).toString()).toBe("Hello World!\n");
});