From 00a5c4af5a36b58617348d0a574df92032284db9 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Sun, 2 Feb 2025 21:27:22 -0800 Subject: [PATCH] fix(sql) disable idle timeout when still processing data (#16984) --- src/sql/postgres.zig | 157 +++++++++++++++++++--------------------- test/js/sql/sql.test.ts | 24 +++++- 2 files changed, 96 insertions(+), 85 deletions(-) diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index cacf849657..a48858f7c0 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -330,7 +330,6 @@ pub const PostgresSQLQuery = struct { } = .{}, pub usingnamespace JSC.Codegen.JSPostgresSQLQuery; - const log = bun.Output.scoped(.PostgresSQLQuery, false); pub fn getTarget(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject) JSC.JSValue { const thisValue = this.thisValue.get(); if (thisValue == .zero) { @@ -700,7 +699,7 @@ pub const PostgresSQLQuery = struct { }; const has_params = signature.fields.len > 0; - var reset_timeout = false; + var did_write = false; enqueue: { if (entry.found_existing) { this.statement = entry.value_ptr.*; @@ -715,7 +714,7 @@ pub const PostgresSQLQuery = struct { .prepared => { if (!connection.hasQueryRunning()) { this.flags.binary = this.statement.?.fields.len > 0; - log("bindAndExecute", .{}); + debug("bindAndExecute", .{}); // bindAndExecute will bind + execute, it will change to running after binding is complete PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| { if (!globalObject.hasException()) @@ -724,7 +723,7 @@ pub const PostgresSQLQuery = struct { }; this.status = .binding; - reset_timeout = true; + did_write = true; } }, .parsing, .pending => {}, @@ -738,7 +737,7 @@ pub const PostgresSQLQuery = struct { if (can_execute) { // If it does not have params, we can write and execute immediately in one go if (!has_params) { - log("prepareAndQueryWithSignature", .{}); + debug("prepareAndQueryWithSignature", .{}); // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| { signature.deinit(); @@ -747,9 +746,9 @@ pub const PostgresSQLQuery = struct { return error.JSError; }; this.status = .binding; - reset_timeout = true; + did_write = true; } else { - log("writeQuery", .{}); + debug("writeQuery", .{}); PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| { signature.deinit(); if (!globalObject.hasException()) @@ -762,7 +761,7 @@ pub const PostgresSQLQuery = struct { return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to flush", err)); return error.JSError; }; - reset_timeout = true; + did_write = true; } } { @@ -781,12 +780,11 @@ pub const PostgresSQLQuery = struct { this.thisValue.upgrade(globalObject); PostgresSQLQuery.targetSetCached(this_value, globalObject, query); - - if (connection.is_ready_for_query) - connection.flushDataAndResetTimeout() - else if (reset_timeout) + if (did_write) { + connection.flushDataAndResetTimeout(); + } else { connection.resetConnectionTimeout(); - + } return .undefined; } @@ -1046,8 +1044,9 @@ pub const PostgresRequest = struct { ) !void { while (true) { reader.markMessageStart(); - - switch (try reader.int(u8)) { + const c = try reader.int(u8); + debug("read: {c}", .{c}); + switch (c) { 'D' => try connection.on(.DataRow, Context, reader), 'd' => try connection.on(.CopyData, Context, reader), 'S' => { @@ -1091,9 +1090,10 @@ pub const PostgresRequest = struct { 'c' => try connection.on(.CopyDone, Context, reader), 'W' => try connection.on(.CopyBothResponse, Context, reader), - else => |c| { - debug("Unknown message: {d}", .{c}); + else => { + debug("Unknown message: {c}", .{c}); const to_skip = try reader.length() -| 1; + debug("to_skip: {d}", .{to_skip}); try reader.skip(@intCast(@max(to_skip, 0))); }, } @@ -1121,13 +1121,9 @@ pub const PostgresSQLConnection = struct { pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), js_value: JSValue = JSValue.undefined, - is_ready_for_query: bool = false, - backend_parameters: bun.StringMap = bun.StringMap.init(bun.default_allocator, true), backend_key_data: protocol.BackendKeyData = .{}, - pending_disconnect: bool = false, - database: []const u8 = "", user: []const u8 = "", password: []const u8 = "", @@ -1144,6 +1140,8 @@ pub const PostgresSQLConnection = struct { idle_timeout_interval_ms: u32 = 0, connection_timeout_ms: u32 = 0, + flags: ConnectionFlags = .{}, + /// Before being connected, this is a connection timeout timer. /// After being connected, this is an idle timeout timer. timer: JSC.BunTimer.EventLoopTimer = .{ @@ -1166,6 +1164,11 @@ pub const PostgresSQLConnection = struct { }, }, + pub const ConnectionFlags = packed struct { + is_ready_for_query: bool = false, + is_processing_data: bool = false, + }; + pub const TLSStatus = union(enum) { none, pending, @@ -1302,8 +1305,15 @@ pub const PostgresSQLConnection = struct { else => this.connection_timeout_ms, }; } - + pub fn disableConnectionTimeout(this: *PostgresSQLConnection) void { + if (this.timer.state == .ACTIVE) { + this.globalObject.bunVM().timer.remove(&this.timer); + } + this.timer.state = .CANCELLED; + } pub fn resetConnectionTimeout(this: *PostgresSQLConnection) void { + // if we are processing data, don't reset the timeout, wait for the data to be processed + if (this.flags.is_processing_data) return; const interval = this.getTimeoutInterval(); if (this.timer.state == .ACTIVE) { this.globalObject.bunVM().timer.remove(&this.timer); @@ -1379,7 +1389,12 @@ pub const PostgresSQLConnection = struct { pub fn onConnectionTimeout(this: *PostgresSQLConnection) JSC.BunTimer.EventLoopTimer.Arm { debug("onConnectionTimeout", .{}); + this.timer.state = .FIRED; + if (this.flags.is_processing_data) { + return .disarm; + } + if (this.getTimeoutInterval() == 0) { this.resetConnectionTimeout(); return .disarm; @@ -1429,9 +1444,8 @@ pub const PostgresSQLConnection = struct { } pub fn setStatus(this: *PostgresSQLConnection, status: Status) void { - defer this.updateHasPendingActivity(); - if (this.status == status) return; + defer this.updateHasPendingActivity(); this.status = status; this.resetConnectionTimeout(); @@ -1443,7 +1457,6 @@ pub const PostgresSQLConnection = struct { js_value.ensureStillAlive(); this.globalObject.queueMicrotask(on_connect, &[_]JSValue{ JSValue.jsNull(), js_value }); this.poll_ref.unref(this.globalObject.bunVM()); - this.updateHasPendingActivity(); }, else => {}, } @@ -1630,16 +1643,21 @@ pub const PostgresSQLConnection = struct { pub fn onData(this: *PostgresSQLConnection, data: []const u8) void { this.ref(); + this.flags.is_processing_data = true; const vm = this.globalObject.bunVM(); + + this.disableConnectionTimeout(); defer { - if (this.status == .connected and this.requests.readableLength() == 0 and this.write_buffer.remaining().len == 0) { + if (this.status == .connected and !this.hasQueryRunning() and this.write_buffer.remaining().len == 0) { // Don't keep the process alive when there's nothing to do. this.poll_ref.unref(vm); } else if (this.status == .connected) { // Keep the process alive if there's something to do. this.poll_ref.ref(vm); } + this.flags.is_processing_data = false; + // reset the connection timeout after we're done processing the data this.resetConnectionTimeout(); this.deref(); } @@ -1648,6 +1666,9 @@ pub const PostgresSQLConnection = struct { event_loop.enter(); defer event_loop.exit(); SocketMonitor.read(data); + // reset the head to the last message so remaining reflects the right amount of bytes + this.read_buffer.head = this.last_message_start; + if (this.read_buffer.remaining().len == 0) { var consumed: usize = 0; var offset: usize = 0; @@ -1655,20 +1676,11 @@ pub const PostgresSQLConnection = struct { PostgresRequest.onData(this, protocol.StackReader, reader) catch |err| { if (err == error.ShortRead) { if (comptime bun.Environment.allow_assert) { - // if (@errorReturnTrace()) |trace| { - // debug("Received short read: last_message_start: {d}, head: {d}, len: {d}\n{}", .{ - // offset, - // consumed, - // data.len, - // trace, - // }); - // } else { - debug("Received short read: last_message_start: {d}, head: {d}, len: {d}", .{ + debug("read_buffer: empty and received short read: last_message_start: {d}, head: {d}, len: {d}", .{ offset, consumed, data.len, }); - // } } this.read_buffer.head = 0; @@ -1681,42 +1693,32 @@ pub const PostgresSQLConnection = struct { this.fail("Failed to read data", err); } }; + // no need to reset anything, its already empty return; } - - { - this.read_buffer.head = this.last_message_start; - this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer"); - PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| { - if (err != error.ShortRead) { - bun.handleErrorReturnTrace(err, @errorReturnTrace()); - this.fail("Failed to read data", err); - return; - } - - if (comptime bun.Environment.allow_assert) { - // if (@errorReturnTrace()) |trace| { - // debug("Received short read: last_message_start: {d}, head: {d}, len: {d}\n{}", .{ - // this.last_message_start, - // this.read_buffer.head, - // this.read_buffer.byte_list.len, - // trace, - // }); - // } else { - debug("Received short read: last_message_start: {d}, head: {d}, len: {d}", .{ - this.last_message_start, - this.read_buffer.head, - this.read_buffer.byte_list.len, - }); - // } - } - + // read buffer is not empty, so we need to write the data to the buffer and then read it + this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer"); + PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| { + if (err != error.ShortRead) { + bun.handleErrorReturnTrace(err, @errorReturnTrace()); + this.fail("Failed to read data", err); return; - }; + } - this.last_message_start = 0; - this.read_buffer.head = 0; - } + if (comptime bun.Environment.allow_assert) { + debug("read_buffer: not empty and received short read: last_message_start: {d}, head: {d}, len: {d}", .{ + this.last_message_start, + this.read_buffer.head, + this.read_buffer.byte_list.len, + }); + } + return; + }; + + debug("clean read_buffer", .{}); + // success, we read everything! let's reset the last message start and the head + this.last_message_start = 0; + this.read_buffer.head = 0; } pub fn constructor(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!*PostgresSQLConnection { @@ -2065,7 +2067,7 @@ pub const PostgresSQLConnection = struct { } fn hasQueryRunning(this: *PostgresSQLConnection) bool { - return !this.is_ready_for_query or this.current() != null; + return !this.flags.is_ready_for_query or this.current() != null; } pub const Writer = struct { @@ -2550,9 +2552,6 @@ pub const PostgresSQLConnection = struct { }; fn advance(this: *PostgresSQLConnection) !void { - defer this.updateRef(); - var any = false; - defer if (any) this.resetConnectionTimeout(); while (this.requests.readableLength() > 0) { var req: *PostgresSQLQuery = this.requests.peekItem(0); switch (req.status) { @@ -2566,7 +2565,6 @@ pub const PostgresSQLConnection = struct { req.deref(); this.requests.discard(1); - any = true; continue; }, .prepared => { @@ -2584,7 +2582,6 @@ pub const PostgresSQLConnection = struct { continue; }; req.status = .binding; - any = true; return; }, .pending => { @@ -2611,7 +2608,6 @@ pub const PostgresSQLConnection = struct { req.status = .binding; stmt.status = .parsing; - any = true; return; } const connection_writer = this.writer(); @@ -2637,7 +2633,6 @@ pub const PostgresSQLConnection = struct { continue; }; stmt.status = .parsing; - any = true; return; }, .parsing => { @@ -2655,7 +2650,6 @@ pub const PostgresSQLConnection = struct { .success, .fail => { req.deref(); this.requests.discard(1); - any = true; continue; }, } @@ -2669,7 +2663,7 @@ pub const PostgresSQLConnection = struct { pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_literal), comptime Context: type, reader: protocol.NewReader(Context)) AnyPostgresError!void { debug("on({s})", .{@tagName(MessageType)}); if (comptime MessageType != .ReadyForQuery) { - this.is_ready_for_query = false; + this.flags.is_ready_for_query = false; } switch (comptime MessageType) { @@ -2757,13 +2751,8 @@ pub const PostgresSQLConnection = struct { var ready_for_query: protocol.ReadyForQuery = undefined; try ready_for_query.decodeInternal(Context, reader); - if (this.pending_disconnect) { - this.disconnect(); - return; - } - this.setStatus(.connected); - this.is_ready_for_query = true; + this.flags.is_ready_for_query = true; this.socket.setTimeout(300); try this.advance(); diff --git a/test/js/sql/sql.test.ts b/test/js/sql/sql.test.ts index 42adec7416..84e86cca3c 100644 --- a/test/js/sql/sql.test.ts +++ b/test/js/sql/sql.test.ts @@ -1,4 +1,4 @@ -import { sql, SQL } from "bun"; +import { sql, SQL, randomUUIDv7 } from "bun"; const postgres = (...args) => new sql(...args); import { expect, test, mock, beforeAll, afterAll } from "bun:test"; import { $ } from "bun"; @@ -266,6 +266,28 @@ if (isDockerEnabled()) { expect(result).toEqual([{ x: 3 }]); }); + test("should not timeout in long results", async () => { + await using db = postgres({ ...options, max: 1, idleTimeout: 5 }); + using sql = await db.reserve(); + const random_name = "test_" + randomUUIDv7("hex").replaceAll("-", ""); + + await sql`CREATE TEMPORARY TABLE ${sql(random_name)} (id int, name text)`; + const promises: Promise[] = []; + for (let i = 0; i < 10_000; i++) { + promises.push(sql`INSERT INTO ${sql(random_name)} VALUES (${i}, ${"test" + i})`); + if (i % 50 === 0 && i > 0) { + await Promise.all(promises); + promises.length = 0; + } + } + await Promise.all(promises); + await sql`SELECT * FROM ${sql(random_name)}`; + await sql`SELECT * FROM ${sql(random_name)}`; + await sql`SELECT * FROM ${sql(random_name)}`; + + expect().pass(); + }, 10_000); + test("Handles numeric column names", async () => { // deliberately out of order const result = await sql`select 1 as "1", 2 as "2", 3 as "3", 0 as "0"`;