diff --git a/src/bun.js/api/postgres.classes.ts b/src/bun.js/api/postgres.classes.ts index 3e7e8706a2..a210706462 100644 --- a/src/bun.js/api/postgres.classes.ts +++ b/src/bun.js/api/postgres.classes.ts @@ -32,7 +32,6 @@ export default [ flush: { fn: "doFlush", }, - queries: { getter: "getQueries", this: true, diff --git a/src/js/bun/sql.ts b/src/js/bun/sql.ts index 84aa23211e..f4f92050cb 100644 --- a/src/js/bun/sql.ts +++ b/src/js/bun/sql.ts @@ -794,6 +794,7 @@ class PooledConnection { // @ts-ignore query.finally(onQueryFinish.bind(this, onClose)); } + #doRetry() { if (this.pool.closed) { return; @@ -855,6 +856,7 @@ class ConnectionPool { poolStarted: boolean = false; closed: boolean = false; + totalQueries: number = 0; onAllQueriesFinished: (() => void) | null = null; constructor(connectionInfo) { this.connectionInfo = connectionInfo; @@ -862,57 +864,48 @@ class ConnectionPool { this.readyConnections = new Set(); } + maxDistribution() { + if (!this.waitingQueue.length) return 0; + const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.connections.length); + return result ? result : 1; + } + flushConcurrentQueries() { - if (this.waitingQueue.length === 0) { + const maxDistribution = this.maxDistribution(); + if (maxDistribution === 0) { return; } - while (this.waitingQueue.length > 0) { - let endReached = true; - // no need to filter for reserved connections because there are not in the readyConnections - // preReserved only shows that we wanna avoiding adding more queries to it + + while (true) { const nonReservedConnections = Array.from(this.readyConnections).filter( - c => !(c.flags & PooledConnectionFlags.preReserved), + c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution, ); if (nonReservedConnections.length === 0) { return; } - // kinda balance the load between connections const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount); - const leastQueries = orderedConnections[0].queryCount; - for (const connection of orderedConnections) { - if (connection.queryCount > leastQueries) { - endReached = false; - break; - } - const pending = this.waitingQueue.shift(); - if (pending) { - connection.queryCount++; - pending(null, connection); + if (!pending) { + return; } + connection.queryCount++; + this.totalQueries++; + pending(null, connection); } - const halfPoolSize = Math.ceil(this.connections.length / 2); - if (endReached || orderedConnections.length < halfPoolSize) { - // we are able to distribute the load between connections but the connection pool is less than half of the pool size - // so we can stop here and wait for the next tick to flush the waiting queue - break; - } - } - if (this.waitingQueue.length > 0) { - // we still wanna to flush the waiting queue but lets wait for the next tick because some connections might be released - // this is better for query performance - process.nextTick(this.flushConcurrentQueries.bind(this)); } } release(connection: PooledConnection, connectingEvent: boolean = false) { if (!connectingEvent) { connection.queryCount--; + this.totalQueries--; + } + const currentQueryCount = connection.queryCount; + if (currentQueryCount == 0) { + connection.flags &= ~PooledConnectionFlags.reserved; + connection.flags &= ~PooledConnectionFlags.preReserved; } - const was_reserved = connection.flags & PooledConnectionFlags.reserved; - connection.flags &= ~PooledConnectionFlags.reserved; - connection.flags &= ~PooledConnectionFlags.preReserved; if (this.onAllQueriesFinished) { // we are waiting for all queries to finish, lets check if we can call it if (!this.hasPendingQueries()) { @@ -945,36 +938,19 @@ class ConnectionPool { return; } - if (was_reserved) { - if (this.waitingQueue.length > 0 || this.reservedQueue.length > 0) { - const pendingReserved = this.reservedQueue.shift(); - if (pendingReserved) { - connection.flags |= PooledConnectionFlags.reserved; - connection.queryCount++; - // we have a connection waiting for a reserved connection lets prioritize it - pendingReserved(connection.storedError, connection); - return; - } - } - - this.readyConnections.add(connection); - this.flushConcurrentQueries(); - return; - } - if (connection.queryCount === 0) { + if (currentQueryCount == 0) { // ok we can actually bind reserved queries to it const pendingReserved = this.reservedQueue.shift(); if (pendingReserved) { connection.flags |= PooledConnectionFlags.reserved; connection.queryCount++; + this.totalQueries++; // we have a connection waiting for a reserved connection lets prioritize it pendingReserved(connection.storedError, connection); return; } } - this.readyConnections.add(connection); - this.flushConcurrentQueries(); } @@ -992,16 +968,11 @@ class ConnectionPool { } return false; } + hasPendingQueries() { if (this.waitingQueue.length > 0 || this.reservedQueue.length > 0) return true; if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.queryCount > 0) { - return true; - } - } + return this.totalQueries > 0; } return false; } @@ -1215,11 +1186,12 @@ class ConnectionPool { if (queryCount < leastQueries) { leastQueries = queryCount; connectionWithLeastQueries = connection; - continue; } + continue; } connection.flags |= PooledConnectionFlags.reserved; connection.queryCount++; + this.totalQueries++; this.readyConnections.delete(connection); onConnected(null, connection); return; diff --git a/src/sql/postgres/ConnectionFlags.zig b/src/sql/postgres/ConnectionFlags.zig index 49ad9d6f90..eb087cb30b 100644 --- a/src/sql/postgres/ConnectionFlags.zig +++ b/src/sql/postgres/ConnectionFlags.zig @@ -2,6 +2,8 @@ pub const ConnectionFlags = packed struct { is_ready_for_query: bool = false, is_processing_data: bool = false, use_unnamed_prepared_statements: bool = false, + waiting_to_prepare: bool = false, + has_backpressure: bool = false, }; // @sortImports diff --git a/src/sql/postgres/PostgresSQLConnection.zig b/src/sql/postgres/PostgresSQLConnection.zig index 4ee0e3c8f2..b1e3b3ac65 100644 --- a/src/sql/postgres/PostgresSQLConnection.zig +++ b/src/sql/postgres/PostgresSQLConnection.zig @@ -6,6 +6,10 @@ write_buffer: bun.OffsetByteList = .{}, read_buffer: bun.OffsetByteList = .{}, last_message_start: u32 = 0, requests: PostgresRequest.Queue, +// number of pipelined requests (Bind/Execute/Prepared statements) +pipelined_requests: u32 = 0, +// number of non-pipelined requests (Simple/Copy) +nonpipelinable_requests: u32 = 0, poll_ref: bun.Async.KeepAlive = .{}, globalObject: *JSC.JSGlobalObject, @@ -58,6 +62,49 @@ max_lifetime_timer: bun.api.Timer.EventLoopTimer = .{ .nsec = 0, }, }, +auto_flusher: AutoFlusher = .{}, + +pub fn onAutoFlush(this: *@This()) bool { + if (this.flags.has_backpressure) { + debug("onAutoFlush: has backpressure", .{}); + this.auto_flusher.registered = false; + // if we have backpressure, wait for onWritable + return false; + } + this.ref(); + defer this.deref(); + debug("onAutoFlush: draining", .{}); + // drain as much as we can + this.drainInternal(); + + // if we dont have backpressure and if we still have data to send, return true otherwise return false and wait for onWritable + const keep_flusher_registered = !this.flags.has_backpressure and this.write_buffer.len() > 0; + debug("onAutoFlush: keep_flusher_registered: {}", .{keep_flusher_registered}); + this.auto_flusher.registered = keep_flusher_registered; + return keep_flusher_registered; +} + +fn registerAutoFlusher(this: *PostgresSQLConnection) void { + const data_to_send = this.write_buffer.len(); + debug("registerAutoFlusher: backpressure: {} registered: {} data_to_send: {}", .{ this.flags.has_backpressure, this.auto_flusher.registered, data_to_send }); + + if (!this.auto_flusher.registered and // should not be registered + !this.flags.has_backpressure and // if has backpressure we need to wait for onWritable event + data_to_send > 0 and // we need data to send + this.status == .connected //and we need to be connected + ) { + AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalObject.bunVM()); + this.auto_flusher.registered = true; + } +} + +fn unregisterAutoFlusher(this: *PostgresSQLConnection) void { + debug("unregisterAutoFlusher registered: {}", .{this.auto_flusher.registered}); + if (this.auto_flusher.registered) { + AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, this.globalObject.bunVM()); + this.auto_flusher.registered = false; + } +} fn getTimeoutInterval(this: *const PostgresSQLConnection) u32 { return switch (this.status) { @@ -186,10 +233,7 @@ fn start(this: *PostgresSQLConnection) void { this.resetConnectionTimeout(); this.sendStartupMessage(); - const event_loop = this.globalObject.bunVM().eventLoop(); - event_loop.enter(); - defer event_loop.exit(); - this.flushData(); + this.drainInternal(); } pub fn hasPendingActivity(this: *PostgresSQLConnection) bool { @@ -230,13 +274,26 @@ pub fn finalize(this: *PostgresSQLConnection) void { pub fn flushDataAndResetTimeout(this: *PostgresSQLConnection) void { this.resetConnectionTimeout(); - this.flushData(); + // defer flushing, so if many queries are running in parallel in the same connection, we don't flush more than once + this.registerAutoFlusher(); } pub fn flushData(this: *PostgresSQLConnection) void { + // we know we still have backpressure so just return we will flush later + if (this.flags.has_backpressure) { + debug("flushData: has backpressure", .{}); + return; + } + const chunk = this.write_buffer.remaining(); - if (chunk.len == 0) return; + if (chunk.len == 0) { + debug("flushData: no data to flush", .{}); + return; + } + const wrote = this.socket.write(chunk); + this.flags.has_backpressure = wrote < chunk.len; + debug("flushData: wrote {d}/{d} bytes", .{ wrote, chunk.len }); if (wrote > 0) { SocketMonitor.write(chunk[0..@intCast(wrote)]); this.write_buffer.consume(@intCast(wrote)); @@ -282,6 +339,8 @@ pub fn fail(this: *PostgresSQLConnection, message: []const u8, err: AnyPostgresE } pub fn onClose(this: *PostgresSQLConnection) void { + this.unregisterAutoFlusher(); + var vm = this.globalObject.bunVM(); const loop = vm.eventLoop(); loop.enter(); @@ -383,7 +442,8 @@ pub fn onTimeout(this: *PostgresSQLConnection) void { } pub fn onDrain(this: *PostgresSQLConnection) void { - + debug("onDrain", .{}); + this.flags.has_backpressure = false; // Don't send any other messages while we're waiting for TLS. if (this.tls_status == .message_sent) { if (this.tls_status.message_sent < 8) { @@ -393,10 +453,22 @@ pub fn onDrain(this: *PostgresSQLConnection) void { return; } + this.drainInternal(); +} + +fn drainInternal(this: *PostgresSQLConnection) void { + debug("drainInternal", .{}); const event_loop = this.globalObject.bunVM().eventLoop(); event_loop.enter(); defer event_loop.exit(); + this.flushData(); + + if (!this.flags.has_backpressure) { + // no backpressure yet so pipeline more if possible and flush again + this.advance(); + this.flushData(); + } } pub fn onData(this: *PostgresSQLConnection, data: []const u8) void { @@ -748,7 +820,7 @@ pub fn doUnref(this: *@This(), _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JS return .js_undefined; } pub fn doFlush(this: *PostgresSQLConnection, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue { - this.flushData(); + this.registerAutoFlusher(); return .js_undefined; } @@ -765,6 +837,7 @@ pub fn deref(this: *@This()) void { pub fn doClose(this: *@This(), globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSValue { _ = globalObject; this.disconnect(); + this.unregisterAutoFlusher(); this.write_buffer.deinit(bun.default_allocator); return .js_undefined; @@ -842,7 +915,7 @@ fn refAndClose(this: *@This(), js_reason: ?JSC.JSValue) void { pub fn disconnect(this: *@This()) void { this.stopTimers(); - + this.unregisterAutoFlusher(); if (this.status == .connected) { this.status = .disconnected; this.refAndClose(null); @@ -853,7 +926,6 @@ fn current(this: *PostgresSQLConnection) ?*PostgresSQLQuery { if (this.requests.readableLength() == 0) { return null; } - return this.requests.peekItem(0); } @@ -861,6 +933,14 @@ pub fn hasQueryRunning(this: *PostgresSQLConnection) bool { return !this.flags.is_ready_for_query or this.current() != null; } +pub fn canPipeline(this: *PostgresSQLConnection) bool { + return this.nonpipelinable_requests == 0 and // need to wait for non pipelinable requests to finish + !this.flags.use_unnamed_prepared_statements and // unnamed statements are not pipelinable + !this.flags.waiting_to_prepare and // cannot pipeline when waiting prepare + !this.flags.has_backpressure and // dont make sense to buffer more if we have backpressure + this.write_buffer.len() < MAX_PIPELINE_SIZE; // buffer is too big need to flush before pipeline more +} + pub const Writer = struct { connection: *PostgresSQLConnection, @@ -935,35 +1015,65 @@ pub fn bufferedReader(this: *PostgresSQLConnection) protocol.NewReader(Reader) { }; } -fn advance(this: *PostgresSQLConnection) !void { - while (this.requests.readableLength() > 0) { - var req: *PostgresSQLQuery = this.requests.peekItem(0); +fn advance(this: *PostgresSQLConnection) void { + var offset: usize = 0; + debug("advance", .{}); + while (this.requests.readableLength() > offset and !this.flags.has_backpressure) { + var req: *PostgresSQLQuery = this.requests.peekItem(offset); switch (req.status) { .pending => { if (req.flags.simple) { - debug("executeQuery", .{}); + if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) { + debug("cannot execute simple query, pipelined_requests: {d}, is_ready_for_query: {}", .{ this.pipelined_requests, this.flags.is_ready_for_query }); + // need to wait for the previous request to finish before starting simple queries + return; + } var query_str = req.query.toUTF8(bun.default_allocator); defer query_str.deinit(); + debug("execute simple query: {s}", .{query_str.slice()}); PostgresRequest.executeQuery(query_str.slice(), PostgresSQLConnection.Writer, this.writer()) catch |err| { req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - req.deref(); - this.requests.discard(1); - + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + } + debug("executeQuery failed: {s}", .{@errorName(err)}); continue; }; + this.nonpipelinable_requests += 1; this.flags.is_ready_for_query = false; req.status = .running; return; } else { - const stmt = req.statement orelse return error.ExpectedStatement; + const stmt = req.statement orelse { + debug("stmt is not set yet waiting it to RUN before actually doing anything", .{}); + // statement is not set yet waiting it to RUN before actually doing anything + offset += 1; + continue; + }; switch (stmt.status) { .failed => { + debug("stmt failed", .{}); bun.assert(stmt.error_response != null); + if (req.flags.simple) { + this.nonpipelinable_requests -= 1; + } else if (req.flags.pipelined) { + this.pipelined_requests -= 1; + } else if (this.flags.waiting_to_prepare) { + this.flags.waiting_to_prepare = false; + } req.onError(stmt.error_response.?, this.globalObject); - req.deref(); - this.requests.discard(1); - + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + } continue; }, .prepared => { @@ -972,19 +1082,40 @@ fn advance(this: *PostgresSQLConnection) !void { const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero; const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero; req.flags.binary = stmt.fields.len > 0; - + debug("binding and executing stmt", .{}); PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| { req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - req.deref(); - this.requests.discard(1); + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + } + debug("bind and execute failed: {s}", .{@errorName(err)}); continue; }; + this.flags.is_ready_for_query = false; req.status = .binding; - return; + if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) { + debug("cannot pipeline more stmt", .{}); + return; + } + debug("pipelining more stmt", .{}); + // we can pipeline it + this.pipelined_requests += 1; + req.flags.pipelined = true; + offset += 1; + continue; }, .pending => { + if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) { + debug("need to wait to finish the pipeline before starting a new query preparation", .{}); + // need to wait to finish the pipeline before starting a new query preparation + return; + } // statement is pending, lets write/parse it var query_str = req.query.toUTF8(bun.default_allocator); defer query_str.deinit(); @@ -995,31 +1126,42 @@ fn advance(this: *PostgresSQLConnection) !void { bun.assert(thisValue != .zero); // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero; + debug("prepareAndQueryWithSignature", .{}); PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| { stmt.status = .failed; stmt.error_response = .{ .postgres_error = err }; req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - req.deref(); - this.requests.discard(1); + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + } + debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)}); continue; }; + this.flags.waiting_to_prepare = true; this.flags.is_ready_for_query = false; req.status = .binding; stmt.status = .parsing; return; } + const connection_writer = this.writer(); + debug("writing query", .{}); // write query and wait for it to be prepared PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| { stmt.error_response = .{ .postgres_error = err }; stmt.status = .failed; req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + bun.assert(offset == 0); req.deref(); this.requests.discard(1); - + debug("write query failed: {s}", .{@errorName(err)}); continue; }; connection_writer.write(&protocol.Sync) catch |err| { @@ -1027,13 +1169,15 @@ fn advance(this: *PostgresSQLConnection) !void { stmt.status = .failed; req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + bun.assert(offset == 0); req.deref(); this.requests.discard(1); - + debug("write query (sync) failed: {s}", .{@errorName(err)}); continue; }; this.flags.is_ready_for_query = false; stmt.status = .parsing; + this.flags.waiting_to_prepare = true; return; }, .parsing => { @@ -1049,7 +1193,30 @@ fn advance(this: *PostgresSQLConnection) !void { // if we are running, we need to wait for it to be success or fail return; }, - .success, .fail => { + .success => { + if (req.flags.simple) { + this.nonpipelinable_requests -= 1; + } else if (req.flags.pipelined) { + this.pipelined_requests -= 1; + } else if (this.flags.waiting_to_prepare) { + this.flags.waiting_to_prepare = false; + } + if (offset > 0) { + // deinit later + req.status = .fail; + offset += 1; + continue; + } + req.deref(); + this.requests.discard(1); + continue; + }, + .fail => { + if (offset > 0) { + // deinit later + offset += 1; + continue; + } req.deref(); this.requests.discard(1); continue; @@ -1161,9 +1328,9 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera request.onResult("", this.globalObject, this.js_value, true); } } - try this.advance(); + this.advance(); - this.flushData(); + this.registerAutoFlusher(); }, .CommandComplete => { var request = this.current() orelse return error.ExpectedRequest; @@ -1197,6 +1364,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera // if we have params wait for parameter description if (statement.status == .parsing and statement.signature.fields.len == 0) { statement.status = .prepared; + this.flags.waiting_to_prepare = false; } } }, @@ -1208,6 +1376,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera statement.parameters = description.parameters; if (statement.status == .parsing) { statement.status = .prepared; + this.flags.waiting_to_prepare = false; } }, .RowDescription => { @@ -1537,6 +1706,8 @@ const PreparedStatementsMap = std.HashMapUnmanaged(u64, *PostgresSQLStatement, b const debug = bun.Output.scoped(.Postgres, false); +const MAX_PIPELINE_SIZE = std.math.maxInt(u16); // about 64KB per connection + // @sortImports const PostgresCachedStructure = @import("./PostgresCachedStructure.zig"); @@ -1564,6 +1735,7 @@ const assert = bun.assert; const JSC = bun.JSC; const JSValue = JSC.JSValue; +const AutoFlusher = JSC.WebCore.AutoFlusher; pub const js = JSC.Codegen.JSPostgresSQLConnection; pub const fromJS = js.fromJS; diff --git a/src/sql/postgres/PostgresSQLQuery.zig b/src/sql/postgres/PostgresSQLQuery.zig index 3aaa4d3920..4dacf33f84 100644 --- a/src/sql/postgres/PostgresSQLQuery.zig +++ b/src/sql/postgres/PostgresSQLQuery.zig @@ -13,8 +13,9 @@ flags: packed struct(u8) { binary: bool = false, bigint: bool = false, simple: bool = false, + pipelined: bool = false, result_mode: PostgresSQLQueryResultMode = .objects, - _padding: u2 = 0, + _padding: u1 = 0, } = .{}, pub fn getTarget(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, clean_target: bool) JSC.JSValue { @@ -312,6 +313,7 @@ pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callfra return error.JSError; }; connection.flags.is_ready_for_query = false; + connection.nonpipelinable_requests += 1; this.status = .running; } else { this.status = .pending; @@ -369,7 +371,7 @@ pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callfra return globalObject.throwValue(this.statement.?.error_response.?.toJS(globalObject)); }, .prepared => { - if (!connection.hasQueryRunning()) { + if (!connection.hasQueryRunning() or connection.canPipeline()) { this.flags.binary = this.statement.?.fields.len > 0; debug("bindAndExecute", .{}); @@ -381,6 +383,8 @@ pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callfra }; connection.flags.is_ready_for_query = false; this.status = .binding; + this.flags.pipelined = true; + connection.pipelined_requests += 1; did_write = true; } @@ -407,6 +411,7 @@ pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callfra connection.flags.is_ready_for_query = false; this.status = .binding; did_write = true; + connection.flags.waiting_to_prepare = true; } else { debug("writeQuery", .{}); @@ -424,6 +429,7 @@ pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callfra }; connection.flags.is_ready_for_query = false; did_write = true; + connection.flags.waiting_to_prepare = true; } } {