fix(Bun.SQL) fix postgres error handling when pipelining and state reset (#22505)

### What does this PR do?
Fixes: https://github.com/oven-sh/bun/issues/22395

### How did you verify your code works?
Test
This commit is contained in:
Ciro Spaciari
2025-09-08 21:00:39 -07:00
committed by GitHub
parent e63608fced
commit 1e4935cf3e
4 changed files with 227 additions and 152 deletions

View File

@@ -60,7 +60,7 @@ pub fn createPostgresError(
message: []const u8,
options: PostgresErrorOptions,
) bun.JSError!JSValue {
const opts_obj = JSValue.createEmptyObject(globalObject, 18);
const opts_obj = JSValue.createEmptyObject(globalObject, 0);
opts_obj.ensureStillAlive();
opts_obj.put(globalObject, jsc.ZigString.static("code"), try bun.String.createUTF8ForJS(globalObject, options.code));
inline for (std.meta.fields(PostgresErrorOptions)) |field| {

View File

@@ -326,7 +326,7 @@ pub fn failWithJSValue(this: *PostgresSQLConnection, value: JSValue) void {
this.globalObject,
this.js_value,
&[_]JSValue{
value,
value.toError() orelse value,
this.getQueriesArray(),
},
) catch |e| this.globalObject.reportActiveExceptionAsUnhandled(e);
@@ -484,7 +484,7 @@ fn drainInternal(this: *PostgresSQLConnection) void {
this.flushData();
if (!this.flags.has_backpressure) {
if (!this.flags.has_backpressure and this.flags.is_ready_for_query) {
// no backpressure yet so pipeline more if possible and flush again
this.advance();
this.flushData();
@@ -929,6 +929,7 @@ fn cleanUpRequests(this: *@This(), js_reason: ?jsc.JSValue) void {
.running,
.partial_response,
=> {
this.finishRequest(request);
if (!this.vm.isShuttingDown()) {
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
@@ -1066,15 +1067,23 @@ pub fn bufferedReader(this: *PostgresSQLConnection) protocol.NewReader(Reader) {
};
}
fn cleanupSuccessQuery(this: *PostgresSQLConnection, item: *PostgresSQLQuery) void {
if (item.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (item.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
fn finishRequest(this: *@This(), item: *PostgresSQLQuery) void {
switch (item.status) {
.running, .binding, .partial_response => {
if (item.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (item.flags.pipelined) {
this.pipelined_requests -= 1;
}
},
.success, .fail, .pending => {},
}
}
pub fn canPrepareQuery(noalias this: *const @This()) bool {
return this.flags.is_ready_for_query and !this.flags.waiting_to_prepare and this.pipelined_requests == 0;
}
fn advance(this: *PostgresSQLConnection) void {
var offset: usize = 0;
debug("advance", .{});
@@ -1085,7 +1094,6 @@ fn advance(this: *PostgresSQLConnection) void {
// so we do the cleanup her
switch (result.status) {
.success => {
this.cleanupSuccessQuery(result);
result.deref();
this.requests.discard(1);
continue;
@@ -1115,7 +1123,11 @@ fn advance(this: *PostgresSQLConnection) void {
defer query_str.deinit();
debug("execute simple query: {s}", .{query_str.slice()});
PostgresRequest.executeQuery(query_str.slice(), PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
if (this.globalObject.tryTakeException()) |err_| {
req.onJSError(err_, this.globalObject);
} else {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
}
if (offset == 0) {
req.deref();
this.requests.discard(1);
@@ -1131,39 +1143,12 @@ fn advance(this: *PostgresSQLConnection) void {
req.status = .running;
return;
} else {
const stmt = req.statement orelse {
debug("stmt is not set yet waiting it to RUN before actually doing anything", .{});
// statement is not set yet waiting it to RUN before actually doing anything
offset += 1;
continue;
};
switch (stmt.status) {
.failed => {
debug("stmt failed", .{});
bun.assert(stmt.error_response != null);
if (req.flags.simple) {
this.nonpipelinable_requests -= 1;
} else if (req.flags.pipelined) {
this.pipelined_requests -= 1;
} else if (this.flags.waiting_to_prepare) {
this.flags.waiting_to_prepare = false;
}
req.onError(stmt.error_response.?, this.globalObject);
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
continue;
},
.prepared => {
const thisValue = req.thisValue.tryGet() orelse {
bun.assertf(false, "query value was freed earlier than expected", .{});
if (req.statement) |statement| {
switch (statement.status) {
.failed => {
debug("stmt failed", .{});
bun.assert(statement.error_response != null);
req.onError(statement.error_response.?, this.globalObject);
if (offset == 0) {
req.deref();
this.requests.discard(1);
@@ -1172,51 +1157,10 @@ fn advance(this: *PostgresSQLConnection) void {
req.status = .fail;
offset += 1;
}
continue;
};
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero;
req.flags.binary = stmt.fields.len > 0;
debug("binding and executing stmt", .{});
PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
debug("bind and execute failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
req.status = .binding;
if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) {
debug("cannot pipeline more stmt", .{});
return;
}
debug("pipelining more stmt", .{});
// we can pipeline it
this.pipelined_requests += 1;
req.flags.pipelined = true;
offset += 1;
continue;
},
.pending => {
if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) {
debug("need to wait to finish the pipeline before starting a new query preparation", .{});
// need to wait to finish the pipeline before starting a new query preparation
return;
}
// statement is pending, lets write/parse it
var query_str = req.query.toUTF8(bun.default_allocator);
defer query_str.deinit();
const has_params = stmt.signature.fields.len > 0;
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
continue;
},
.prepared => {
const thisValue = req.thisValue.tryGet() orelse {
bun.assertf(false, "query value was freed earlier than expected", .{});
if (offset == 0) {
@@ -1229,77 +1173,158 @@ fn advance(this: *PostgresSQLConnection) void {
}
continue;
};
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
debug("prepareAndQueryWithSignature", .{});
PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| {
stmt.status = .failed;
stmt.error_response = .{ .postgres_error = err };
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero;
req.flags.binary = statement.fields.len > 0;
debug("binding and executing stmt", .{});
PostgresRequest.bindAndExecute(this.globalObject, statement, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| {
if (this.globalObject.tryTakeException()) |err_| {
req.onJSError(err_, this.globalObject);
} else {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
}
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)});
debug("bind and execute failed: {s}", .{@errorName(err)});
continue;
};
this.flags.waiting_to_prepare = true;
this.flags.is_ready_for_query = false;
req.status = .binding;
stmt.status = .parsing;
req.flags.pipelined = true;
this.pipelined_requests += 1;
if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) {
debug("cannot pipeline more stmt", .{});
return;
}
offset += 1;
continue;
},
.pending => {
if (!this.canPrepareQuery()) {
debug("need to wait to finish the pipeline before starting a new query preparation", .{});
// need to wait to finish the pipeline before starting a new query preparation
return;
}
// statement is pending, lets write/parse it
var query_str = req.query.toUTF8(bun.default_allocator);
defer query_str.deinit();
const has_params = statement.signature.fields.len > 0;
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
const thisValue = req.thisValue.tryGet() orelse {
bun.assertf(false, "query value was freed earlier than expected", .{});
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
offset += 1;
}
continue;
};
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero;
debug("prepareAndQueryWithSignature", .{});
PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &statement.signature) catch |err| {
if (this.globalObject.tryTakeException()) |err_| {
req.onJSError(err_, this.globalObject);
} else {
statement.status = .failed;
statement.error_response = .{ .postgres_error = err };
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
}
if (offset == 0) {
req.deref();
this.requests.discard(1);
} else {
// deinit later
req.status = .fail;
}
debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
this.flags.waiting_to_prepare = true;
req.status = .binding;
statement.status = .parsing;
this.flushDataAndResetTimeout();
return;
}
const connection_writer = this.writer();
debug("writing query", .{});
// write query and wait for it to be prepared
PostgresRequest.writeQuery(query_str.slice(), statement.signature.prepared_statement_name, statement.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| {
if (this.globalObject.tryTakeException()) |err_| {
req.onJSError(err_, this.globalObject);
} else {
statement.error_response = .{ .postgres_error = err };
statement.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
}
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query failed: {s}", .{@errorName(err)});
continue;
};
connection_writer.write(&protocol.Sync) catch |err| {
if (this.globalObject.tryTakeException()) |err_| {
req.onJSError(err_, this.globalObject);
} else {
statement.error_response = .{ .postgres_error = err };
statement.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
}
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query (sync) failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
this.flags.waiting_to_prepare = true;
statement.status = .parsing;
this.flushDataAndResetTimeout();
return;
}
const connection_writer = this.writer();
debug("writing query", .{});
// write query and wait for it to be prepared
PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| {
stmt.error_response = .{ .postgres_error = err };
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query failed: {s}", .{@errorName(err)});
},
.parsing => {
// we are still parsing, lets wait for it to be prepared or failed
offset += 1;
continue;
};
connection_writer.write(&protocol.Sync) catch |err| {
stmt.error_response = .{ .postgres_error = err };
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
bun.assert(offset == 0);
req.deref();
this.requests.discard(1);
debug("write query (sync) failed: {s}", .{@errorName(err)});
continue;
};
this.flags.is_ready_for_query = false;
stmt.status = .parsing;
this.flags.waiting_to_prepare = true;
return;
},
.parsing => {
// we are still parsing, lets wait for it to be prepared or failed
return;
},
},
}
} else {
offset += 1;
continue;
}
}
},
.running, .binding, .partial_response => {
// if we are binding it will switch to running immediately
// if we are running, we need to wait for it to be success or fail
return;
if (this.flags.waiting_to_prepare or this.nonpipelinable_requests > 0) {
return;
}
const total_requests_running = this.pipelined_requests;
if (offset < total_requests_running) {
offset += total_requests_running;
} else {
offset += 1;
}
continue;
},
.success => {
this.cleanupSuccessQuery(req);
if (offset > 0) {
// deinit later
req.status = .fail;
@@ -1427,12 +1452,14 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
try ready_for_query.decodeInternal(Context, reader);
this.setStatus(.connected);
this.flags.waiting_to_prepare = false;
this.flags.is_ready_for_query = true;
this.socket.setTimeout(300);
defer this.updateRef();
if (this.current()) |request| {
if (request.status == .partial_response) {
this.finishRequest(request);
// if is a partial response, just signal that the query is now complete
request.onResult("", this.globalObject, this.js_value, true);
}
@@ -1695,7 +1722,6 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
defer {
err.deinit();
}
this.failWithJSValue(err.toJS(this.globalObject));
// it shouldn't enqueue any requests while connecting
@@ -1723,8 +1749,9 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
}
}
}
this.updateRef();
this.finishRequest(request);
this.updateRef();
request.onError(.{ .protocol = err }, this.globalObject);
},
.PortalSuspended => {
@@ -1737,11 +1764,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
try reader.eatMessage(protocol.CloseComplete);
var request = this.current() orelse return error.ExpectedRequest;
defer this.updateRef();
if (request.flags.simple) {
request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, false);
} else {
request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, true);
}
request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, false);
},
.CopyInResponse => {
debug("TODO CopyInResponse", .{});
@@ -1757,11 +1780,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera
try reader.eatMessage(protocol.EmptyQueryResponse);
var request = this.current() orelse return error.ExpectedRequest;
defer this.updateRef();
if (request.flags.simple) {
request.onResult("", this.globalObject, this.js_value, false);
} else {
request.onResult("", this.globalObject, this.js_value, true);
}
request.onResult("", this.globalObject, this.js_value, false);
},
.CopyOutResponse => {
debug("TODO CopyOutResponse", .{});

View File

@@ -94,9 +94,10 @@ pub fn onWriteFail(
const vm = jsc.VirtualMachine.get();
const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?;
const event_loop = vm.eventLoop();
const js_err = postgresErrorToJS(globalObject, null, err);
event_loop.runCallback(function, globalObject, thisValue, &.{
targetValue,
postgresErrorToJS(globalObject, null, err),
js_err.toError() orelse js_err,
queries_array,
});
}
@@ -116,7 +117,7 @@ pub fn onJSError(this: *@This(), err: jsc.JSValue, globalObject: *jsc.JSGlobalOb
const event_loop = vm.eventLoop();
event_loop.runCallback(function, globalObject, thisValue, &.{
targetValue,
err,
err.toError() orelse err,
});
}
pub fn onError(this: *@This(), err: PostgresSQLStatement.Error, globalObject: *jsc.JSGlobalObject) void {