fix(postgres) regression (#21466)

### What does this PR do?
Fix: https://github.com/oven-sh/bun/issues/21351

Relevant changes:
Fix advance to properly cleanup success and failed queries that could be
still be in the queue
Always ref before executing
Use stronger atomics for ref/deref and hasPendingActivity
Fallback when thisValue is freed/null/zero and check if vm is being
shutdown
The bug in --hot in `resolveRopeIfNeeded` Issue is not meant to be fixed
in this PR this is a fix for the postgres regression
Added assertions so this bug is easier to catch on CI
### How did you verify your code works?
Test added

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Ciro Spaciari
2025-07-31 16:26:35 -07:00
committed by GitHub
parent ce5152dd7a
commit 03afe6ef28
8 changed files with 604 additions and 142 deletions

View File

@@ -1,8 +1,8 @@
const PostgresSQLConnection = @This();
const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{});
socket: Socket,
status: Status = Status.connecting,
ref_count: u32 = 1,
ref_count: RefCount = RefCount.init(),
write_buffer: bun.OffsetByteList = .{},
read_buffer: bun.OffsetByteList = .{},
@@ -15,7 +15,7 @@ nonpipelinable_requests: u32 = 0,
poll_ref: bun.Async.KeepAlive = .{},
globalObject: *jsc.JSGlobalObject,
vm: *jsc.VirtualMachine,
statements: PreparedStatementsMap,
prepared_statement_id: u64 = 0,
pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
@@ -66,6 +66,9 @@ max_lifetime_timer: bun.api.Timer.EventLoopTimer = .{
},
auto_flusher: AutoFlusher = .{},
pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
pub fn onAutoFlush(this: *@This()) bool {
if (this.flags.has_backpressure) {
debug("onAutoFlush: has backpressure", .{});
@@ -95,7 +98,7 @@ fn registerAutoFlusher(this: *PostgresSQLConnection) void {
data_to_send > 0 and // we need data to send
this.status == .connected //and we need to be connected
) {
AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalObject.bunVM());
AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.vm);
this.auto_flusher.registered = true;
}
}
@@ -103,7 +106,7 @@ fn registerAutoFlusher(this: *PostgresSQLConnection) void {
fn unregisterAutoFlusher(this: *PostgresSQLConnection) void {
debug("unregisterAutoFlusher registered: {}", .{this.auto_flusher.registered});
if (this.auto_flusher.registered) {
AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, this.globalObject.bunVM());
AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, this.vm);
this.auto_flusher.registered = false;
}
}
@@ -117,7 +120,7 @@ fn getTimeoutInterval(this: *const PostgresSQLConnection) u32 {
}
pub fn disableConnectionTimeout(this: *PostgresSQLConnection) void {
if (this.timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.timer);
this.vm.timer.remove(&this.timer);
}
this.timer.state = .CANCELLED;
}
@@ -126,14 +129,14 @@ pub fn resetConnectionTimeout(this: *PostgresSQLConnection) void {
if (this.flags.is_processing_data) return;
const interval = this.getTimeoutInterval();
if (this.timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.timer);
this.vm.timer.remove(&this.timer);
}
if (interval == 0) {
return;
}
this.timer.next = bun.timespec.msFromNow(@intCast(interval));
this.globalObject.bunVM().timer.insert(&this.timer);
this.vm.timer.insert(&this.timer);
}
pub fn getQueries(_: *PostgresSQLConnection, thisValue: jsc.JSValue, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue {
@@ -192,7 +195,7 @@ fn setupMaxLifetimeTimerIfNecessary(this: *PostgresSQLConnection) void {
if (this.max_lifetime_timer.state == .ACTIVE) return;
this.max_lifetime_timer.next = bun.timespec.msFromNow(@intCast(this.max_lifetime_interval_ms));
this.globalObject.bunVM().timer.insert(&this.max_lifetime_timer);
this.vm.timer.insert(&this.max_lifetime_timer);
}
pub fn onConnectionTimeout(this: *PostgresSQLConnection) bun.api.Timer.EventLoopTimer.Arm {
@@ -254,6 +257,7 @@ pub fn setStatus(this: *PostgresSQLConnection, status: Status) void {
this.status = status;
this.resetConnectionTimeout();
if (this.vm.isShuttingDown()) return;
switch (status) {
.connected => {
@@ -261,7 +265,7 @@ pub fn setStatus(this: *PostgresSQLConnection, status: Status) void {
const js_value = this.js_value;
js_value.ensureStillAlive();
this.globalObject.queueMicrotask(on_connect, &[_]JSValue{ JSValue.jsNull(), js_value });
this.poll_ref.unref(this.globalObject.bunVM());
this.poll_ref.unref(this.vm);
},
else => {},
}
@@ -315,7 +319,7 @@ pub fn failWithJSValue(this: *PostgresSQLConnection, value: JSValue) void {
defer this.refAndClose(value);
const on_close = this.consumeOnCloseCallback(this.globalObject) orelse return;
const loop = this.globalObject.bunVM().eventLoop();
const loop = this.vm.eventLoop();
loop.enter();
defer loop.exit();
_ = on_close.call(
@@ -343,13 +347,21 @@ pub fn fail(this: *PostgresSQLConnection, message: []const u8, err: AnyPostgresE
pub fn onClose(this: *PostgresSQLConnection) void {
this.unregisterAutoFlusher();
var vm = this.globalObject.bunVM();
const loop = vm.eventLoop();
loop.enter();
defer loop.exit();
this.poll_ref.unref(this.globalObject.bunVM());
if (this.vm.isShuttingDown()) {
defer this.updateHasPendingActivity();
this.stopTimers();
if (this.status == .failed) return;
this.fail("Connection closed", error.ConnectionClosed);
this.status = .failed;
this.cleanUpRequests(null);
} else {
const loop = this.vm.eventLoop();
loop.enter();
defer loop.exit();
this.poll_ref.unref(this.vm);
this.fail("Connection closed", error.ConnectionClosed);
}
}
fn sendStartupMessage(this: *PostgresSQLConnection) void {
@@ -392,7 +404,7 @@ fn startTLS(this: *PostgresSQLConnection, socket: uws.AnySocket) void {
pub fn onOpen(this: *PostgresSQLConnection, socket: uws.AnySocket) void {
this.socket = socket;
this.poll_ref.ref(this.globalObject.bunVM());
this.poll_ref.ref(this.vm);
this.updateHasPendingActivity();
if (this.tls_status == .message_sent or this.tls_status == .pending) {
@@ -460,7 +472,9 @@ pub fn onDrain(this: *PostgresSQLConnection) void {
fn drainInternal(this: *PostgresSQLConnection) void {
debug("drainInternal", .{});
const event_loop = this.globalObject.bunVM().eventLoop();
if (this.vm.isShuttingDown()) return this.close();
const event_loop = this.vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
@@ -476,7 +490,7 @@ fn drainInternal(this: *PostgresSQLConnection) void {
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
this.ref();
this.flags.is_processing_data = true;
const vm = this.globalObject.bunVM();
const vm = this.vm;
this.disableConnectionTimeout();
defer {
@@ -681,7 +695,7 @@ pub fn call(globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JS
ptr.* = PostgresSQLConnection{
.globalObject = globalObject,
.vm = globalObject.bunVM(),
.database = database,
.user = username,
.password = password,
@@ -764,10 +778,20 @@ fn SocketHandler(comptime ssl: bool) type {
return Socket{ .SocketTCP = s };
}
pub fn onOpen(this: *PostgresSQLConnection, socket: SocketType) void {
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onOpen(_socket(socket));
}
fn onHandshake_(this: *PostgresSQLConnection, _: anytype, success: i32, ssl_error: uws.us_bun_verify_error_t) void {
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onHandshake(success, ssl_error);
}
@@ -785,39 +809,54 @@ fn SocketHandler(comptime ssl: bool) type {
pub fn onConnectError(this: *PostgresSQLConnection, socket: SocketType, _: i32) void {
_ = socket;
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onClose();
}
pub fn onTimeout(this: *PostgresSQLConnection, socket: SocketType) void {
_ = socket;
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onTimeout();
}
pub fn onData(this: *PostgresSQLConnection, socket: SocketType, data: []const u8) void {
_ = socket;
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onData(data);
}
pub fn onWritable(this: *PostgresSQLConnection, socket: SocketType) void {
_ = socket;
if (this.vm.isShuttingDown()) {
@branchHint(.unlikely);
this.close();
return;
}
this.onDrain();
}
};
}
pub fn ref(this: *@This()) void {
bun.assert(this.ref_count > 0);
this.ref_count += 1;
}
pub fn doRef(this: *@This(), _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!JSValue {
this.poll_ref.ref(this.globalObject.bunVM());
this.poll_ref.ref(this.vm);
this.updateHasPendingActivity();
return .js_undefined;
}
pub fn doUnref(this: *@This(), _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!JSValue {
this.poll_ref.unref(this.globalObject.bunVM());
this.poll_ref.unref(this.vm);
this.updateHasPendingActivity();
return .js_undefined;
}
@@ -826,35 +865,29 @@ pub fn doFlush(this: *PostgresSQLConnection, _: *jsc.JSGlobalObject, _: *jsc.Cal
return .js_undefined;
}
pub fn deref(this: *@This()) void {
const ref_count = this.ref_count;
this.ref_count -= 1;
if (ref_count == 1) {
this.disconnect();
this.deinit();
}
fn close(this: *@This()) void {
this.disconnect();
this.unregisterAutoFlusher();
this.write_buffer.deinit(bun.default_allocator);
}
pub fn doClose(this: *@This(), globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!JSValue {
_ = globalObject;
this.disconnect();
this.unregisterAutoFlusher();
this.write_buffer.deinit(bun.default_allocator);
this.close();
return .js_undefined;
}
pub fn stopTimers(this: *PostgresSQLConnection) void {
if (this.timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.timer);
this.vm.timer.remove(&this.timer);
}
if (this.max_lifetime_timer.state == .ACTIVE) {
this.globalObject.bunVM().timer.remove(&this.max_lifetime_timer);
this.vm.timer.remove(&this.max_lifetime_timer);
}
}
pub fn deinit(this: *@This()) void {
this.disconnect();
this.stopTimers();
var iter = this.statements.valueIterator();
while (iter.next()) |stmt_ptr| {
@@ -872,17 +905,7 @@ pub fn deinit(this: *@This()) void {
bun.default_allocator.destroy(this);
}
fn refAndClose(this: *@This(), js_reason: ?jsc.JSValue) void {
// refAndClose is always called when we wanna to disconnect or when we are closed
if (!this.socket.isClosed()) {
// event loop need to be alive to close the socket
this.poll_ref.ref(this.globalObject.bunVM());
// will unref on socket close
this.socket.close();
}
// cleanup requests
fn cleanUpRequests(this: *@This(), js_reason: ?jsc.JSValue) void {
while (this.current()) |request| {
switch (request.status) {
// pending we will fail the request and the stmt will be marked as error ConnectionClosed too
@@ -890,10 +913,12 @@ fn refAndClose(this: *@This(), js_reason: ?jsc.JSValue) void {
const stmt = request.statement orelse continue;
stmt.error_response = .{ .postgres_error = AnyPostgresError.ConnectionClosed };
stmt.status = .failed;
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
if (!this.vm.isShuttingDown()) {
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
}
}
},
// in the middle of running
@@ -901,10 +926,12 @@ fn refAndClose(this: *@This(), js_reason: ?jsc.JSValue) void {
.running,
.partial_response,
=> {
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
if (!this.vm.isShuttingDown()) {
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
}
}
},
// just ignore success and fail cases
@@ -914,6 +941,19 @@ fn refAndClose(this: *@This(), js_reason: ?jsc.JSValue) void {
this.requests.discard(1);
}
}
fn refAndClose(this: *@This(), js_reason: ?jsc.JSValue) void {
// refAndClose is always called when we wanna to disconnect or when we are closed
if (!this.socket.isClosed()) {
// event loop need to be alive to close the socket
this.poll_ref.ref(this.vm);
// will unref on socket close
this.socket.close();
}
// cleanup requests
this.cleanUpRequests(js_reason);
}
pub fn disconnect(this: *@This()) void {
this.stopTimers();
@@ -928,6 +968,7 @@ fn current(this: *PostgresSQLConnection) ?*PostgresSQLQuery {
if (this.requests.readableLength() == 0) {
return null;
}
return this.requests.peekItem(0);
}
@@ -1022,10 +1063,42 @@ pub fn bufferedReader(this: *PostgresSQLConnection) protocol.NewReader(Reader) {
};
}
fn cleanupSuccessQuery(this: *PostgresSQLConnection, item: *PostgresSQLQuery) void {
if (item.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (item.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
}
}
fn advance(this: *PostgresSQLConnection) void {
var offset: usize = 0;
debug("advance", .{});
defer {
while (this.requests.readableLength() > 0) {
const result = this.requests.peekItem(0);
// An item may be in the success or failed state and still be inside the queue (see deinit later comments)
// so we do the cleanup her
switch (result.status) {
.success => {
this.cleanupSuccessQuery(result);
result.deref();
this.requests.discard(1);
continue;
},
.fail => {
result.deref();
this.requests.discard(1);
continue;
},
else => break, // trully current item
}
}
}
while (this.requests.readableLength() > offset and !this.flags.has_backpressure) {
if (this.vm.isShuttingDown()) return this.close();
var req: *PostgresSQLQuery = this.requests.peekItem(offset);
switch (req.status) {
.pending => {
@@ -1084,8 +1157,18 @@ fn advance(this: *PostgresSQLConnection) void {
continue;
},
.prepared => {
const thisValue = req.thisValue.get();
bun.assert(thisValue != .zero);
const thisValue = req.thisValue.tryGet() orelse {
bun.assertf(false, "query value was freed earlier than expected", .{});
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
continue;
};
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero;
req.flags.binary = stmt.fields.len > 0;
@@ -1129,8 +1212,18 @@ fn advance(this: *PostgresSQLConnection) void {
const has_params = stmt.signature.fields.len > 0;
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
const thisValue = req.thisValue.get();
bun.assert(thisValue != .zero);
const thisValue = req.thisValue.tryGet() orelse {
bun.assertf(false, "query value was freed earlier than expected", .{});
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
continue;
};
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
debug("prepareAndQueryWithSignature", .{});
@@ -1201,13 +1294,7 @@ fn advance(this: *PostgresSQLConnection) void {
return;
},
.success => {
if (req.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (req.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
}
this.cleanupSuccessQuery(req);
if (offset > 0) {
// deinit later
req.status = .fail;
@@ -1242,6 +1329,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
switch (comptime MessageType) {
.DataRow => {
const request = this.current() orelse return error.ExpectedRequest;
var statement = request.statement orelse return error.ExpectedStatement;
var structure: JSValue = .js_undefined;
var cached_structure: ?PostgresCachedStructure = null;
@@ -1297,8 +1385,10 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
DataCell.Putter.put,
);
}
const thisValue = request.thisValue.get();
bun.assert(thisValue != .zero);
const thisValue = request.thisValue.tryGet() orelse return {
bun.assertf(false, "query value was freed earlier than expected", .{});
return error.ExpectedRequest;
};
const pending_value = PostgresSQLQuery.js.pendingValueGetCached(thisValue) orelse .zero;
pending_value.ensureStillAlive();
const result = putter.toJS(this.globalObject, pending_value, structure, statement.fields_flags, request.flags.result_mode, cached_structure);
@@ -1682,9 +1772,9 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
pub fn updateRef(this: *PostgresSQLConnection) void {
this.updateHasPendingActivity();
if (this.pending_activity_count.raw > 0) {
this.poll_ref.ref(this.globalObject.bunVM());
this.poll_ref.ref(this.vm);
} else {
this.poll_ref.unref(this.globalObject.bunVM());
this.poll_ref.unref(this.vm);
}
}