mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
Make uploading files with fetch()fast (#3125)
* Make file uploads fast * Add benchmark * Update README.md * defaults * print * prettier * smaller * fix(path) fix parse behavior (#3134) * Add macro docs (#3139) * Add macro doc * Updates * Tweaks * Update doc * Update macro serialization doc * Update macro doc * `--no-macros` flag, disable macros in node_modules * invert base/filename internally (#3141) * always false * Fix broken test * Add a test sendfile() test with large file --------- Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Co-authored-by: Ciro Spaciari <ciro.spaciari@gmail.com> Co-authored-by: Colin McDonnell <colinmcd94@gmail.com>
This commit is contained in:
1
bench/stream-file-upload-client/.gitignore
vendored
Normal file
1
bench/stream-file-upload-client/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
hello.txt
|
||||||
35
bench/stream-file-upload-client/README.md
Normal file
35
bench/stream-file-upload-client/README.md
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
# HTTP request file upload benchmark
|
||||||
|
|
||||||
|
This is a simple benchmark of uploading a file to a web server in different runtimes.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
Generate a file to upload (default is `hello.txt`):
|
||||||
|
|
||||||
|
```bash
|
||||||
|
bun generate-file.js
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the server:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
node server-node.mjs
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the benchmark in bun:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
bun stream-file-bun.js
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the benchmark in node:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
node stream-file-node.mjs
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the benchmark in deno:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
deno run -A stream-file-deno.js
|
||||||
|
```
|
||||||
8
bench/stream-file-upload-client/generate-file.js
Normal file
8
bench/stream-file-upload-client/generate-file.js
Normal file
File diff suppressed because one or more lines are too long
15
bench/stream-file-upload-client/server-node.mjs
Normal file
15
bench/stream-file-upload-client/server-node.mjs
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
import { createServer } from "node:http";
|
||||||
|
const server = createServer((req, res) => {
|
||||||
|
var chunkSize = 0;
|
||||||
|
req.on("data", chunk => {
|
||||||
|
chunkSize += chunk.byteLength;
|
||||||
|
});
|
||||||
|
|
||||||
|
req.on("end", () => {
|
||||||
|
console.log("Received", chunkSize, "bytes");
|
||||||
|
res.end(`${chunkSize}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
server.listen(parseInt(process.env.PORT ?? "3000"), (err, port) => {
|
||||||
|
console.log(`http://localhost:${server.address().port}`);
|
||||||
|
});
|
||||||
9
bench/stream-file-upload-client/stream-file-bun.js
Normal file
9
bench/stream-file-upload-client/stream-file-bun.js
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
import { file } from "bun";
|
||||||
|
console.time("stream-file-bun");
|
||||||
|
const response = await fetch(process.env.URL ?? "http://localhost:3000", {
|
||||||
|
method: "POST",
|
||||||
|
body: file(process.env.FILE ?? "hello.txt"),
|
||||||
|
});
|
||||||
|
console.timeEnd("stream-file-bun");
|
||||||
|
|
||||||
|
console.log("Sent", await response.text(), "bytes");
|
||||||
12
bench/stream-file-upload-client/stream-file-deno.js
Normal file
12
bench/stream-file-upload-client/stream-file-deno.js
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
const file = await Deno.open(Deno.env.get("FILE") ?? "hello.txt", {
|
||||||
|
read: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
console.time("stream-file-deno");
|
||||||
|
const response = await fetch(Deno.env.get("URL") ?? "http://localhost:3000", {
|
||||||
|
method: "POST",
|
||||||
|
body: file.readable,
|
||||||
|
});
|
||||||
|
console.timeEnd("stream-file-deno");
|
||||||
|
|
||||||
|
console.log("Sent", await response.text(), "bytes");
|
||||||
19
bench/stream-file-upload-client/stream-file-node.mjs
Normal file
19
bench/stream-file-upload-client/stream-file-node.mjs
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import { createReadStream } from "node:fs";
|
||||||
|
import http from "node:http";
|
||||||
|
|
||||||
|
console.time("stream-file-node");
|
||||||
|
createReadStream(process.env.FILE ?? "hello.txt")
|
||||||
|
.pipe(
|
||||||
|
http
|
||||||
|
.request(process.env.URL ?? "http://localhost:3000", {
|
||||||
|
method: "POST",
|
||||||
|
})
|
||||||
|
.on("response", response => {
|
||||||
|
response.on("data", data => {
|
||||||
|
console.log("Sent", parseInt(data.toString(), 10), "bytes");
|
||||||
|
});
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.on("close", () => {
|
||||||
|
console.timeEnd("stream-file-node");
|
||||||
|
});
|
||||||
@@ -629,7 +629,7 @@ pub const Fetch = struct {
|
|||||||
result: HTTPClient.HTTPClientResult = .{},
|
result: HTTPClient.HTTPClientResult = .{},
|
||||||
javascript_vm: *VirtualMachine = undefined,
|
javascript_vm: *VirtualMachine = undefined,
|
||||||
global_this: *JSGlobalObject = undefined,
|
global_this: *JSGlobalObject = undefined,
|
||||||
request_body: AnyBlob = undefined,
|
request_body: HTTPRequestBody = undefined,
|
||||||
response_buffer: MutableString = undefined,
|
response_buffer: MutableString = undefined,
|
||||||
request_headers: Headers = Headers{ .allocator = undefined },
|
request_headers: Headers = Headers{ .allocator = undefined },
|
||||||
promise: JSC.JSPromise.Strong,
|
promise: JSC.JSPromise.Strong,
|
||||||
@@ -647,6 +647,38 @@ pub const Fetch = struct {
|
|||||||
abort_reason: JSValue = JSValue.zero,
|
abort_reason: JSValue = JSValue.zero,
|
||||||
// Custom Hostname
|
// Custom Hostname
|
||||||
hostname: ?[]u8 = null,
|
hostname: ?[]u8 = null,
|
||||||
|
|
||||||
|
pub const HTTPRequestBody = union(enum) {
|
||||||
|
AnyBlob: AnyBlob,
|
||||||
|
Sendfile: HTTPClient.Sendfile,
|
||||||
|
|
||||||
|
pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
|
||||||
|
return switch (this.*) {
|
||||||
|
.AnyBlob => this.AnyBlob.store(),
|
||||||
|
else => null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn slice(this: *const HTTPRequestBody) []const u8 {
|
||||||
|
return switch (this.*) {
|
||||||
|
.AnyBlob => this.AnyBlob.slice(),
|
||||||
|
else => "",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn detach(this: *HTTPRequestBody) void {
|
||||||
|
switch (this.*) {
|
||||||
|
.AnyBlob => this.AnyBlob.detach(),
|
||||||
|
.Sendfile => {
|
||||||
|
if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
|
||||||
|
_ = JSC.Node.Syscall.close(this.Sendfile.fd);
|
||||||
|
this.Sendfile.offset = 0;
|
||||||
|
this.Sendfile.remain = 0;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
|
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
|
||||||
return FetchTasklet{};
|
return FetchTasklet{};
|
||||||
}
|
}
|
||||||
@@ -850,12 +882,26 @@ pub const Fetch = struct {
|
|||||||
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
|
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
|
||||||
}
|
}
|
||||||
|
|
||||||
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(allocator, fetch_options.method, fetch_options.url, fetch_options.headers.entries, fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), fetch_options.timeout, HTTPClient.HTTPClientResult.Callback.New(
|
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
|
||||||
*FetchTasklet,
|
allocator,
|
||||||
FetchTasklet.callback,
|
fetch_options.method,
|
||||||
).init(
|
fetch_options.url,
|
||||||
fetch_tasklet,
|
fetch_options.headers.entries,
|
||||||
), proxy, if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, fetch_options.hostname, fetch_options.redirect_type);
|
fetch_options.headers.buf.items,
|
||||||
|
&fetch_tasklet.response_buffer,
|
||||||
|
fetch_tasklet.request_body.slice(),
|
||||||
|
fetch_options.timeout,
|
||||||
|
HTTPClient.HTTPClientResult.Callback.New(
|
||||||
|
*FetchTasklet,
|
||||||
|
FetchTasklet.callback,
|
||||||
|
).init(
|
||||||
|
fetch_tasklet,
|
||||||
|
),
|
||||||
|
proxy,
|
||||||
|
if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
|
||||||
|
fetch_options.hostname,
|
||||||
|
fetch_options.redirect_type,
|
||||||
|
);
|
||||||
|
|
||||||
if (fetch_options.redirect_type != FetchRedirect.follow) {
|
if (fetch_options.redirect_type != FetchRedirect.follow) {
|
||||||
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
|
fetch_tasklet.http.?.client.remaining_redirect_count = 0;
|
||||||
@@ -865,6 +911,12 @@ pub const Fetch = struct {
|
|||||||
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
|
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
|
||||||
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
|
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
|
||||||
|
|
||||||
|
if (fetch_tasklet.request_body == .Sendfile) {
|
||||||
|
std.debug.assert(fetch_options.url.isHTTP());
|
||||||
|
std.debug.assert(fetch_options.proxy == null);
|
||||||
|
fetch_tasklet.http.?.request_body = .{ .sendfile = fetch_tasklet.request_body.Sendfile };
|
||||||
|
}
|
||||||
|
|
||||||
if (fetch_tasklet.signal) |signal| {
|
if (fetch_tasklet.signal) |signal| {
|
||||||
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
|
fetch_tasklet.signal = signal.listen(FetchTasklet, fetch_tasklet, FetchTasklet.abortListener);
|
||||||
}
|
}
|
||||||
@@ -886,7 +938,7 @@ pub const Fetch = struct {
|
|||||||
const FetchOptions = struct {
|
const FetchOptions = struct {
|
||||||
method: Method,
|
method: Method,
|
||||||
headers: Headers,
|
headers: Headers,
|
||||||
body: AnyBlob,
|
body: HTTPRequestBody,
|
||||||
timeout: usize,
|
timeout: usize,
|
||||||
disable_timeout: bool,
|
disable_timeout: bool,
|
||||||
disable_keepalive: bool,
|
disable_keepalive: bool,
|
||||||
@@ -1339,36 +1391,114 @@ pub const Fetch = struct {
|
|||||||
) catch unreachable;
|
) catch unreachable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var http_body = FetchTasklet.HTTPRequestBody{
|
||||||
|
.AnyBlob = body,
|
||||||
|
};
|
||||||
|
|
||||||
if (body.needsToReadFile()) {
|
if (body.needsToReadFile()) {
|
||||||
// TODO: make this async + lazy
|
prepare_body: {
|
||||||
const res = JSC.Node.NodeFS.readFile(
|
const opened_fd_res: JSC.Node.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) {
|
||||||
globalThis.bunVM().nodeFS(),
|
.fd => |fd| JSC.Node.Maybe(bun.FileDescriptor).errnoSysFd(JSC.Node.Syscall.system.dup(fd), .open, fd) orelse .{ .result = fd },
|
||||||
.{
|
.path => |path| JSC.Node.Syscall.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), std.os.O.RDONLY | std.os.O.NOCTTY, 0),
|
||||||
.encoding = .buffer,
|
};
|
||||||
.path = body.Blob.store.?.data.file.pathlike,
|
|
||||||
.offset = body.Blob.offset,
|
|
||||||
.max_size = body.Blob.size,
|
|
||||||
},
|
|
||||||
.sync,
|
|
||||||
);
|
|
||||||
|
|
||||||
switch (res) {
|
const opened_fd = switch (opened_fd_res) {
|
||||||
.err => |err| {
|
.err => |err| {
|
||||||
bun.default_allocator.free(url_proxy_buffer);
|
bun.default_allocator.free(url_proxy_buffer);
|
||||||
|
|
||||||
const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
|
const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
|
||||||
body.detach();
|
body.detach();
|
||||||
if (headers) |*headers_| {
|
if (headers) |*headers_| {
|
||||||
headers_.buf.deinit(bun.default_allocator);
|
headers_.buf.deinit(bun.default_allocator);
|
||||||
headers_.entries.deinit(bun.default_allocator);
|
headers_.entries.deinit(bun.default_allocator);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rejected_value;
|
||||||
|
},
|
||||||
|
.result => |fd| fd,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (proxy == null and bun.HTTP.Sendfile.isEligible(url)) {
|
||||||
|
use_sendfile: {
|
||||||
|
const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(opened_fd)) {
|
||||||
|
.result => |result| result,
|
||||||
|
// bail out for any reason
|
||||||
|
.err => break :use_sendfile,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (Environment.isMac) {
|
||||||
|
// macOS only supports regular files for sendfile()
|
||||||
|
if (!std.os.S.ISREG(stat.mode)) {
|
||||||
|
break :use_sendfile;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if it's < 32 KB, it's not worth it
|
||||||
|
if (stat.size < 32 * 1024) {
|
||||||
|
break :use_sendfile;
|
||||||
|
}
|
||||||
|
|
||||||
|
const original_size = body.Blob.size;
|
||||||
|
const stat_size = @intCast(Blob.SizeType, stat.size);
|
||||||
|
const blob_size = if (std.os.S.ISREG(stat.mode))
|
||||||
|
stat_size
|
||||||
|
else
|
||||||
|
@min(original_size, stat_size);
|
||||||
|
|
||||||
|
http_body = .{
|
||||||
|
.Sendfile = .{
|
||||||
|
.fd = opened_fd,
|
||||||
|
.remain = body.Blob.offset + original_size,
|
||||||
|
.offset = body.Blob.offset,
|
||||||
|
.content_size = blob_size,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if (std.os.S.ISREG(stat.mode)) {
|
||||||
|
http_body.Sendfile.offset = @min(http_body.Sendfile.offset, stat_size);
|
||||||
|
http_body.Sendfile.remain = @min(@max(http_body.Sendfile.remain, http_body.Sendfile.offset), stat_size) -| http_body.Sendfile.offset;
|
||||||
|
}
|
||||||
|
body.detach();
|
||||||
|
|
||||||
|
break :prepare_body;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return rejected_value;
|
// TODO: make this async + lazy
|
||||||
},
|
const res = JSC.Node.NodeFS.readFile(
|
||||||
.result => |result| {
|
globalThis.bunVM().nodeFS(),
|
||||||
body.detach();
|
.{
|
||||||
body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice())));
|
.encoding = .buffer,
|
||||||
},
|
.path = .{ .fd = opened_fd },
|
||||||
|
.offset = body.Blob.offset,
|
||||||
|
.max_size = body.Blob.size,
|
||||||
|
},
|
||||||
|
.sync,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (body.Blob.store.?.data.file.pathlike == .path) {
|
||||||
|
_ = JSC.Node.Syscall.close(opened_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (res) {
|
||||||
|
.err => |err| {
|
||||||
|
bun.default_allocator.free(url_proxy_buffer);
|
||||||
|
|
||||||
|
const rejected_value = JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
|
||||||
|
body.detach();
|
||||||
|
if (headers) |*headers_| {
|
||||||
|
headers_.buf.deinit(bun.default_allocator);
|
||||||
|
headers_.entries.deinit(bun.default_allocator);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rejected_value;
|
||||||
|
},
|
||||||
|
.result => |result| {
|
||||||
|
body.detach();
|
||||||
|
body.from(std.ArrayList(u8).fromOwnedSlice(bun.default_allocator, @constCast(result.slice())));
|
||||||
|
http_body = .{ .AnyBlob = body };
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1388,7 +1518,7 @@ pub const Fetch = struct {
|
|||||||
.headers = headers orelse Headers{
|
.headers = headers orelse Headers{
|
||||||
.allocator = bun.default_allocator,
|
.allocator = bun.default_allocator,
|
||||||
},
|
},
|
||||||
.body = body,
|
.body = http_body,
|
||||||
.timeout = std.time.ns_per_hour,
|
.timeout = std.time.ns_per_hour,
|
||||||
.disable_keepalive = disable_keepalive,
|
.disable_keepalive = disable_keepalive,
|
||||||
.disable_timeout = disable_timeout,
|
.disable_timeout = disable_timeout,
|
||||||
|
|||||||
@@ -1572,4 +1572,9 @@ extern "C"
|
|||||||
return uwsRes->getNativeHandle();
|
return uwsRes->getNativeHandle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void us_socket_sendfile_needs_more(us_socket_t *s) {
|
||||||
|
s->context->loop->data.last_write_failed = 1;
|
||||||
|
us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,6 +43,23 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
|
|||||||
pub fn getNativeHandle(this: ThisSocket) *NativeSocketHandleType(ssl) {
|
pub fn getNativeHandle(this: ThisSocket) *NativeSocketHandleType(ssl) {
|
||||||
return @ptrCast(*NativeSocketHandleType(ssl), us_socket_get_native_handle(comptime ssl_int, this.socket).?);
|
return @ptrCast(*NativeSocketHandleType(ssl), us_socket_get_native_handle(comptime ssl_int, this.socket).?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn fd(this: ThisSocket) i32 {
|
||||||
|
if (comptime ssl) {
|
||||||
|
@compileError("SSL sockets do not have a file descriptor accessible this way");
|
||||||
|
}
|
||||||
|
|
||||||
|
return @intCast(i32, @ptrToInt(us_socket_get_native_handle(0, this.socket)));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn markNeedsMoreForSendfile(this: ThisSocket) void {
|
||||||
|
if (comptime ssl) {
|
||||||
|
@compileError("SSL sockets do not support sendfile yet");
|
||||||
|
}
|
||||||
|
|
||||||
|
us_socket_sendfile_needs_more(this.socket);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType {
|
pub fn ext(this: ThisSocket, comptime ContextType: type) ?*ContextType {
|
||||||
const alignment = if (ContextType == *anyopaque)
|
const alignment = if (ContextType == *anyopaque)
|
||||||
@sizeOf(usize)
|
@sizeOf(usize)
|
||||||
@@ -1882,3 +1899,5 @@ pub const State = enum(i32) {
|
|||||||
return @enumToInt(this) & @enumToInt(State.HTTP_CONNECTION_CLOSE) != 0;
|
return @enumToInt(this) & @enumToInt(State.HTTP_CONNECTION_CLOSE) != 0;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
extern fn us_socket_sendfile_needs_more(socket: *Socket) void;
|
||||||
|
|||||||
@@ -170,3 +170,5 @@ pub const source_map_debug_id = true;
|
|||||||
pub const alignment_tweak = false;
|
pub const alignment_tweak = false;
|
||||||
|
|
||||||
pub const export_star_redirect = false;
|
pub const export_star_redirect = false;
|
||||||
|
|
||||||
|
pub const streaming_file_uploads_for_http_client = true;
|
||||||
|
|||||||
@@ -71,6 +71,89 @@ pub const FetchRedirect = enum(u8) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const HTTPRequestBody = union(enum) {
|
||||||
|
bytes: []const u8,
|
||||||
|
sendfile: Sendfile,
|
||||||
|
|
||||||
|
pub fn len(this: *const HTTPRequestBody) usize {
|
||||||
|
return switch (this.*) {
|
||||||
|
.bytes => this.bytes.len,
|
||||||
|
.sendfile => this.sendfile.content_size,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const Sendfile = struct {
|
||||||
|
fd: bun.FileDescriptor,
|
||||||
|
remain: usize = 0,
|
||||||
|
offset: usize = 0,
|
||||||
|
content_size: usize = 0,
|
||||||
|
|
||||||
|
pub fn isEligible(url: bun.URL) bool {
|
||||||
|
return url.isHTTP() and url.href.len > 0 and FeatureFlags.streaming_file_uploads_for_http_client;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(
|
||||||
|
this: *Sendfile,
|
||||||
|
socket: NewHTTPContext(false).HTTPSocket,
|
||||||
|
) Status {
|
||||||
|
const adjusted_count_temporary = @min(@as(u64, this.remain), @as(u63, std.math.maxInt(u63)));
|
||||||
|
// TODO we should not need this int cast; improve the return type of `@min`
|
||||||
|
const adjusted_count = @intCast(u63, adjusted_count_temporary);
|
||||||
|
|
||||||
|
if (Environment.isLinux) {
|
||||||
|
var signed_offset = @intCast(i64, this.offset);
|
||||||
|
const begin = this.offset;
|
||||||
|
const val =
|
||||||
|
// this does the syscall directly, without libc
|
||||||
|
std.os.linux.sendfile(socket.fd(), this.fd, &signed_offset, this.remain);
|
||||||
|
this.offset = @intCast(u64, signed_offset);
|
||||||
|
|
||||||
|
const errcode = std.os.linux.getErrno(val);
|
||||||
|
|
||||||
|
this.remain -|= @intCast(u64, this.offset -| begin);
|
||||||
|
|
||||||
|
if (errcode != .SUCCESS or this.remain == 0 or val == 0) {
|
||||||
|
if (errcode == .SUCCESS) {
|
||||||
|
return .{ .done = {} };
|
||||||
|
}
|
||||||
|
|
||||||
|
return .{ .err = AsyncIO.asError(errcode) };
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var sbytes: std.os.off_t = adjusted_count;
|
||||||
|
const signed_offset = @bitCast(i64, @as(u64, this.offset));
|
||||||
|
const errcode = std.c.getErrno(std.c.sendfile(
|
||||||
|
this.fd,
|
||||||
|
socket.fd(),
|
||||||
|
|
||||||
|
signed_offset,
|
||||||
|
&sbytes,
|
||||||
|
null,
|
||||||
|
0,
|
||||||
|
));
|
||||||
|
const wrote = @intCast(u64, sbytes);
|
||||||
|
this.offset +|= wrote;
|
||||||
|
this.remain -|= wrote;
|
||||||
|
if (errcode != .AGAIN or this.remain == 0 or sbytes == 0) {
|
||||||
|
if (errcode == .SUCCESS) {
|
||||||
|
return .{ .done = {} };
|
||||||
|
}
|
||||||
|
|
||||||
|
return .{ .err = AsyncIO.asError(errcode) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return .{ .again = {} };
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const Status = union(enum) {
|
||||||
|
done: void,
|
||||||
|
err: anyerror,
|
||||||
|
again: void,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
const ProxySSLData = struct {
|
const ProxySSLData = struct {
|
||||||
buffer: std.ArrayList(u8),
|
buffer: std.ArrayList(u8),
|
||||||
partial: bool,
|
partial: bool,
|
||||||
@@ -738,7 +821,7 @@ pub fn onClose(
|
|||||||
|
|
||||||
if (client.allow_retry) {
|
if (client.allow_retry) {
|
||||||
client.allow_retry = false;
|
client.allow_retry = false;
|
||||||
client.start(client.state.request_body, client.state.body_out_str.?);
|
client.start(client.state.original_request_body, client.state.body_out_str.?);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -915,14 +998,16 @@ pub const InternalState = struct {
|
|||||||
compressed_body: MutableString = undefined,
|
compressed_body: MutableString = undefined,
|
||||||
body_size: usize = 0,
|
body_size: usize = 0,
|
||||||
request_body: []const u8 = "",
|
request_body: []const u8 = "",
|
||||||
|
original_request_body: HTTPRequestBody = .{ .bytes = "" },
|
||||||
request_sent_len: usize = 0,
|
request_sent_len: usize = 0,
|
||||||
fail: anyerror = error.NoError,
|
fail: anyerror = error.NoError,
|
||||||
request_stage: HTTPStage = .pending,
|
request_stage: HTTPStage = .pending,
|
||||||
response_stage: HTTPStage = .pending,
|
response_stage: HTTPStage = .pending,
|
||||||
|
|
||||||
pub fn init(body: []const u8, body_out_str: *MutableString) InternalState {
|
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
|
||||||
return .{
|
return .{
|
||||||
.request_body = body,
|
.original_request_body = body,
|
||||||
|
.request_body = if (body == .bytes) body.bytes else "",
|
||||||
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||||
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||||
.body_out_str = body_out_str,
|
.body_out_str = body_out_str,
|
||||||
@@ -942,6 +1027,7 @@ pub const InternalState = struct {
|
|||||||
.body_out_str = body_msg,
|
.body_out_str = body_msg,
|
||||||
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||||
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
.response_message_buffer = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||||
|
.original_request_body = .{ .bytes = "" },
|
||||||
.request_body = "",
|
.request_body = "",
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -1191,7 +1277,7 @@ pub const AsyncHTTP = struct {
|
|||||||
request_headers: Headers.Entries = Headers.Entries{},
|
request_headers: Headers.Entries = Headers.Entries{},
|
||||||
response_headers: Headers.Entries = Headers.Entries{},
|
response_headers: Headers.Entries = Headers.Entries{},
|
||||||
response_buffer: *MutableString,
|
response_buffer: *MutableString,
|
||||||
request_body: []const u8 = "",
|
request_body: HTTPRequestBody = .{ .bytes = "" },
|
||||||
allocator: std.mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
request_header_buf: string = "",
|
request_header_buf: string = "",
|
||||||
method: Method = Method.GET,
|
method: Method = Method.GET,
|
||||||
@@ -1278,7 +1364,18 @@ pub const AsyncHTTP = struct {
|
|||||||
hostname: ?[]u8,
|
hostname: ?[]u8,
|
||||||
redirect_type: FetchRedirect,
|
redirect_type: FetchRedirect,
|
||||||
) AsyncHTTP {
|
) AsyncHTTP {
|
||||||
var this = AsyncHTTP{ .allocator = allocator, .url = url, .method = method, .request_headers = headers, .request_header_buf = headers_buf, .request_body = request_body, .response_buffer = response_buffer, .completion_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0 };
|
var this = AsyncHTTP{
|
||||||
|
.allocator = allocator,
|
||||||
|
.url = url,
|
||||||
|
.method = method,
|
||||||
|
.request_headers = headers,
|
||||||
|
.request_header_buf = headers_buf,
|
||||||
|
.request_body = .{ .bytes = request_body },
|
||||||
|
.response_buffer = response_buffer,
|
||||||
|
.completion_callback = callback,
|
||||||
|
.http_proxy = http_proxy,
|
||||||
|
.async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
|
||||||
|
};
|
||||||
|
|
||||||
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname);
|
this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname);
|
||||||
this.client.async_http_id = this.async_http_id;
|
this.client.async_http_id = this.async_http_id;
|
||||||
@@ -1648,7 +1745,7 @@ pub fn doRedirect(this: *HTTPClient) void {
|
|||||||
if (this.aborted != null) {
|
if (this.aborted != null) {
|
||||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||||
}
|
}
|
||||||
return this.start("", body_out_str);
|
return this.start(.{ .bytes = "" }, body_out_str);
|
||||||
}
|
}
|
||||||
pub fn isHTTPS(this: *HTTPClient) bool {
|
pub fn isHTTPS(this: *HTTPClient) bool {
|
||||||
if (this.http_proxy) |proxy| {
|
if (this.http_proxy) |proxy| {
|
||||||
@@ -1662,7 +1759,7 @@ pub fn isHTTPS(this: *HTTPClient) bool {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) void {
|
pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableString) void {
|
||||||
body_out_str.reset();
|
body_out_str.reset();
|
||||||
|
|
||||||
std.debug.assert(this.state.response_message_buffer.list.capacity == 0);
|
std.debug.assert(this.state.response_message_buffer.list.capacity == 0);
|
||||||
@@ -1730,7 +1827,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
|||||||
|
|
||||||
this.setTimeout(socket, 60);
|
this.setTimeout(socket, 60);
|
||||||
|
|
||||||
const request = this.buildRequest(this.state.request_body.len);
|
const request = this.buildRequest(this.state.original_request_body.len());
|
||||||
|
|
||||||
if (this.http_proxy) |_| {
|
if (this.http_proxy) |_| {
|
||||||
if (this.url.isHTTPS()) {
|
if (this.url.isHTTPS()) {
|
||||||
@@ -1784,7 +1881,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
|||||||
std.debug.assert(!socket.isShutdown());
|
std.debug.assert(!socket.isShutdown());
|
||||||
std.debug.assert(!socket.isClosed());
|
std.debug.assert(!socket.isClosed());
|
||||||
}
|
}
|
||||||
const amount = socket.write(to_send, false);
|
const amount = socket.write(
|
||||||
|
to_send,
|
||||||
|
false,
|
||||||
|
);
|
||||||
if (comptime is_first_call) {
|
if (comptime is_first_call) {
|
||||||
if (amount == 0) {
|
if (amount == 0) {
|
||||||
// don't worry about it
|
// don't worry about it
|
||||||
@@ -1804,7 +1904,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
|||||||
this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
|
this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
|
||||||
}
|
}
|
||||||
|
|
||||||
const has_sent_body = this.state.request_body.len == 0;
|
const has_sent_body = if (this.state.original_request_body == .bytes)
|
||||||
|
this.state.request_body.len == 0
|
||||||
|
else
|
||||||
|
false;
|
||||||
|
|
||||||
if (has_sent_headers and has_sent_body) {
|
if (has_sent_headers and has_sent_body) {
|
||||||
this.state.request_stage = .done;
|
this.state.request_stage = .done;
|
||||||
@@ -1813,7 +1916,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
|||||||
|
|
||||||
if (has_sent_headers) {
|
if (has_sent_headers) {
|
||||||
this.state.request_stage = .body;
|
this.state.request_stage = .body;
|
||||||
std.debug.assert(this.state.request_body.len > 0);
|
std.debug.assert(
|
||||||
|
// we should have leftover data OR we use sendfile()
|
||||||
|
(this.state.original_request_body == .bytes and this.state.request_body.len > 0) or
|
||||||
|
this.state.original_request_body == .sendfile,
|
||||||
|
);
|
||||||
|
|
||||||
// we sent everything, but there's some body leftover
|
// we sent everything, but there's some body leftover
|
||||||
if (amount == @intCast(c_int, to_send.len)) {
|
if (amount == @intCast(c_int, to_send.len)) {
|
||||||
@@ -1826,19 +1933,42 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
|||||||
.body => {
|
.body => {
|
||||||
this.setTimeout(socket, 60);
|
this.setTimeout(socket, 60);
|
||||||
|
|
||||||
const to_send = this.state.request_body;
|
switch (this.state.original_request_body) {
|
||||||
const amount = socket.write(to_send, true);
|
.bytes => {
|
||||||
if (amount < 0) {
|
const to_send = this.state.request_body;
|
||||||
this.closeAndFail(error.WriteFailed, is_ssl, socket);
|
const amount = socket.write(to_send, true);
|
||||||
return;
|
if (amount < 0) {
|
||||||
}
|
this.closeAndFail(error.WriteFailed, is_ssl, socket);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.state.request_sent_len += @intCast(usize, amount);
|
this.state.request_sent_len += @intCast(usize, amount);
|
||||||
this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
|
this.state.request_body = this.state.request_body[@intCast(usize, amount)..];
|
||||||
|
|
||||||
if (this.state.request_body.len == 0) {
|
if (this.state.request_body.len == 0) {
|
||||||
this.state.request_stage = .done;
|
this.state.request_stage = .done;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
.sendfile => |*sendfile| {
|
||||||
|
if (comptime is_ssl) {
|
||||||
|
@panic("sendfile is only supported without SSL. This code should never have been reached!");
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (sendfile.write(socket)) {
|
||||||
|
.done => {
|
||||||
|
this.state.request_stage = .done;
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
.err => |err| {
|
||||||
|
this.closeAndFail(err, false, socket);
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
.again => {
|
||||||
|
socket.markNeedsMoreForSendfile();
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
.proxy_body => {
|
.proxy_body => {
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
import { expect, test, describe } from "bun:test";
|
import { expect, test, describe } from "bun:test";
|
||||||
import { withoutAggressiveGC } from "harness";
|
import { withoutAggressiveGC } from "harness";
|
||||||
|
import { tmpdir } from "os";
|
||||||
|
import { join } from "path";
|
||||||
|
|
||||||
test("uploads roundtrip", async () => {
|
test("uploads roundtrip", async () => {
|
||||||
const body = Bun.file(import.meta.dir + "/fetch.js.txt");
|
const body = Bun.file(import.meta.dir + "/fetch.js.txt");
|
||||||
@@ -32,6 +34,35 @@ test("uploads roundtrip", async () => {
|
|||||||
server.stop(true);
|
server.stop(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("uploads roundtrip with sendfile()", async () => {
|
||||||
|
var hugeTxt = "huge".repeat(1024 * 1024 * 32);
|
||||||
|
const path = join(tmpdir(), "huge.txt");
|
||||||
|
require("fs").writeFileSync(path, hugeTxt);
|
||||||
|
|
||||||
|
const server = Bun.serve({
|
||||||
|
maxRequestBodySize: 1024 * 1024 * 1024 * 8,
|
||||||
|
async fetch(req) {
|
||||||
|
var count = 0;
|
||||||
|
for await (let chunk of req.body!) {
|
||||||
|
count += chunk.byteLength;
|
||||||
|
}
|
||||||
|
return new Response(count + "");
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const resp = await fetch("http://" + server.hostname + ":" + server.port, {
|
||||||
|
body: Bun.file(path),
|
||||||
|
method: "PUT",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(resp.status).toBe(200);
|
||||||
|
|
||||||
|
const body = parseInt(await resp.text());
|
||||||
|
expect(body).toBe(hugeTxt.length);
|
||||||
|
|
||||||
|
server.stop(true);
|
||||||
|
});
|
||||||
|
|
||||||
test("missing file throws the expected error", async () => {
|
test("missing file throws the expected error", async () => {
|
||||||
Bun.gc(true);
|
Bun.gc(true);
|
||||||
// Run this 1000 times to check for GC bugs
|
// Run this 1000 times to check for GC bugs
|
||||||
|
|||||||
@@ -12,8 +12,7 @@ it("onAborted() and onWritable are not called after receiving an empty response
|
|||||||
testDone(new Error("Test timed out, which means it failed"));
|
testDone(new Error("Test timed out, which means it failed"));
|
||||||
};
|
};
|
||||||
|
|
||||||
const body = new FormData();
|
const invalidJSON = Buffer.from("invalid json");
|
||||||
body.append("hey", "hi");
|
|
||||||
|
|
||||||
// We want to test that the server isn't keeping the connection open in a
|
// We want to test that the server isn't keeping the connection open in a
|
||||||
// zombie-like state when an error occurs due to an unhandled rejected promise
|
// zombie-like state when an error occurs due to an unhandled rejected promise
|
||||||
@@ -69,7 +68,7 @@ it("onAborted() and onWritable are not called after receiving an empty response
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
await fetch(`http://${hostname}:${port}/upload`, {
|
await fetch(`http://${hostname}:${port}/upload`, {
|
||||||
body,
|
body: invalidJSON,
|
||||||
keepalive: false,
|
keepalive: false,
|
||||||
method: "POST",
|
method: "POST",
|
||||||
timeout: true,
|
timeout: true,
|
||||||
@@ -91,4 +90,4 @@ it("onAborted() and onWritable are not called after receiving an empty response
|
|||||||
}
|
}
|
||||||
timeout.onabort = () => {};
|
timeout.onabort = () => {};
|
||||||
testDone();
|
testDone();
|
||||||
});
|
}, 30_000);
|
||||||
|
|||||||
Reference in New Issue
Block a user