Compare commits

...

1 Commits

Author SHA1 Message Date
Jarred Sumner
720089929a Try to fix #22040 2025-08-23 19:47:54 -07:00
5 changed files with 59 additions and 31 deletions

View File

@@ -1,7 +1,7 @@
import { define } from "../../codegen/class-definitions";
import { type ClassDefinition, define } from "../../codegen/class-definitions";
const types = ["PostgresSQL", "MySQL"];
const classes = [];
const classes: ClassDefinition[] = [];
for (const type of types) {
classes.push(
define({

View File

@@ -6,4 +6,16 @@ pub const ConnectionState = enum {
authentication_awaiting_pk,
connected,
failed,
pub fn hasPendingActivity(this: ConnectionState) bool {
return switch (this) {
.connected => false,
.connecting => true,
.handshaking => true,
.authenticating => true,
.authentication_awaiting_pk => true,
.failed => false,
.disconnected => false,
};
}
};

View File

@@ -127,9 +127,17 @@ pub fn hasPendingActivity(this: *MySQLConnection) bool {
}
fn updateHasPendingActivity(this: *MySQLConnection) void {
const a: u32 = if (this.requests.readableLength() > 0) 1 else 0;
const b: u32 = if (this.status != .disconnected) 1 else 0;
this.pending_activity_count.store(a + b, .release);
var counter: u32 = 0;
if (this.status.hasPendingActivity()) {
counter += 1;
}
if (this.hasDataToSend()) {
counter += 1;
}
if (this.read_buffer.len() > 0) {
counter += 1;
}
this.pending_activity_count.store(counter, .release);
}
fn hasDataToSend(this: *@This()) bool {
@@ -927,9 +935,8 @@ pub fn call(globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JS
}
}
ptr.setStatus(.connecting);
ptr.updateHasPendingActivity();
ptr.updateRef();
ptr.resetConnectionTimeout();
ptr.poll_ref.ref(vm);
const js_value = ptr.toJS(globalObject);
js_value.ensureStillAlive();
ptr.js_value = js_value;
@@ -974,8 +981,7 @@ pub fn onOpen(this: *MySQLConnection, socket: Socket) void {
this.resetConnectionTimeout();
this.socket = socket;
this.setStatus(.handshaking);
this.poll_ref.ref(this.vm);
this.updateHasPendingActivity();
this.updateRef();
}
pub fn onHandshake(this: *MySQLConnection, success: i32, ssl_error: uws.us_bun_verify_error_t) void {
@@ -1021,12 +1027,8 @@ pub fn onData(this: *MySQLConnection, data: []const u8) void {
this.socket.setTimeout(0);
defer {
if (this.status == .connected and this.requests.readableLength() == 0 and this.write_buffer.remaining().len == 0) {
// Don't keep the process alive when there's nothixng to do.
this.poll_ref.unref(vm);
} else if (this.status == .connected) {
// Keep the process alive if there's something to do.
this.poll_ref.ref(vm);
if (this.status == .connected) {
this.updateRef();
}
// reset the connection timeout after we're done processing the data
this.flags.is_processing_data = false;
@@ -1226,7 +1228,7 @@ pub fn consumeOnCloseCallback(this: *const @This(), globalObject: *jsc.JSGlobalO
pub fn setStatus(this: *@This(), status: ConnectionState) void {
if (this.status == status) return;
defer this.updateHasPendingActivity();
defer this.updateRef();
this.status = status;
this.resetConnectionTimeout();
@@ -1238,7 +1240,6 @@ pub fn setStatus(this: *@This(), status: ConnectionState) void {
const js_value = this.js_value;
js_value.ensureStillAlive();
this.globalObject.queueMicrotask(on_connect, &[_]JSValue{ JSValue.jsNull(), js_value });
this.poll_ref.unref(this.vm);
},
else => {},
}

View File

@@ -246,14 +246,24 @@ pub fn hasPendingActivity(this: *PostgresSQLConnection) bool {
}
fn updateHasPendingActivity(this: *PostgresSQLConnection) void {
const a: u32 = if (this.requests.readableLength() > 0) 1 else 0;
const b: u32 = if (this.status != .disconnected) 1 else 0;
this.pending_activity_count.store(a + b, .release);
var counter: u32 = 0;
if (this.status.hasPendingActivity()) {
counter += 1;
}
if (this.hasQueryRunning()) {
counter += 1;
}
if (this.read_buffer.len() > 0 or this.write_buffer.len() > 0) {
counter += 1;
}
this.pending_activity_count.store(counter, .release);
}
pub fn setStatus(this: *PostgresSQLConnection, status: Status) void {
if (this.status == status) return;
defer this.updateHasPendingActivity();
defer {
this.updateRef();
}
this.status = status;
this.resetConnectionTimeout();
@@ -265,7 +275,6 @@ pub fn setStatus(this: *PostgresSQLConnection, status: Status) void {
const js_value = this.js_value;
js_value.ensureStillAlive();
this.globalObject.queueMicrotask(on_connect, &[_]JSValue{ JSValue.jsNull(), js_value });
this.poll_ref.unref(this.vm);
},
else => {},
}
@@ -409,8 +418,7 @@ fn startTLS(this: *PostgresSQLConnection, socket: uws.AnySocket) void {
pub fn onOpen(this: *PostgresSQLConnection, socket: uws.AnySocket) void {
this.socket = socket;
this.poll_ref.ref(this.vm);
this.updateHasPendingActivity();
this.updateRef();
if (this.tls_status == .message_sent or this.tls_status == .pending) {
this.startTLS(socket);
@@ -462,6 +470,9 @@ pub fn onTimeout(this: *PostgresSQLConnection) void {
pub fn onDrain(this: *PostgresSQLConnection) void {
debug("onDrain", .{});
this.ref();
defer this.deref();
defer this.updateRef();
this.flags.has_backpressure = false;
// Don't send any other messages while we're waiting for TLS.
if (this.tls_status == .message_sent) {
@@ -499,13 +510,7 @@ pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
this.disableConnectionTimeout();
defer {
if (this.status == .connected and !this.hasQueryRunning() and this.write_buffer.remaining().len == 0) {
// Don't keep the process alive when there's nothing to do.
this.poll_ref.unref(vm);
} else if (this.status == .connected) {
// Keep the process alive if there's something to do.
this.poll_ref.ref(vm);
}
this.updateRef();
this.flags.is_processing_data = false;
// reset the connection timeout after we're done processing the data

View File

@@ -6,4 +6,14 @@ pub const Status = enum {
sent_startup_message,
connected,
failed,
pub fn hasPendingActivity(this: Status) bool {
return switch (this) {
.connected => false,
.connecting => true,
.sent_startup_message => true,
.failed => false,
.disconnected => false,
};
}
};