feature(postgres) add pipelining support (#20986)

Co-authored-by: cirospaciari <6379399+cirospaciari@users.noreply.github.com>
This commit is contained in:
Ciro Spaciari
2025-07-14 21:59:16 -07:00
committed by GitHub
parent 5fe0c034e2
commit 6efb346e68
5 changed files with 244 additions and 93 deletions

View File

@@ -6,6 +6,10 @@ write_buffer: bun.OffsetByteList = .{},
read_buffer: bun.OffsetByteList = .{},
last_message_start: u32 = 0,
requests: PostgresRequest.Queue,
// number of pipelined requests (Bind/Execute/Prepared statements)
pipelined_requests: u32 = 0,
// number of non-pipelined requests (Simple/Copy)
nonpipelinable_requests: u32 = 0,
poll_ref: bun.Async.KeepAlive = .{},
globalObject: *JSC.JSGlobalObject,
@@ -58,6 +62,49 @@ max_lifetime_timer: bun.api.Timer.EventLoopTimer = .{
.nsec = 0,
},
},
auto_flusher: AutoFlusher = .{},
pub fn onAutoFlush(this: *@This()) bool {
if (this.flags.has_backpressure) {
debug("onAutoFlush: has backpressure", .{});
this.auto_flusher.registered = false;
// if we have backpressure, wait for onWritable
return false;
}
this.ref();
defer this.deref();
debug("onAutoFlush: draining", .{});
// drain as much as we can
this.drainInternal();
// if we dont have backpressure and if we still have data to send, return true otherwise return false and wait for onWritable
const keep_flusher_registered = !this.flags.has_backpressure and this.write_buffer.len() > 0;
debug("onAutoFlush: keep_flusher_registered: {}", .{keep_flusher_registered});
this.auto_flusher.registered = keep_flusher_registered;
return keep_flusher_registered;
}
fn registerAutoFlusher(this: *PostgresSQLConnection) void {
const data_to_send = this.write_buffer.len();
debug("registerAutoFlusher: backpressure: {} registered: {} data_to_send: {}", .{ this.flags.has_backpressure, this.auto_flusher.registered, data_to_send });
if (!this.auto_flusher.registered and // should not be registered
!this.flags.has_backpressure and // if has backpressure we need to wait for onWritable event
data_to_send > 0 and // we need data to send
this.status == .connected //and we need to be connected
) {
AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalObject.bunVM());
this.auto_flusher.registered = true;
}
}
fn unregisterAutoFlusher(this: *PostgresSQLConnection) void {
debug("unregisterAutoFlusher registered: {}", .{this.auto_flusher.registered});
if (this.auto_flusher.registered) {
AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, this.globalObject.bunVM());
this.auto_flusher.registered = false;
}
}
fn getTimeoutInterval(this: *const PostgresSQLConnection) u32 {
return switch (this.status) {
@@ -186,10 +233,7 @@ fn start(this: *PostgresSQLConnection) void {
this.resetConnectionTimeout();
this.sendStartupMessage();
const event_loop = this.globalObject.bunVM().eventLoop();
event_loop.enter();
defer event_loop.exit();
this.flushData();
this.drainInternal();
}
pub fn hasPendingActivity(this: *PostgresSQLConnection) bool {
@@ -230,13 +274,26 @@ pub fn finalize(this: *PostgresSQLConnection) void {
pub fn flushDataAndResetTimeout(this: *PostgresSQLConnection) void {
this.resetConnectionTimeout();
this.flushData();
// defer flushing, so if many queries are running in parallel in the same connection, we don't flush more than once
this.registerAutoFlusher();
}
pub fn flushData(this: *PostgresSQLConnection) void {
// we know we still have backpressure so just return we will flush later
if (this.flags.has_backpressure) {
debug("flushData: has backpressure", .{});
return;
}
const chunk = this.write_buffer.remaining();
if (chunk.len == 0) return;
if (chunk.len == 0) {
debug("flushData: no data to flush", .{});
return;
}
const wrote = this.socket.write(chunk);
this.flags.has_backpressure = wrote < chunk.len;
debug("flushData: wrote {d}/{d} bytes", .{ wrote, chunk.len });
if (wrote > 0) {
SocketMonitor.write(chunk[0..@intCast(wrote)]);
this.write_buffer.consume(@intCast(wrote));
@@ -282,6 +339,8 @@ pub fn fail(this: *PostgresSQLConnection, message: []const u8, err: AnyPostgresE
}
pub fn onClose(this: *PostgresSQLConnection) void {
this.unregisterAutoFlusher();
var vm = this.globalObject.bunVM();
const loop = vm.eventLoop();
loop.enter();
@@ -383,7 +442,8 @@ pub fn onTimeout(this: *PostgresSQLConnection) void {
}
pub fn onDrain(this: *PostgresSQLConnection) void {
debug("onDrain", .{});
this.flags.has_backpressure = false;
// Don't send any other messages while we're waiting for TLS.
if (this.tls_status == .message_sent) {
if (this.tls_status.message_sent < 8) {
@@ -393,10 +453,22 @@ pub fn onDrain(this: *PostgresSQLConnection) void {
return;
}
this.drainInternal();
}
fn drainInternal(this: *PostgresSQLConnection) void {
debug("drainInternal", .{});
const event_loop = this.globalObject.bunVM().eventLoop();
event_loop.enter();
defer event_loop.exit();
this.flushData();
if (!this.flags.has_backpressure) {
// no backpressure yet so pipeline more if possible and flush again
this.advance();
this.flushData();
}
}
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
@@ -748,7 +820,7 @@ pub fn doUnref(this: *@This(), _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JS
return .js_undefined;
}
pub fn doFlush(this: *PostgresSQLConnection, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
this.flushData();
this.registerAutoFlusher();
return .js_undefined;
}
@@ -765,6 +837,7 @@ pub fn deref(this: *@This()) void {
pub fn doClose(this: *@This(), globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSValue {
_ = globalObject;
this.disconnect();
this.unregisterAutoFlusher();
this.write_buffer.deinit(bun.default_allocator);
return .js_undefined;
@@ -842,7 +915,7 @@ fn refAndClose(this: *@This(), js_reason: ?JSC.JSValue) void {
pub fn disconnect(this: *@This()) void {
this.stopTimers();
this.unregisterAutoFlusher();
if (this.status == .connected) {
this.status = .disconnected;
this.refAndClose(null);
@@ -853,7 +926,6 @@ fn current(this: *PostgresSQLConnection) ?*PostgresSQLQuery {
if (this.requests.readableLength() == 0) {
return null;
}
return this.requests.peekItem(0);
}
@@ -861,6 +933,14 @@ pub fn hasQueryRunning(this: *PostgresSQLConnection) bool {
return !this.flags.is_ready_for_query or this.current() != null;
}
pub fn canPipeline(this: *PostgresSQLConnection) bool {
return this.nonpipelinable_requests == 0 and // need to wait for non pipelinable requests to finish
!this.flags.use_unnamed_prepared_statements and // unnamed statements are not pipelinable
!this.flags.waiting_to_prepare and // cannot pipeline when waiting prepare
!this.flags.has_backpressure and // dont make sense to buffer more if we have backpressure
this.write_buffer.len() < MAX_PIPELINE_SIZE; // buffer is too big need to flush before pipeline more
}
pub const Writer = struct {
connection: *PostgresSQLConnection,
@@ -935,35 +1015,65 @@ pub fn bufferedReader(this: *PostgresSQLConnection) protocol.NewReader(Reader) {
};
}
fn advance(this: *PostgresSQLConnection) !void {
while (this.requests.readableLength() > 0) {
var req: *PostgresSQLQuery = this.requests.peekItem(0);
fn advance(this: *PostgresSQLConnection) void {
var offset: usize = 0;
debug("advance", .{});
while (this.requests.readableLength() > offset and !this.flags.has_backpressure) {
var req: *PostgresSQLQuery = this.requests.peekItem(offset);
switch (req.status) {
.pending => {
if (req.flags.simple) {
debug("executeQuery", .{});
if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) {
debug("cannot execute simple query, pipelined_requests: {d}, is_ready_for_query: {}", .{ this.pipelined_requests, this.flags.is_ready_for_query });
// need to wait for the previous request to finish before starting simple queries
return;
}
var query_str = req.query.toUTF8(bun.default_allocator);
defer query_str.deinit();
debug("execute simple query: {s}", .{query_str.slice()});
PostgresRequest.executeQuery(query_str.slice(), PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
}
debug("executeQuery failed: {s}", .{@errorName(err)});
continue;
};
this.nonpipelinable_requests += 1;
this.flags.is_ready_for_query = false;
req.status = .running;
return;
} else {
const stmt = req.statement orelse return error.ExpectedStatement;
const stmt = req.statement orelse {
debug("stmt is not set yet waiting it to RUN before actually doing anything", .{});
// statement is not set yet waiting it to RUN before actually doing anything
offset += 1;
continue;
};
switch (stmt.status) {
.failed => {
debug("stmt failed", .{});
bun.assert(stmt.error_response != null);
if (req.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (req.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
}
req.onError(stmt.error_response.?, this.globalObject);
req.deref();
this.requests.discard(1);
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
}
continue;
},
.prepared => {
@@ -972,19 +1082,40 @@ fn advance(this: *PostgresSQLConnection) !void {
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero;
req.flags.binary = stmt.fields.len > 0;
debug("binding and executing stmt", .{});
PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
}
debug("bind and execute failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
req.status = .binding;
return;
if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) {
debug("cannot pipeline more stmt", .{});
return;
}
debug("pipelining more stmt", .{});
// we can pipeline it
this.pipelined_requests += 1;
req.flags.pipelined = true;
offset += 1;
continue;
},
.pending => {
if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) {
debug("need to wait to finish the pipeline before starting a new query preparation", .{});
// need to wait to finish the pipeline before starting a new query preparation
return;
}
// statement is pending, lets write/parse it
var query_str = req.query.toUTF8(bun.default_allocator);
defer query_str.deinit();
@@ -995,31 +1126,42 @@ fn advance(this: *PostgresSQLConnection) !void {
bun.assert(thisValue != .zero);
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
debug("prepareAndQueryWithSignature", .{});
PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| {
stmt.status = .failed;
stmt.error_response = .{ .postgres_error = err };
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
}
debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)});
continue;
};
this.flags.waiting_to_prepare = true;
this.flags.is_ready_for_query = false;
req.status = .binding;
stmt.status = .parsing;
return;
}
const connection_writer = this.writer();
debug("writing query", .{});
// write query and wait for it to be prepared
PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| {
stmt.error_response = .{ .postgres_error = err };
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query failed: {s}", .{@errorName(err)});
continue;
};
connection_writer.write(&protocol.Sync) catch |err| {
@@ -1027,13 +1169,15 @@ fn advance(this: *PostgresSQLConnection) !void {
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query (sync) failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
stmt.status = .parsing;
this.flags.waiting_to_prepare = true;
return;
},
.parsing => {
@@ -1049,7 +1193,30 @@ fn advance(this: *PostgresSQLConnection) !void {
// if we are running, we need to wait for it to be success or fail
return;
},
.success, .fail => {
.success => {
if (req.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (req.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
}
if (offset > 0) {
// deinit later
req.status = .fail;
offset += 1;
continue;
}
req.deref();
this.requests.discard(1);
continue;
},
.fail => {
if (offset > 0) {
// deinit later
offset += 1;
continue;
}
req.deref();
this.requests.discard(1);
continue;
@@ -1161,9 +1328,9 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
request.onResult("", this.globalObject, this.js_value, true);
}
}
try this.advance();
this.advance();
this.flushData();
this.registerAutoFlusher();
},
.CommandComplete => {
var request = this.current() orelse return error.ExpectedRequest;
@@ -1197,6 +1364,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
// if we have params wait for parameter description
if (statement.status == .parsing and statement.signature.fields.len == 0) {
statement.status = .prepared;
this.flags.waiting_to_prepare = false;
}
}
},
@@ -1208,6 +1376,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
statement.parameters = description.parameters;
if (statement.status == .parsing) {
statement.status = .prepared;
this.flags.waiting_to_prepare = false;
}
},
.RowDescription => {
@@ -1537,6 +1706,8 @@ const PreparedStatementsMap = std.HashMapUnmanaged(u64, *PostgresSQLStatement, b
const debug = bun.Output.scoped(.Postgres, false);
const MAX_PIPELINE_SIZE = std.math.maxInt(u16); // about 64KB per connection
// @sortImports
const PostgresCachedStructure = @import("./PostgresCachedStructure.zig");
@@ -1564,6 +1735,7 @@ const assert = bun.assert;
const JSC = bun.JSC;
const JSValue = JSC.JSValue;
const AutoFlusher = JSC.WebCore.AutoFlusher;
pub const js = JSC.Codegen.JSPostgresSQLConnection;
pub const fromJS = js.fromJS;