mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
Compare commits
1 Commits
bun-v1.3.5
...
revert-989
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7cc09d23dc |
@@ -16,7 +16,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// clang-format off
|
||||
|
||||
#ifndef UWS_ASYNCSOCKET_H
|
||||
#define UWS_ASYNCSOCKET_H
|
||||
|
||||
@@ -255,8 +255,10 @@ public:
|
||||
if (asyncSocketData->buffer.length()) {
|
||||
/* Write off as much as we can */
|
||||
int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length);
|
||||
|
||||
/* On failure return, otherwise continue down the function */
|
||||
if ((unsigned int) written < asyncSocketData->buffer.length()) {
|
||||
|
||||
/* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */
|
||||
asyncSocketData->buffer.erase((unsigned int) written);
|
||||
|
||||
@@ -266,6 +268,7 @@ public:
|
||||
} else {
|
||||
/* This path is horrible and points towards erroneous usage */
|
||||
asyncSocketData->buffer.append(src, (unsigned int) length);
|
||||
|
||||
return {length, true};
|
||||
}
|
||||
}
|
||||
@@ -307,6 +310,7 @@ public:
|
||||
if (optionally) {
|
||||
return {written, true};
|
||||
}
|
||||
|
||||
/* Fall back to worst possible case (should be very rare for HTTP) */
|
||||
/* At least we can reserve room for next chunk if we know it up front */
|
||||
if (nextLength) {
|
||||
@@ -340,7 +344,7 @@ public:
|
||||
auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length);
|
||||
loopData->corkOffset = 0;
|
||||
|
||||
if (failed && optionally) {
|
||||
if (failed) {
|
||||
/* We do not need to care for buffering here, write does that */
|
||||
return {0, true};
|
||||
}
|
||||
|
||||
@@ -374,7 +374,9 @@ private:
|
||||
return s;
|
||||
}
|
||||
|
||||
/* We need to drain any remaining buffered data if success == true*/
|
||||
/* We don't want to fall through since we don't want to mess with timeout.
|
||||
* It makes little sense to drain any backpressure when the user has registered onWritable. */
|
||||
return s;
|
||||
}
|
||||
|
||||
/* Drain any socket buffer, this might empty our backpressure and thus finish the request */
|
||||
|
||||
@@ -573,14 +573,6 @@ public:
|
||||
return this;
|
||||
}
|
||||
|
||||
/* Remove handler for writable HTTP response */
|
||||
HttpResponse *clearOnWritable() {
|
||||
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
|
||||
|
||||
httpResponseData->onWritable = nullptr;
|
||||
return this;
|
||||
}
|
||||
|
||||
/* Attach handler for aborted HTTP request */
|
||||
HttpResponse *onAborted(MoveOnlyFunction<void()> &&handler) {
|
||||
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
|
||||
|
||||
@@ -1967,6 +1967,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
const amount = @as(Blob.SizeType, @truncate(amount1));
|
||||
this.offset += amount;
|
||||
this.wrote += amount;
|
||||
this.buffer.len -|= @as(u32, @truncate(amount));
|
||||
|
||||
if (this.offset >= this.buffer.len) {
|
||||
this.offset = 0;
|
||||
@@ -1986,9 +1987,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
fn hasBackpressure(this: *const @This()) bool {
|
||||
return this.has_backpressure;
|
||||
}
|
||||
fn hasBackpressureAndIsTryEnd(this: *const @This()) bool {
|
||||
return this.has_backpressure and this.end_len > 0;
|
||||
}
|
||||
|
||||
fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool {
|
||||
bun.assert(!this.done);
|
||||
defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
|
||||
@@ -1997,29 +1996,29 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
this.handleFirstWriteIfNecessary();
|
||||
const success = this.res.tryEnd(buf, this.end_len, false);
|
||||
this.has_backpressure = !success;
|
||||
if (this.has_backpressure) {
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
}
|
||||
return success;
|
||||
}
|
||||
// clean this so we know when its relevant or not
|
||||
this.end_len = 0;
|
||||
// we clear the onWritable handler so uWS can handle the backpressure for us
|
||||
this.res.clearOnWritable();
|
||||
this.handleFirstWriteIfNecessary();
|
||||
|
||||
// uWebSockets lacks a tryWrite() function
|
||||
// This means that backpressure will be handled by appending to an "infinite" memory buffer
|
||||
// It will do the backpressure handling for us
|
||||
// so in this scenario, we just append to the buffer
|
||||
// and report success
|
||||
if (this.requested_end) {
|
||||
this.handleFirstWriteIfNecessary();
|
||||
this.res.end(buf, false);
|
||||
this.has_backpressure = false;
|
||||
return true;
|
||||
} else {
|
||||
this.handleFirstWriteIfNecessary();
|
||||
this.has_backpressure = !this.res.write(buf);
|
||||
if (this.has_backpressure) {
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
unreachable;
|
||||
}
|
||||
|
||||
fn send(this: *@This(), buf: []const u8) bool {
|
||||
@@ -2028,52 +2027,39 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
fn readableSlice(this: *@This()) []const u8 {
|
||||
return this.buffer.ptr[this.offset..this.buffer.len];
|
||||
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
|
||||
}
|
||||
|
||||
pub fn onWritable(this: *@This(), write_offset: u64, _: *UWSResponse) callconv(.C) bool {
|
||||
// write_offset is the amount of data that was written not how much we need to write
|
||||
pub fn onWritable(this: *@This(), write_offset_: u64, _: *UWSResponse) callconv(.C) bool {
|
||||
const write_offset: u64 = @as(u64, write_offset_);
|
||||
log("onWritable ({d})", .{write_offset});
|
||||
// onWritable reset backpressure state to allow flushing
|
||||
this.has_backpressure = false;
|
||||
if (this.aborted) {
|
||||
this.res.clearOnWritable();
|
||||
this.signal.close(null);
|
||||
this.flushPromise();
|
||||
|
||||
if (this.done) {
|
||||
if (this.aborted == false) {
|
||||
this.res.endStream(false);
|
||||
}
|
||||
this.finalize();
|
||||
return false;
|
||||
}
|
||||
var total_written: u64 = 0;
|
||||
|
||||
// do not write more than available
|
||||
// if we do, it will cause this to be delayed until the next call, each time
|
||||
// TODO: should we break it in smaller chunks?
|
||||
const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len - 1));
|
||||
const chunk = this.readableSlice()[to_write..];
|
||||
// if we have nothing to write, we are done
|
||||
if (chunk.len == 0) {
|
||||
if (this.done) {
|
||||
this.res.clearOnWritable();
|
||||
this.signal.close(null);
|
||||
this.flushPromise();
|
||||
this.finalize();
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (!this.send(chunk)) {
|
||||
// if we were unable to send it, retry
|
||||
return false;
|
||||
}
|
||||
this.handleWrote(@as(Blob.SizeType, @truncate(chunk.len)));
|
||||
total_written = chunk.len;
|
||||
const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len));
|
||||
|
||||
if (this.requested_end) {
|
||||
this.res.clearOnWritable();
|
||||
this.signal.close(null);
|
||||
this.flushPromise();
|
||||
this.finalize();
|
||||
return true;
|
||||
}
|
||||
// figure out how much data exactly to write
|
||||
const readable = this.readableSlice()[0..to_write];
|
||||
if (!this.send(readable)) {
|
||||
// if we were unable to send it, retry
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
this.handleWrote(@as(Blob.SizeType, @truncate(readable.len)));
|
||||
const initial_wrote = this.wrote;
|
||||
|
||||
if (this.buffer.len > 0 and !this.done) {
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
// flush the javascript promise from calling .flush()
|
||||
@@ -2082,13 +2068,16 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
// pending_flush or callback could have caused another send()
|
||||
// so we check again if we should report readiness
|
||||
if (!this.done and !this.requested_end and !this.hasBackpressure()) {
|
||||
// no pending and total_written > 0
|
||||
if (total_written > 0 and this.readableSlice().len == 0) {
|
||||
this.signal.ready(@as(Blob.SizeType, @truncate(total_written)), null);
|
||||
const pending = @as(Blob.SizeType, @truncate(write_offset)) -| to_write;
|
||||
const written_after_flush = this.wrote - initial_wrote;
|
||||
const to_report = pending - @min(written_after_flush, pending);
|
||||
|
||||
if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) {
|
||||
this.signal.ready(to_report, null);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Maybe(void) {
|
||||
@@ -2139,7 +2128,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
|
||||
fn flushFromJSNoWait(this: *@This()) JSC.Maybe(JSValue) {
|
||||
log("flushFromJSNoWait", .{});
|
||||
if (this.hasBackpressureAndIsTryEnd() or this.done) {
|
||||
if (this.hasBackpressure() or this.done) {
|
||||
return .{ .result = JSValue.jsNumberFromInt32(0) };
|
||||
}
|
||||
|
||||
@@ -2173,7 +2162,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumberFromInt32(0)) };
|
||||
}
|
||||
|
||||
if (!this.hasBackpressureAndIsTryEnd()) {
|
||||
if (!this.hasBackpressure()) {
|
||||
const slice = this.readableSlice();
|
||||
assert(slice.len > 0);
|
||||
const success = this.send(slice);
|
||||
@@ -2181,6 +2170,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
this.handleWrote(@as(Blob.SizeType, @truncate(slice.len)));
|
||||
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(slice.len)) };
|
||||
}
|
||||
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
}
|
||||
this.wrote_at_start_of_flush = this.wrote;
|
||||
this.pending_flush = JSC.JSPromise.create(globalThis);
|
||||
@@ -2228,8 +2219,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
_ = this.buffer.write(this.allocator, bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
this.registerAutoFlusher();
|
||||
} else if (this.buffer.len + len >= this.highWaterMark) {
|
||||
|
||||
// TODO: attempt to write both in a corked buffer?
|
||||
_ = this.buffer.write(this.allocator, bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
@@ -2237,16 +2228,21 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
const slice = this.readableSlice();
|
||||
if (this.send(slice)) {
|
||||
this.handleWrote(slice.len);
|
||||
this.buffer.len = 0;
|
||||
return .{ .owned = len };
|
||||
}
|
||||
} else {
|
||||
// queue the data wait until highWaterMark is reached or the auto flusher kicks in
|
||||
// queue the data
|
||||
// do not send it
|
||||
_ = this.buffer.write(this.allocator, bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
this.registerAutoFlusher();
|
||||
return .{ .owned = len };
|
||||
}
|
||||
|
||||
this.registerAutoFlusher();
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
|
||||
return .{ .owned = len };
|
||||
}
|
||||
@@ -2306,9 +2302,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
this.registerAutoFlusher();
|
||||
return .{ .owned = len };
|
||||
}
|
||||
|
||||
this.registerAutoFlusher();
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
|
||||
return .{ .owned = len };
|
||||
}
|
||||
@@ -2334,11 +2333,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
};
|
||||
|
||||
const readable = this.readableSlice();
|
||||
|
||||
if (readable.len >= this.highWaterMark or this.hasBackpressure()) {
|
||||
if (this.send(readable)) {
|
||||
this.handleWrote(readable.len);
|
||||
return .{ .owned = @as(Blob.SizeType, @intCast(written)) };
|
||||
}
|
||||
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
}
|
||||
|
||||
this.registerAutoFlusher();
|
||||
@@ -2377,6 +2379,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
this.finalize();
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
@@ -2429,7 +2432,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
this.unregisterAutoFlusher();
|
||||
|
||||
this.aborted = true;
|
||||
|
||||
this.signal.close(null);
|
||||
|
||||
this.flushPromise();
|
||||
@@ -2455,13 +2457,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
|
||||
const readable = this.readableSlice();
|
||||
|
||||
if ((this.hasBackpressureAndIsTryEnd()) or readable.len == 0) {
|
||||
if (this.hasBackpressure() or readable.len == 0) {
|
||||
this.auto_flusher.registered = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.sendWithoutAutoFlusher(readable)) {
|
||||
this.auto_flusher.registered = true;
|
||||
this.res.onWritable(*@This(), onWritable, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -2479,6 +2482,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
this.unregisterAutoFlusher();
|
||||
|
||||
this.allocator.destroy(this);
|
||||
}
|
||||
|
||||
@@ -2490,7 +2494,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
if (!this.done) {
|
||||
this.done = true;
|
||||
this.unregisterAutoFlusher();
|
||||
this.res.clearOnWritable();
|
||||
this.res.endStream(false);
|
||||
}
|
||||
|
||||
|
||||
@@ -1238,16 +1238,6 @@ extern "C"
|
||||
{ return handler(res, a, opcional_data); });
|
||||
}
|
||||
}
|
||||
|
||||
void uws_res_clear_on_writable(int ssl, uws_res_t *res) {
|
||||
if (ssl) {
|
||||
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
|
||||
uwsRes->clearOnWritable();
|
||||
} else {
|
||||
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
|
||||
uwsRes->clearOnWritable();
|
||||
}
|
||||
}
|
||||
|
||||
void uws_res_on_aborted(int ssl, uws_res_t *res,
|
||||
void (*handler)(uws_res_t *res, void *opcional_data),
|
||||
|
||||
@@ -2036,10 +2036,6 @@ pub fn NewApp(comptime ssl: bool) type {
|
||||
};
|
||||
uws_res_on_writable(ssl_flag, res.downcast(), Wrapper.handle, user_data);
|
||||
}
|
||||
|
||||
pub fn clearOnWritable(res: *Response) void {
|
||||
uws_res_clear_on_writable(ssl_flag, res.downcast());
|
||||
}
|
||||
pub inline fn markNeedsMore(res: *Response) void {
|
||||
if (!ssl) {
|
||||
us_socket_mark_needs_more_not_ssl(res.downcast());
|
||||
@@ -2382,7 +2378,6 @@ extern fn uws_res_get_write_offset(ssl: i32, res: *uws_res) u64;
|
||||
extern fn uws_res_override_write_offset(ssl: i32, res: *uws_res, u64) void;
|
||||
extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool;
|
||||
extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void;
|
||||
extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void;
|
||||
extern fn uws_res_on_aborted(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void;
|
||||
extern fn uws_res_on_data(
|
||||
ssl: i32,
|
||||
|
||||
Reference in New Issue
Block a user