fix(sql) disable idle timeout when still processing data (#16984)

This commit is contained in:
Ciro Spaciari
2025-02-02 21:27:22 -08:00
committed by GitHub
parent cc68f4f025
commit 00a5c4af5a
2 changed files with 96 additions and 85 deletions

View File

@@ -330,7 +330,6 @@ pub const PostgresSQLQuery = struct {
} = .{},
pub usingnamespace JSC.Codegen.JSPostgresSQLQuery;
const log = bun.Output.scoped(.PostgresSQLQuery, false);
pub fn getTarget(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject) JSC.JSValue {
const thisValue = this.thisValue.get();
if (thisValue == .zero) {
@@ -700,7 +699,7 @@ pub const PostgresSQLQuery = struct {
};
const has_params = signature.fields.len > 0;
var reset_timeout = false;
var did_write = false;
enqueue: {
if (entry.found_existing) {
this.statement = entry.value_ptr.*;
@@ -715,7 +714,7 @@ pub const PostgresSQLQuery = struct {
.prepared => {
if (!connection.hasQueryRunning()) {
this.flags.binary = this.statement.?.fields.len > 0;
log("bindAndExecute", .{});
debug("bindAndExecute", .{});
// bindAndExecute will bind + execute, it will change to running after binding is complete
PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| {
if (!globalObject.hasException())
@@ -724,7 +723,7 @@ pub const PostgresSQLQuery = struct {
};
this.status = .binding;
reset_timeout = true;
did_write = true;
}
},
.parsing, .pending => {},
@@ -738,7 +737,7 @@ pub const PostgresSQLQuery = struct {
if (can_execute) {
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
log("prepareAndQueryWithSignature", .{});
debug("prepareAndQueryWithSignature", .{});
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| {
signature.deinit();
@@ -747,9 +746,9 @@ pub const PostgresSQLQuery = struct {
return error.JSError;
};
this.status = .binding;
reset_timeout = true;
did_write = true;
} else {
log("writeQuery", .{});
debug("writeQuery", .{});
PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| {
signature.deinit();
if (!globalObject.hasException())
@@ -762,7 +761,7 @@ pub const PostgresSQLQuery = struct {
return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to flush", err));
return error.JSError;
};
reset_timeout = true;
did_write = true;
}
}
{
@@ -781,12 +780,11 @@ pub const PostgresSQLQuery = struct {
this.thisValue.upgrade(globalObject);
PostgresSQLQuery.targetSetCached(this_value, globalObject, query);
if (connection.is_ready_for_query)
connection.flushDataAndResetTimeout()
else if (reset_timeout)
if (did_write) {
connection.flushDataAndResetTimeout();
} else {
connection.resetConnectionTimeout();
}
return .undefined;
}
@@ -1046,8 +1044,9 @@ pub const PostgresRequest = struct {
) !void {
while (true) {
reader.markMessageStart();
switch (try reader.int(u8)) {
const c = try reader.int(u8);
debug("read: {c}", .{c});
switch (c) {
'D' => try connection.on(.DataRow, Context, reader),
'd' => try connection.on(.CopyData, Context, reader),
'S' => {
@@ -1091,9 +1090,10 @@ pub const PostgresRequest = struct {
'c' => try connection.on(.CopyDone, Context, reader),
'W' => try connection.on(.CopyBothResponse, Context, reader),
else => |c| {
debug("Unknown message: {d}", .{c});
else => {
debug("Unknown message: {c}", .{c});
const to_skip = try reader.length() -| 1;
debug("to_skip: {d}", .{to_skip});
try reader.skip(@intCast(@max(to_skip, 0)));
},
}
@@ -1121,13 +1121,9 @@ pub const PostgresSQLConnection = struct {
pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
js_value: JSValue = JSValue.undefined,
is_ready_for_query: bool = false,
backend_parameters: bun.StringMap = bun.StringMap.init(bun.default_allocator, true),
backend_key_data: protocol.BackendKeyData = .{},
pending_disconnect: bool = false,
database: []const u8 = "",
user: []const u8 = "",
password: []const u8 = "",
@@ -1144,6 +1140,8 @@ pub const PostgresSQLConnection = struct {
idle_timeout_interval_ms: u32 = 0,
connection_timeout_ms: u32 = 0,
flags: ConnectionFlags = .{},
/// Before being connected, this is a connection timeout timer.
/// After being connected, this is an idle timeout timer.
timer: JSC.BunTimer.EventLoopTimer = .{
@@ -1166,6 +1164,11 @@ pub const PostgresSQLConnection = struct {
},
},
pub const ConnectionFlags = packed struct {
is_ready_for_query: bool = false,
is_processing_data: bool = false,
};
pub const TLSStatus = union(enum) {
none,
pending,
@@ -1302,8 +1305,15 @@ pub const PostgresSQLConnection = struct {
else => this.connection_timeout_ms,
};
}
pub fn disableConnectionTimeout(this: *PostgresSQLConnection) void {
if (this.timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.timer);
}
this.timer.state = .CANCELLED;
}
pub fn resetConnectionTimeout(this: *PostgresSQLConnection) void {
// if we are processing data, don't reset the timeout, wait for the data to be processed
if (this.flags.is_processing_data) return;
const interval = this.getTimeoutInterval();
if (this.timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.timer);
@@ -1379,7 +1389,12 @@ pub const PostgresSQLConnection = struct {
pub fn onConnectionTimeout(this: *PostgresSQLConnection) JSC.BunTimer.EventLoopTimer.Arm {
debug("onConnectionTimeout", .{});
this.timer.state = .FIRED;
if (this.flags.is_processing_data) {
return .disarm;
}
if (this.getTimeoutInterval() == 0) {
this.resetConnectionTimeout();
return .disarm;
@@ -1429,9 +1444,8 @@ pub const PostgresSQLConnection = struct {
}
pub fn setStatus(this: *PostgresSQLConnection, status: Status) void {
defer this.updateHasPendingActivity();
if (this.status == status) return;
defer this.updateHasPendingActivity();
this.status = status;
this.resetConnectionTimeout();
@@ -1443,7 +1457,6 @@ pub const PostgresSQLConnection = struct {
js_value.ensureStillAlive();
this.globalObject.queueMicrotask(on_connect, &[_]JSValue{ JSValue.jsNull(), js_value });
this.poll_ref.unref(this.globalObject.bunVM());
this.updateHasPendingActivity();
},
else => {},
}
@@ -1630,16 +1643,21 @@ pub const PostgresSQLConnection = struct {
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
this.ref();
this.flags.is_processing_data = true;
const vm = this.globalObject.bunVM();
this.disableConnectionTimeout();
defer {
if (this.status == .connected and this.requests.readableLength() == 0 and this.write_buffer.remaining().len == 0) {
if (this.status == .connected and !this.hasQueryRunning() and this.write_buffer.remaining().len == 0) {
// Don't keep the process alive when there's nothing to do.
this.poll_ref.unref(vm);
} else if (this.status == .connected) {
// Keep the process alive if there's something to do.
this.poll_ref.ref(vm);
}
this.flags.is_processing_data = false;
// reset the connection timeout after we're done processing the data
this.resetConnectionTimeout();
this.deref();
}
@@ -1648,6 +1666,9 @@ pub const PostgresSQLConnection = struct {
event_loop.enter();
defer event_loop.exit();
SocketMonitor.read(data);
// reset the head to the last message so remaining reflects the right amount of bytes
this.read_buffer.head = this.last_message_start;
if (this.read_buffer.remaining().len == 0) {
var consumed: usize = 0;
var offset: usize = 0;
@@ -1655,20 +1676,11 @@ pub const PostgresSQLConnection = struct {
PostgresRequest.onData(this, protocol.StackReader, reader) catch |err| {
if (err == error.ShortRead) {
if (comptime bun.Environment.allow_assert) {
// if (@errorReturnTrace()) |trace| {
// debug("Received short read: last_message_start: {d}, head: {d}, len: {d}\n{}", .{
// offset,
// consumed,
// data.len,
// trace,
// });
// } else {
debug("Received short read: last_message_start: {d}, head: {d}, len: {d}", .{
debug("read_buffer: empty and received short read: last_message_start: {d}, head: {d}, len: {d}", .{
offset,
consumed,
data.len,
});
// }
}
this.read_buffer.head = 0;
@@ -1681,42 +1693,32 @@ pub const PostgresSQLConnection = struct {
this.fail("Failed to read data", err);
}
};
// no need to reset anything, its already empty
return;
}
{
this.read_buffer.head = this.last_message_start;
this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer");
PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| {
if (err != error.ShortRead) {
bun.handleErrorReturnTrace(err, @errorReturnTrace());
this.fail("Failed to read data", err);
return;
}
if (comptime bun.Environment.allow_assert) {
// if (@errorReturnTrace()) |trace| {
// debug("Received short read: last_message_start: {d}, head: {d}, len: {d}\n{}", .{
// this.last_message_start,
// this.read_buffer.head,
// this.read_buffer.byte_list.len,
// trace,
// });
// } else {
debug("Received short read: last_message_start: {d}, head: {d}, len: {d}", .{
this.last_message_start,
this.read_buffer.head,
this.read_buffer.byte_list.len,
});
// }
}
// read buffer is not empty, so we need to write the data to the buffer and then read it
this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer");
PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| {
if (err != error.ShortRead) {
bun.handleErrorReturnTrace(err, @errorReturnTrace());
this.fail("Failed to read data", err);
return;
};
}
this.last_message_start = 0;
this.read_buffer.head = 0;
}
if (comptime bun.Environment.allow_assert) {
debug("read_buffer: not empty and received short read: last_message_start: {d}, head: {d}, len: {d}", .{
this.last_message_start,
this.read_buffer.head,
this.read_buffer.byte_list.len,
});
}
return;
};
debug("clean read_buffer", .{});
// success, we read everything! let's reset the last message start and the head
this.last_message_start = 0;
this.read_buffer.head = 0;
}
pub fn constructor(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!*PostgresSQLConnection {
@@ -2065,7 +2067,7 @@ pub const PostgresSQLConnection = struct {
}
fn hasQueryRunning(this: *PostgresSQLConnection) bool {
return !this.is_ready_for_query or this.current() != null;
return !this.flags.is_ready_for_query or this.current() != null;
}
pub const Writer = struct {
@@ -2550,9 +2552,6 @@ pub const PostgresSQLConnection = struct {
};
fn advance(this: *PostgresSQLConnection) !void {
defer this.updateRef();
var any = false;
defer if (any) this.resetConnectionTimeout();
while (this.requests.readableLength() > 0) {
var req: *PostgresSQLQuery = this.requests.peekItem(0);
switch (req.status) {
@@ -2566,7 +2565,6 @@ pub const PostgresSQLConnection = struct {
req.deref();
this.requests.discard(1);
any = true;
continue;
},
.prepared => {
@@ -2584,7 +2582,6 @@ pub const PostgresSQLConnection = struct {
continue;
};
req.status = .binding;
any = true;
return;
},
.pending => {
@@ -2611,7 +2608,6 @@ pub const PostgresSQLConnection = struct {
req.status = .binding;
stmt.status = .parsing;
any = true;
return;
}
const connection_writer = this.writer();
@@ -2637,7 +2633,6 @@ pub const PostgresSQLConnection = struct {
continue;
};
stmt.status = .parsing;
any = true;
return;
},
.parsing => {
@@ -2655,7 +2650,6 @@ pub const PostgresSQLConnection = struct {
.success, .fail => {
req.deref();
this.requests.discard(1);
any = true;
continue;
},
}
@@ -2669,7 +2663,7 @@ pub const PostgresSQLConnection = struct {
pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_literal), comptime Context: type, reader: protocol.NewReader(Context)) AnyPostgresError!void {
debug("on({s})", .{@tagName(MessageType)});
if (comptime MessageType != .ReadyForQuery) {
this.is_ready_for_query = false;
this.flags.is_ready_for_query = false;
}
switch (comptime MessageType) {
@@ -2757,13 +2751,8 @@ pub const PostgresSQLConnection = struct {
var ready_for_query: protocol.ReadyForQuery = undefined;
try ready_for_query.decodeInternal(Context, reader);
if (this.pending_disconnect) {
this.disconnect();
return;
}
this.setStatus(.connected);
this.is_ready_for_query = true;
this.flags.is_ready_for_query = true;
this.socket.setTimeout(300);
try this.advance();