Cork streams when possible

This commit is contained in:
Jarred Sumner
2022-06-27 05:32:46 -07:00
parent f66c277e54
commit 628cbc8eb3
2 changed files with 99 additions and 148 deletions

View File

@@ -875,6 +875,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn writeStatus(this: *RequestContext, status: u16) void {
var status_text_buf: [48]u8 = undefined;
std.debug.assert(!this.has_written_status);
this.has_written_status = true;
if (status == 302) {
this.resp.writeStatus("302 Found");
@@ -1073,7 +1075,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
.socket_fd = if (!this.aborted) this.resp.getNativeHandle() else -999,
};
this.resp.runCorked(*RequestContext, renderMetadataAndNewline, this);
this.resp.runCorkedWithType(*RequestContext, renderMetadataAndNewline, this);
if (this.blob.size == 0) {
this.cleanupAndFinalizeAfterSendfile();
@@ -1221,10 +1223,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.JavaScript, .Direct => {
if (this.has_abort_handler)
this.resp.runCorked(*RequestContext, renderMetadata, this)
else
// uWS automatically adds the status line if needed
// we want to batch network calls as much as possible
if (!(this.response_ptr.?.statusCode() == 200 or this.response_headers == null)) {
this.renderMetadata();
}
stream.value.ensureStillAlive();
var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
@@ -1243,11 +1246,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
signal.clear();
std.debug.assert(signal.isDead());
const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
this.server.globalThis,
stream.value,
response_stream,
@ptrCast(**anyopaque, &signal.ptr),
const assignment_result: JSValue = this.resp.corked(
ResponseStream.JSSink.assignToStream,
.{
this.server.globalThis,
stream.value,
response_stream,
@ptrCast(**anyopaque, &signal.ptr),
},
);
// assert that it was updated
@@ -1381,7 +1387,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn doRenderBlob(this: *RequestContext) void {
if (this.has_abort_handler)
this.resp.runCorked(*RequestContext, renderMetadata, this)
this.resp.runCorkedWithType(*RequestContext, renderMetadata, this)
else
this.renderMetadata();
@@ -1482,9 +1488,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
else
status;
std.debug.assert(!this.has_written_status);
this.has_written_status = true;
this.writeStatus(status);
var needs_content_type = true;
const content_type: MimeType = brk: {

View File

@@ -837,8 +837,9 @@ pub const ArrayBufferSink = struct {
}
pub fn start(this: *ArrayBufferSink, stream_start: StreamStart) JSC.Node.Maybe(void) {
this.bytes.len = 0;
var list = this.bytes.listManaged(this.allocator);
list.clearAndFree();
list.clearRetainingCapacity();
switch (stream_start) {
.ArrayBufferSink => |config| {
@@ -882,6 +883,8 @@ pub const ArrayBufferSink = struct {
this.bytes = bun.ByteList.init("");
this.done = true;
}
this.allocator.destroy(this);
}
pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*ArrayBufferSink {
@@ -1100,13 +1103,28 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
shim.cppFn("detachPtr", .{ptr});
}
fn getThis(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) ?*ThisSink {
return @ptrCast(
*ThisSink,
@alignCast(
std.meta.alignment(ThisSink),
fromJS(
globalThis,
callframe.this(),
) orelse return null,
),
);
}
fn invalidThis(globalThis: *JSGlobalObject) JSValue {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}
pub fn write(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}));
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1153,11 +1171,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn writeString(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}));
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1194,11 +1208,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn close(globalThis: *JSGlobalObject, sink_ptr: ?*anyopaque) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
});
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), sink_ptr orelse return invalidThis(globalThis)));
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1213,11 +1223,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn drain(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}));
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1236,11 +1242,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn start(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}));
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1273,11 +1275,7 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
pub fn end(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding();
var this = @ptrCast(*ThisSink, @alignCast(std.meta.alignment(ThisSink), fromJS(globalThis, callframe.this()) orelse {
const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Sink", .{}, globalThis);
globalThis.vm().throwError(globalThis, err);
return JSC.JSValue.jsUndefined();
}));
var this = getThis(globalThis, callframe) orelse return invalidThis(globalThis);
if (comptime @hasDecl(SinkType, "getPendingError")) {
if (this.sink.getPendingError()) |err| {
@@ -1336,109 +1334,30 @@ pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
};
}
pub fn WritableStreamSink(
comptime Context: type,
comptime onStart: ?fn (this: Context) void,
comptime onWrite: fn (this: Context, bytes: []const u8) JSC.Maybe(Blob.SizeType),
comptime onAbort: ?fn (this: Context) void,
comptime onClose: ?fn (this: Context) void,
comptime deinit: ?fn (this: Context) void,
) type {
return struct {
context: Context,
closed: bool = false,
deinited: bool = false,
pending_err: ?JSC.Node.Syscall.Error = null,
aborted: bool = false,
// pub fn NetworkSocket(comptime tls: bool) type {
// return struct {
// const Socket = uws.NewSocketHandler(tls);
// const ThisSocket = @This();
abort_signaler: ?*anyopaque = null,
onAbortCallback: ?fn (?*anyopaque) void = null,
// socket: Socket,
close_signaler: ?*anyopaque = null,
onCloseCallback: ?fn (?*anyopaque) void = null,
// pub fn connect(globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
// JSC.markBinding();
pub const This = @This();
// var this = @ptrCast(*ThisSocket, @alignCast(std.meta.alignment(ThisSocket), fromJS(globalThis, callframe.this()) orelse {
// const err = JSC.toTypeError(JSC.Node.ErrorCode.ERR_INVALID_THIS, "Expected Socket", .{}, globalThis);
// globalThis.vm().throwError(globalThis, err);
// return JSC.JSValue.jsUndefined();
// }));
pub fn write(this: *This, bytes: []const u8) JSC.Maybe(Blob.SizeType) {
if (this.pending_err) |err| {
this.pending_err = null;
return .{ .err = err };
}
if (this.closed or this.aborted or this.deinited) {
return .{ .result = 0 };
}
return onWrite(&this.context, bytes);
}
pub fn start(this: *This) StreamStart {
return onStart(&this.context);
}
pub fn abort(this: *This) void {
if (this.closed or this.deinited or this.aborted) {
return;
}
this.aborted = true;
onAbort(&this.context);
}
pub fn didAbort(this: *This) void {
if (this.closed or this.deinited or this.aborted) {
return;
}
this.aborted = true;
if (this.onAbortCallback) |cb| {
this.onAbortCallback = null;
cb(this.abort_signaler);
}
}
pub fn didClose(this: *This) void {
if (this.closed or this.deinited or this.aborted) {
return;
}
this.closed = true;
if (this.onCloseCallback) |cb| {
this.onCloseCallback = null;
cb(this.close_signaler);
}
}
pub fn close(this: *This) void {
if (this.closed or this.deinited or this.aborted) {
return;
}
this.closed = true;
onClose(this.context);
}
pub fn deinit(this: *This) void {
if (this.deinited) {
return;
}
this.deinited = true;
deinit(this.context);
}
pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
if (this.pending_err) |err| {
this.pending_err = null;
return err;
}
return null;
}
};
}
// this.socket.connect()
// }
// };
// }
pub fn HTTPServerWritable(comptime ssl: bool) type {
return struct {
pub const UWSResponse = uws.NewApp(ssl).Response;
const UWSResponse = uws.NewApp(ssl).Response;
res: *UWSResponse,
buffer: bun.ByteList,
offset: Blob.SizeType = 0,
@@ -1460,6 +1379,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
end_len: usize = 0,
aborted: bool = false,
const log = Output.scoped(.HTTPServerWritable, false);
pub fn connect(this: *@This(), signal: Signal) void {
this.signal = signal;
}
@@ -1485,16 +1406,19 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
std.debug.assert(!this.done);
const success = if (!this.requested_end) this.res.write(buf) else this.res.tryEnd(buf, this.end_len);
this.has_backpressure = !success;
log("send: {d} bytes ({d})", .{ buf.len, this.has_backpressure });
return success;
}
fn readableSlice(this: *@This()) []const u8 {
return this.buffer.ptr[this.offset..this.buffer.cap];
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}
pub fn onWritable(this: *@This(), available: c_ulong, _: *UWSResponse) callconv(.C) bool {
log("onWritable ({d})", .{available});
if (this.done) {
this.res.end("", false);
this.res.endStream(false);
return false;
}
@@ -1521,6 +1445,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// flush the javascript promise from calling .drain()
if (this.pending_drain) |prom| {
this.pending_drain = null;
log("flush promise ({d})", .{readable.len});
prom.asValue(this.globalThis).unprotect();
prom.resolve(this.globalThis, JSC.jsNumber(readable.len));
}
@@ -1550,6 +1476,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn start(this: *@This(), _: StreamStart) JSC.Node.Maybe(void) {
log("start()", .{});
if (this.res.hasResponded()) {
this.done = true;
this.signal.close(null);
@@ -1562,7 +1490,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn drainFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
if (this.buffer.len == 0 or this.done) {
log("drainFromJS()", .{});
if (!this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}
@@ -1579,7 +1509,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn drain(this: *@This()) JSC.Node.Maybe(void) {
if (this.buffer.len == 0 or this.done) {
log("drain()", .{});
if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
@@ -1597,6 +1528,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
log("write({d})", .{bytes.len});
if (!this.hasBackpressure()) {
if (this.buffer.len == 0) {
// fast path:
@@ -1646,6 +1580,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
log("writeLatin1({d})", .{bytes.len});
if (!this.hasBackpressure()) {
if (this.buffer.len == 0 and strings.isAllASCII(bytes)) {
// fast path:
@@ -1695,6 +1631,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
const bytes = data.slice();
log("writeUTF16({d})", .{bytes.len});
var written: usize = undefined;
if (!this.hasBackpressure()) {
// we must always buffer UTF-16
@@ -1720,6 +1659,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn end(this: *@This(), err: ?JSC.Node.Syscall.Error) JSC.Node.Maybe(void) {
log("end({s})", .{err});
if (this.requested_end) {
return .{ .result = {} };
}
@@ -1736,7 +1677,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.done = true;
this.res.end("", false);
this.res.endStream(false);
return .{ .result = {} };
}
@@ -1753,6 +1694,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn endFromJS(this: *@This(), globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) {
log("endFromJS()", .{});
if (this.requested_end) {
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -1769,10 +1712,10 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.done = true;
this.res.end("", false);
this.res.endStream(false);
this.signal.close(null);
this.done = true;
return .{ .result = JSC.JSValue.jsNumber(0) };
return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
}
if (!this.hasBackpressure()) {
@@ -1803,6 +1746,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn onAborted(this: *@This(), _: *UWSResponse) void {
log("onAborted()", .{});
this.signal.close(null);
this.done = true;
this.aborted = true;
@@ -1810,9 +1754,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub fn finalize(this: *@This()) void {
log("finalize()", .{});
if (!this.done) {
this.done = true;
this.res.end("", false);
this.res.endStream(false);
}
var bytes = this.buffer.listManaged(this.allocator);
@@ -1823,6 +1769,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flushPromise(this: *@This()) void {
if (this.pending_drain) |prom| {
log("flushPromise()", .{});
this.pending_drain = null;
prom.asValue(this.globalThis).unprotect();
prom.resolve(this.globalThis, JSC.JSValue.jsNumber(0));