[bun.js] 2/? Implement Response.file, sendfile edition

This commit is contained in:
Jarred Sumner
2022-03-21 06:32:14 -07:00
parent fa343fa8ad
commit 7cd93e6670
8 changed files with 506 additions and 161 deletions

View File

@@ -79,6 +79,12 @@ const IOTask = JSC.IOTask;
const is_bindgen = JSC.is_bindgen;
const uws = @import("uws");
const SendfileContext = struct {
fd: i32,
remain: u32 = 0,
offset: i64 = 0,
};
pub fn NewServer(comptime ssl_enabled: bool) type {
return struct {
const ThisServer = @This();
@@ -133,9 +139,17 @@ pub fn NewServer(comptime ssl_enabled: bool) type {
blob: JSC.WebCore.Blob = JSC.WebCore.Blob{},
promise: ?*JSC.JSValue = null,
response_headers: ?*JSC.WebCore.Headers.RefCountedHeaders = null,
has_abort_handler: bool = false,
has_sendfile_ctx: bool = false,
sendfile: SendfileContext = undefined,
pub threadlocal var pool: *RequestContextStackAllocator = undefined;
pub fn setAbortHandler(this: *RequestContext) void {
if (this.has_abort_handler) return;
this.has_abort_handler = true;
this.resp.onAborted(*RequestContext, RequestContext.onAbort, this);
}
pub fn onResolve(
ctx: *RequestContext,
_: *JSC.JSGlobalObject,
@@ -215,30 +229,206 @@ pub fn NewServer(comptime ssl_enabled: bool) type {
this.server.request_pool_allocator.destroy(this);
}
fn writeHeaders(
this: *RequestContext,
headers_: *Headers.RefCountedHeaders,
) void {
var headers: *JSC.WebCore.Headers = headers_.get();
if (headers.getHeaderIndex("content-length")) |index| {
headers.entries.orderedRemove(index);
}
defer headers_.deref();
var entries = headers.entries.slice();
const names = entries.items(.name);
const values = entries.items(.value);
this.resp.writeHeaderInt("content-length", this.blob.size);
this.resp.writeHeaders(names, values, headers.buf.items);
}
pub fn writeStatus(this: *RequestContext, status: u16) void {
var status_text_buf: [48]u8 = undefined;
if (status == 302) {
this.resp.writeStatus("302 Found");
} else {
this.resp.writeStatus(std.fmt.bufPrint(&status_text_buf, "{d} HM", .{status}) catch unreachable);
}
}
fn cleanupAfterSendfile(this: *RequestContext) void {
this.resp.endWithoutBody();
std.os.close(this.sendfile.fd);
this.sendfile = undefined;
this.finalize();
}
pub fn onSendfile(this: *RequestContext, amount_: c_ulong, response: *App.Response) callconv(.C) bool {
const amount = @minimum(@truncate(u32, amount_), this.sendfile.remain);
if (amount == 0 or this.aborted) {
this.cleanupAfterSendfile();
return true;
}
const adjusted_count_temporary = @minimum(amount, @as(u63, std.math.maxInt(i32)));
// TODO we should not need this int cast; improve the return type of `@minimum`
const adjusted_count = @intCast(u63, adjusted_count_temporary);
var sbytes: std.os.off_t = adjusted_count;
const signed_offset = @bitCast(i64, this.sendfile.offset);
if (Environment.isLinux) {
const sent = @truncate(
u32,
std.os.linux.sendfile(response.getNativeHandle(), this.sendfile.fd, &this.sendfile.offset, amount),
);
this.sendfile.offset += sent;
this.sendfile.remain -= sent;
if (sent == 0 or this.aborted or this.sendfile.remain == 0) {
this.cleanupAfterSendfile();
return false;
}
} else {
const errcode = std.c.getErrno(std.c.sendfile(
this.sendfile.fd,
response.getNativeHandle(),
signed_offset,
&sbytes,
null,
0,
));
this.sendfile.offset += sbytes;
this.sendfile.remain -= if (errcode != .SUCCESS) @intCast(u32, sbytes) else 0;
if ((errcode != .AGAIN and errcode != .SUCCESS) or this.aborted or this.sendfile.remain == 0) {
if (errcode != .AGAIN and errcode != .SUCCESS) {
Output.prettyErrorln("Error: {s}", .{@tagName(errcode)});
Output.flush();
}
this.cleanupAfterSendfile();
return false;
}
}
this.resp.onWritable(*RequestContext, onSendfile, this);
return true;
}
pub fn onWritablePrepareSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool {
this.renderSendFile(this.blob);
return true;
}
pub fn onPrepareSendfileWrap(this: *anyopaque, fd: i32, size: anyerror!u32, _: *JSGlobalObject) void {
onPrepareSendfile(bun.cast(*RequestContext, this), fd, size);
}
fn onPrepareSendfile(this: *RequestContext, fd: i32, size: anyerror!u32) void {
this.setAbortHandler();
if (this.aborted) return;
const size_ = size catch {
this.req.setYield(true);
this.finalize();
return;
};
this.blob.size = size_;
const code = this.response_ptr.?.statusCode();
if (size_ == 0 and code >= 200 and code < 300) {
this.writeStatus(204);
} else {
this.writeStatus(code);
}
if (this.response_ptr.?.body.init.headers) |headers_| {
this.writeHeaders(headers_);
} else {
this.resp.writeHeaderInt("content-length", size_);
}
this.sendfile = .{
.fd = fd,
.remain = size_,
};
if (size_ == 0) {
this.cleanupAfterSendfile();
this.finalize();
return;
}
{
const wrote = std.os.write(
this.resp.getNativeHandle(),
"\r\n",
) catch {
this.cleanupAfterSendfile();
return;
};
if (wrote == 0) {
this.cleanupAfterSendfile();
return;
}
}
// if we're not immediately writable, go ahead and try
if (this.sendfile.remain == size_) {
_ = this.onSendfile(size_, this.resp);
}
}
pub fn renderSendFile(this: *RequestContext, blob: JSC.WebCore.Blob) void {
if (this.has_sendfile_ctx) return;
this.has_sendfile_ctx = true;
JSC.WebCore.Blob.doOpenAndStatFile(
&this.blob,
*RequestContext,
this,
onPrepareSendfileWrap,
blob.globalThis,
);
}
pub fn doRender(this: *RequestContext) void {
if (this.aborted) {
return;
}
var response = this.response_ptr.?;
this.blob = response.body.use();
var body = &response.body;
if (body.value == .Error) {
this.resp.writeStatus("500 Internal Server Error");
this.resp.writeHeader("content-type", "text/plain");
this.resp.endWithoutBody();
JSC.VirtualMachine.vm.defaultErrorHandler(body.value.Error, null);
body.value = JSC.WebCore.Body.Value.empty;
this.finalize();
return;
}
if (body.value == .Blob) {
if (body.value.Blob.needsToReadFile()) {
this.blob = response.body.use();
this.req.setYield(false);
this.setAbortHandler();
this.resp.onWritable(*RequestContext, onWritablePrepareSendfile, this);
if (!this.has_sendfile_ctx) this.renderSendFile(this.blob);
return;
}
}
this.renderBytes(response);
}
pub fn renderBytes(this: *RequestContext, response: *JSC.WebCore.Response) void {
const status = response.statusCode();
this.writeStatus(status);
if (response.body.init.headers) |headers_| {
var headers: *JSC.WebCore.Headers = headers_.get();
defer headers_.deref();
var entries = headers.entries.slice();
const names = entries.items(.name);
const values = entries.items(.value);
var status_text_buf: [48]u8 = undefined;
if (status == 302) {
this.resp.writeStatus("302 Found");
} else {
this.resp.writeStatus(std.fmt.bufPrint(&status_text_buf, "{d} HM", .{response.body.init.status_code}) catch unreachable);
}
this.resp.writeHeaders(names, values, headers.buf.items);
this.writeHeaders(headers_);
}
if (status == 302 or status == 202 or this.blob.size == 0) {
@@ -253,8 +443,8 @@ pub fn NewServer(comptime ssl_enabled: bool) type {
pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void {
this.response_ptr = response;
this.resp.runCorked(*RequestContext, doRender, this);
this.response_ptr = null;
// this.resp.runCorked(*RequestContext, doRender, this);
this.doRender();
}
};
@@ -292,7 +482,7 @@ pub fn NewServer(comptime ssl_enabled: bool) type {
}
if (ctx.response_jsvalue.jsTypeLoose() == .JSPromise) {
resp.onAborted(*RequestContext, RequestContext.onAbort, ctx);
ctx.setAbortHandler();
JSC.VirtualMachine.vm.tick();
ctx.response_jsvalue.then(