Fix failing tests from backpressure

This commit is contained in:
Jarred Sumner
2022-09-30 19:43:03 -07:00
parent 48cb526e0b
commit e4e7966d64

View File

@@ -1917,12 +1917,18 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
fn send(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
defer log("send: {d} bytes (backpressure: {d})", .{ buf.len, this.has_backpressure });
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
return success;
}
// uWebSockets lacks a tryWrite() function
// This means that backpressure will be handled by appending to an "infinite" memory buffer
// It will do the backpressure handling for us
// so in this scenario, we just append to the buffer
// and report success
if (this.requested_end) {
this.res.end(buf, false);
this.has_backpressure = false;
@@ -1930,7 +1936,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
} else {
const backpressure = this.res.write(buf);
this.has_backpressure = backpressure;
return true;
}
@@ -2119,47 +2124,40 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const len = @truncate(Blob.SizeType, bytes.len);
log("write({d})", .{bytes.len});
if (!this.hasBackpressure()) {
if (this.buffer.len == 0 and len >= this.highWaterMark) {
// fast path:
// - large-ish chunk
// - no backpressure
if (this.send(bytes)) {
this.handleWrote(len);
return .{ .owned = len };
}
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
} else if (this.buffer.len + len >= this.highWaterMark) {
// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const slice = this.readableSlice();
if (this.send(slice)) {
this.handleWrote(slice.len);
this.buffer.len = 0;
return .{ .owned = len };
}
} else {
// queue the data
// do not send it
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
if (this.buffer.len == 0 and len >= this.highWaterMark) {
// fast path:
// - large-ish chunk
// - no backpressure
if (this.send(bytes)) {
this.handleWrote(len);
return .{ .owned = len };
}
this.res.onWritable(*@This(), onWritable, this);
} else {
log("has backpressure", .{});
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
} else if (this.buffer.len + len >= this.highWaterMark) {
// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const slice = this.readableSlice();
if (this.send(slice)) {
this.handleWrote(slice.len);
this.buffer.len = 0;
return .{ .owned = len };
}
} else {
// queue the data
// do not send it
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
}
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
}
pub const writeBytes = write;
@@ -2178,57 +2176,51 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const len = @truncate(Blob.SizeType, bytes.len);
log("writeLatin1({d})", .{bytes.len});
if (!this.hasBackpressure()) {
if (this.buffer.len == 0 and len >= this.highWaterMark) {
var do_send = true;
// common case
if (strings.isAllASCII(bytes)) {
// fast path:
// - large-ish chunk
// - no backpressure
if (this.send(bytes)) {
this.handleWrote(bytes.len);
return .{ .owned = len };
}
do_send = false;
}
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
if (do_send) {
if (this.send(this.readableSlice())) {
this.handleWrote(bytes.len);
return .{ .owned = len };
}
}
} else if (this.buffer.len + len >= this.highWaterMark) {
// kinda fast path:
// - combined chunk is large enough to flush automatically
if (this.buffer.len == 0 and len >= this.highWaterMark) {
var do_send = true;
// common case
if (strings.isAllASCII(bytes)) {
// fast path:
// - large-ish chunk
// - no backpressure
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const readable = this.readableSlice();
if (this.send(readable)) {
this.handleWrote(readable.len);
if (this.send(bytes)) {
this.handleWrote(bytes.len);
return .{ .owned = len };
}
} else {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
do_send = false;
}
this.res.onWritable(*@This(), onWritable, this);
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
if (do_send) {
if (this.send(this.readableSlice())) {
this.handleWrote(bytes.len);
return .{ .owned = len };
}
}
} else if (this.buffer.len + len >= this.highWaterMark) {
// kinda fast path:
// - combined chunk is large enough to flush automatically
// - no backpressure
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const readable = this.readableSlice();
if (this.send(readable)) {
this.handleWrote(readable.len);
return .{ .owned = len };
}
} else {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
}
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
@@ -2246,31 +2238,24 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
log("writeUTF16({d})", .{bytes.len});
var written: usize = undefined;
if (!this.hasBackpressure()) {
// we must always buffer UTF-16
// we assume the case of all-ascii UTF-16 string is pretty uncommon
written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
// we must always buffer UTF-16
// we assume the case of all-ascii UTF-16 string is pretty uncommon
const written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const readable = this.readableSlice();
const readable = this.readableSlice();
if (readable.len >= this.highWaterMark) {
if (this.send(readable)) {
this.handleWrote(readable.len);
return .{ .owned = @truncate(Blob.SizeType, written) };
}
this.res.onWritable(*@This(), onWritable, this);
if (readable.len >= this.highWaterMark or this.hasBackpressure()) {
if (this.send(readable)) {
this.handleWrote(readable.len);
return .{ .owned = @intCast(Blob.SizeType, written) };
}
} else {
written = this.buffer.writeUTF16(this.allocator, @alignCast(2, std.mem.bytesAsSlice(u16, bytes))) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
this.res.onWritable(*@This(), onWritable, this);
}
return .{ .owned = @truncate(Blob.SizeType, written) };
return .{ .owned = @intCast(Blob.SizeType, written) };
}
// In this case, it's always an error
@@ -2301,19 +2286,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = {} };
}
if (!this.hasBackpressure()) {
if (this.send(readable)) {
this.handleWrote(readable.len);
this.signal.close(err);
this.done = true;
this.res.endStream(false);
this.finalize();
return .{ .result = {} };
}
this.res.onWritable(*@This(), onWritable, this);
}
return .{ .result = {} };
}
@@ -2335,38 +2307,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const readable = this.readableSlice();
this.end_len = readable.len;
if (readable.len == 0) {
this.done = true;
this.res.endStream(false);
this.signal.close(null);
const wrote = this.wrote;
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(wrote) };
}
if (!this.hasBackpressure()) {
if (this.send(readable)) {
this.handleWrote(readable.len);
this.signal.close(null);
this.done = true;
const wrote = this.wrote;
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(wrote) };
if (readable.len > 0) {
if (!this.send(readable)) {
this.pending_flush = JSC.JSPromise.create(globalThis);
this.globalThis = globalThis;
const value = this.pending_flush.?.asValue(globalThis);
value.protect();
return .{ .result = value };
}
this.res.onWritable(*@This(), onWritable, this);
} else {
this.res.end("", false);
}
if (this.pending_flush) |prom| {
this.pending_flush = null;
return .{ .result = prom.asValue(globalThis) };
}
this.done = true;
this.flushPromise();
this.signal.close(null);
this.done = true;
this.finalize();
this.pending_flush = JSC.JSPromise.create(globalThis);
this.globalThis = globalThis;
const value = this.pending_flush.?.asValue(globalThis);
value.protect();
return .{ .result = value };
return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
}
pub fn sink(this: *@This()) Sink {