diff --git a/src/sql/postgres/AnyPostgresError.zig b/src/sql/postgres/AnyPostgresError.zig index e76fd4c02c..8be278832b 100644 --- a/src/sql/postgres/AnyPostgresError.zig +++ b/src/sql/postgres/AnyPostgresError.zig @@ -60,7 +60,7 @@ pub fn createPostgresError( message: []const u8, options: PostgresErrorOptions, ) bun.JSError!JSValue { - const opts_obj = JSValue.createEmptyObject(globalObject, 18); + const opts_obj = JSValue.createEmptyObject(globalObject, 0); opts_obj.ensureStillAlive(); opts_obj.put(globalObject, jsc.ZigString.static("code"), try bun.String.createUTF8ForJS(globalObject, options.code)); inline for (std.meta.fields(PostgresErrorOptions)) |field| { diff --git a/src/sql/postgres/PostgresSQLConnection.zig b/src/sql/postgres/PostgresSQLConnection.zig index a7422f532f..be6f899198 100644 --- a/src/sql/postgres/PostgresSQLConnection.zig +++ b/src/sql/postgres/PostgresSQLConnection.zig @@ -326,7 +326,7 @@ pub fn failWithJSValue(this: *PostgresSQLConnection, value: JSValue) void { this.globalObject, this.js_value, &[_]JSValue{ - value, + value.toError() orelse value, this.getQueriesArray(), }, ) catch |e| this.globalObject.reportActiveExceptionAsUnhandled(e); @@ -484,7 +484,7 @@ fn drainInternal(this: *PostgresSQLConnection) void { this.flushData(); - if (!this.flags.has_backpressure) { + if (!this.flags.has_backpressure and this.flags.is_ready_for_query) { // no backpressure yet so pipeline more if possible and flush again this.advance(); this.flushData(); @@ -929,6 +929,7 @@ fn cleanUpRequests(this: *@This(), js_reason: ?jsc.JSValue) void { .running, .partial_response, => { + this.finishRequest(request); if (!this.vm.isShuttingDown()) { if (js_reason) |reason| { request.onJSError(reason, this.globalObject); @@ -1066,15 +1067,23 @@ pub fn bufferedReader(this: *PostgresSQLConnection) protocol.NewReader(Reader) { }; } -fn cleanupSuccessQuery(this: *PostgresSQLConnection, item: *PostgresSQLQuery) void { - if (item.flags.simple) { - this.nonpipelinable_requests -= 1; - } else if (item.flags.pipelined) { - this.pipelined_requests -= 1; - } else if (this.flags.waiting_to_prepare) { - this.flags.waiting_to_prepare = false; +fn finishRequest(this: *@This(), item: *PostgresSQLQuery) void { + switch (item.status) { + .running, .binding, .partial_response => { + if (item.flags.simple) { + this.nonpipelinable_requests -= 1; + } else if (item.flags.pipelined) { + this.pipelined_requests -= 1; + } + }, + .success, .fail, .pending => {}, } } + +pub fn canPrepareQuery(noalias this: *const @This()) bool { + return this.flags.is_ready_for_query and !this.flags.waiting_to_prepare and this.pipelined_requests == 0; +} + fn advance(this: *PostgresSQLConnection) void { var offset: usize = 0; debug("advance", .{}); @@ -1085,7 +1094,6 @@ fn advance(this: *PostgresSQLConnection) void { // so we do the cleanup her switch (result.status) { .success => { - this.cleanupSuccessQuery(result); result.deref(); this.requests.discard(1); continue; @@ -1115,7 +1123,11 @@ fn advance(this: *PostgresSQLConnection) void { defer query_str.deinit(); debug("execute simple query: {s}", .{query_str.slice()}); PostgresRequest.executeQuery(query_str.slice(), PostgresSQLConnection.Writer, this.writer()) catch |err| { - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + if (this.globalObject.tryTakeException()) |err_| { + req.onJSError(err_, this.globalObject); + } else { + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + } if (offset == 0) { req.deref(); this.requests.discard(1); @@ -1131,39 +1143,12 @@ fn advance(this: *PostgresSQLConnection) void { req.status = .running; return; } else { - const stmt = req.statement orelse { - debug("stmt is not set yet waiting it to RUN before actually doing anything", .{}); - // statement is not set yet waiting it to RUN before actually doing anything - offset += 1; - continue; - }; - - switch (stmt.status) { - .failed => { - debug("stmt failed", .{}); - bun.assert(stmt.error_response != null); - if (req.flags.simple) { - this.nonpipelinable_requests -= 1; - } else if (req.flags.pipelined) { - this.pipelined_requests -= 1; - } else if (this.flags.waiting_to_prepare) { - this.flags.waiting_to_prepare = false; - } - req.onError(stmt.error_response.?, this.globalObject); - if (offset == 0) { - req.deref(); - this.requests.discard(1); - } else { - // deinit later - req.status = .fail; - offset += 1; - } - - continue; - }, - .prepared => { - const thisValue = req.thisValue.tryGet() orelse { - bun.assertf(false, "query value was freed earlier than expected", .{}); + if (req.statement) |statement| { + switch (statement.status) { + .failed => { + debug("stmt failed", .{}); + bun.assert(statement.error_response != null); + req.onError(statement.error_response.?, this.globalObject); if (offset == 0) { req.deref(); this.requests.discard(1); @@ -1172,51 +1157,10 @@ fn advance(this: *PostgresSQLConnection) void { req.status = .fail; offset += 1; } - continue; - }; - const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero; - const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero; - req.flags.binary = stmt.fields.len > 0; - debug("binding and executing stmt", .{}); - PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| { - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - if (offset == 0) { - req.deref(); - this.requests.discard(1); - } else { - // deinit later - req.status = .fail; - offset += 1; - } - debug("bind and execute failed: {s}", .{@errorName(err)}); - continue; - }; - this.flags.is_ready_for_query = false; - req.status = .binding; - if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) { - debug("cannot pipeline more stmt", .{}); - return; - } - debug("pipelining more stmt", .{}); - // we can pipeline it - this.pipelined_requests += 1; - req.flags.pipelined = true; - offset += 1; - continue; - }, - .pending => { - if (this.pipelined_requests > 0 or !this.flags.is_ready_for_query) { - debug("need to wait to finish the pipeline before starting a new query preparation", .{}); - // need to wait to finish the pipeline before starting a new query preparation - return; - } - // statement is pending, lets write/parse it - var query_str = req.query.toUTF8(bun.default_allocator); - defer query_str.deinit(); - const has_params = stmt.signature.fields.len > 0; - // If it does not have params, we can write and execute immediately in one go - if (!has_params) { + continue; + }, + .prepared => { const thisValue = req.thisValue.tryGet() orelse { bun.assertf(false, "query value was freed earlier than expected", .{}); if (offset == 0) { @@ -1229,77 +1173,158 @@ fn advance(this: *PostgresSQLConnection) void { } continue; }; - // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero; - debug("prepareAndQueryWithSignature", .{}); - PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| { - stmt.status = .failed; - stmt.error_response = .{ .postgres_error = err }; - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + const columns_value = PostgresSQLQuery.js.columnsGetCached(thisValue) orelse .zero; + req.flags.binary = statement.fields.len > 0; + debug("binding and executing stmt", .{}); + PostgresRequest.bindAndExecute(this.globalObject, statement, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| { + if (this.globalObject.tryTakeException()) |err_| { + req.onJSError(err_, this.globalObject); + } else { + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + } if (offset == 0) { req.deref(); this.requests.discard(1); } else { // deinit later req.status = .fail; + offset += 1; } - debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)}); - + debug("bind and execute failed: {s}", .{@errorName(err)}); continue; }; - this.flags.waiting_to_prepare = true; + this.flags.is_ready_for_query = false; req.status = .binding; - stmt.status = .parsing; + req.flags.pipelined = true; + this.pipelined_requests += 1; + if (this.flags.use_unnamed_prepared_statements or !this.canPipeline()) { + debug("cannot pipeline more stmt", .{}); + return; + } + + offset += 1; + continue; + }, + .pending => { + if (!this.canPrepareQuery()) { + debug("need to wait to finish the pipeline before starting a new query preparation", .{}); + // need to wait to finish the pipeline before starting a new query preparation + return; + } + // statement is pending, lets write/parse it + var query_str = req.query.toUTF8(bun.default_allocator); + defer query_str.deinit(); + const has_params = statement.signature.fields.len > 0; + // If it does not have params, we can write and execute immediately in one go + if (!has_params) { + const thisValue = req.thisValue.tryGet() orelse { + bun.assertf(false, "query value was freed earlier than expected", .{}); + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + offset += 1; + } + continue; + }; + // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete + const binding_value = PostgresSQLQuery.js.bindingGetCached(thisValue) orelse .zero; + debug("prepareAndQueryWithSignature", .{}); + PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &statement.signature) catch |err| { + if (this.globalObject.tryTakeException()) |err_| { + req.onJSError(err_, this.globalObject); + } else { + statement.status = .failed; + statement.error_response = .{ .postgres_error = err }; + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + } + if (offset == 0) { + req.deref(); + this.requests.discard(1); + } else { + // deinit later + req.status = .fail; + } + debug("prepareAndQueryWithSignature failed: {s}", .{@errorName(err)}); + + continue; + }; + this.flags.is_ready_for_query = false; + this.flags.waiting_to_prepare = true; + req.status = .binding; + statement.status = .parsing; + this.flushDataAndResetTimeout(); + return; + } + + const connection_writer = this.writer(); + debug("writing query", .{}); + // write query and wait for it to be prepared + PostgresRequest.writeQuery(query_str.slice(), statement.signature.prepared_statement_name, statement.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| { + if (this.globalObject.tryTakeException()) |err_| { + req.onJSError(err_, this.globalObject); + } else { + statement.error_response = .{ .postgres_error = err }; + statement.status = .failed; + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + } + bun.assert(offset == 0); + req.deref(); + this.requests.discard(1); + debug("write query failed: {s}", .{@errorName(err)}); + continue; + }; + connection_writer.write(&protocol.Sync) catch |err| { + if (this.globalObject.tryTakeException()) |err_| { + req.onJSError(err_, this.globalObject); + } else { + statement.error_response = .{ .postgres_error = err }; + statement.status = .failed; + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + } + bun.assert(offset == 0); + req.deref(); + this.requests.discard(1); + debug("write query (sync) failed: {s}", .{@errorName(err)}); + continue; + }; + this.flags.is_ready_for_query = false; + this.flags.waiting_to_prepare = true; + statement.status = .parsing; + this.flushDataAndResetTimeout(); return; - } - - const connection_writer = this.writer(); - debug("writing query", .{}); - // write query and wait for it to be prepared - PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| { - stmt.error_response = .{ .postgres_error = err }; - stmt.status = .failed; - - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - bun.assert(offset == 0); - req.deref(); - this.requests.discard(1); - debug("write query failed: {s}", .{@errorName(err)}); + }, + .parsing => { + // we are still parsing, lets wait for it to be prepared or failed + offset += 1; continue; - }; - connection_writer.write(&protocol.Sync) catch |err| { - stmt.error_response = .{ .postgres_error = err }; - stmt.status = .failed; - - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); - bun.assert(offset == 0); - req.deref(); - this.requests.discard(1); - debug("write query (sync) failed: {s}", .{@errorName(err)}); - continue; - }; - this.flags.is_ready_for_query = false; - stmt.status = .parsing; - this.flags.waiting_to_prepare = true; - return; - }, - .parsing => { - // we are still parsing, lets wait for it to be prepared or failed - return; - }, + }, + } + } else { + offset += 1; + continue; } } }, .running, .binding, .partial_response => { - // if we are binding it will switch to running immediately - // if we are running, we need to wait for it to be success or fail - return; + if (this.flags.waiting_to_prepare or this.nonpipelinable_requests > 0) { + return; + } + const total_requests_running = this.pipelined_requests; + if (offset < total_requests_running) { + offset += total_requests_running; + } else { + offset += 1; + } + continue; }, .success => { - this.cleanupSuccessQuery(req); if (offset > 0) { // deinit later req.status = .fail; @@ -1427,12 +1452,14 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera try ready_for_query.decodeInternal(Context, reader); this.setStatus(.connected); + this.flags.waiting_to_prepare = false; this.flags.is_ready_for_query = true; this.socket.setTimeout(300); defer this.updateRef(); if (this.current()) |request| { if (request.status == .partial_response) { + this.finishRequest(request); // if is a partial response, just signal that the query is now complete request.onResult("", this.globalObject, this.js_value, true); } @@ -1695,7 +1722,6 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera defer { err.deinit(); } - this.failWithJSValue(err.toJS(this.globalObject)); // it shouldn't enqueue any requests while connecting @@ -1723,8 +1749,9 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera } } } - this.updateRef(); + this.finishRequest(request); + this.updateRef(); request.onError(.{ .protocol = err }, this.globalObject); }, .PortalSuspended => { @@ -1737,11 +1764,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera try reader.eatMessage(protocol.CloseComplete); var request = this.current() orelse return error.ExpectedRequest; defer this.updateRef(); - if (request.flags.simple) { - request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, false); - } else { - request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, true); - } + request.onResult("CLOSECOMPLETE", this.globalObject, this.js_value, false); }, .CopyInResponse => { debug("TODO CopyInResponse", .{}); @@ -1757,11 +1780,7 @@ pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.enum_litera try reader.eatMessage(protocol.EmptyQueryResponse); var request = this.current() orelse return error.ExpectedRequest; defer this.updateRef(); - if (request.flags.simple) { - request.onResult("", this.globalObject, this.js_value, false); - } else { - request.onResult("", this.globalObject, this.js_value, true); - } + request.onResult("", this.globalObject, this.js_value, false); }, .CopyOutResponse => { debug("TODO CopyOutResponse", .{}); diff --git a/src/sql/postgres/PostgresSQLQuery.zig b/src/sql/postgres/PostgresSQLQuery.zig index 35b1af4906..9860b0273a 100644 --- a/src/sql/postgres/PostgresSQLQuery.zig +++ b/src/sql/postgres/PostgresSQLQuery.zig @@ -94,9 +94,10 @@ pub fn onWriteFail( const vm = jsc.VirtualMachine.get(); const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?; const event_loop = vm.eventLoop(); + const js_err = postgresErrorToJS(globalObject, null, err); event_loop.runCallback(function, globalObject, thisValue, &.{ targetValue, - postgresErrorToJS(globalObject, null, err), + js_err.toError() orelse js_err, queries_array, }); } @@ -116,7 +117,7 @@ pub fn onJSError(this: *@This(), err: jsc.JSValue, globalObject: *jsc.JSGlobalOb const event_loop = vm.eventLoop(); event_loop.runCallback(function, globalObject, thisValue, &.{ targetValue, - err, + err.toError() orelse err, }); } pub fn onError(this: *@This(), err: PostgresSQLStatement.Error, globalObject: *jsc.JSGlobalObject) void { diff --git a/test/js/sql/sql.test.ts b/test/js/sql/sql.test.ts index 57740fa7eb..1f063f468a 100644 --- a/test/js/sql/sql.test.ts +++ b/test/js/sql/sql.test.ts @@ -424,6 +424,61 @@ if (isDockerEnabled()) { expect(sql.options.database).toBe("bun@bun"); }); + test("Minimal reproduction of Bun.SQL PostgreSQL hang bug (#22395)", async () => { + for (let i = 0; i < 10; i++) { + await using sql = new SQL({ + ...options, + idleTimeout: 10, + connectionTimeout: 10, + maxLifetime: 10, + }); + + const random_id = randomUUIDv7() + "test_hang"; + // Setup: Create table with exclusion constraint + await sql`DROP TABLE IF EXISTS ${sql(random_id)} CASCADE`; + await sql`CREATE EXTENSION IF NOT EXISTS btree_gist`; + await sql` + CREATE TABLE ${sql(random_id)} ( + id SERIAL PRIMARY KEY, + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ NOT NULL, + resource_id INT NOT NULL, + EXCLUDE USING gist ( + resource_id WITH =, + tstzrange(start_time, end_time) WITH && + ) + ) + `; + + // Step 1: Insert a row (succeeds) + await sql` + INSERT INTO ${sql(random_id)} (start_time, end_time, resource_id) + VALUES ('2024-01-01 10:00:00', '2024-01-01 12:00:00', 1) + `; + + // Step 2: Try to insert conflicting row (throws expected error) + try { + await sql` + INSERT INTO ${sql(random_id)} (start_time, end_time, resource_id) + VALUES (${"2024-01-01 11:00:00"}, ${"2024-01-01 13:00:00"}, ${1}) + `; + expect.unreachable(); + } catch {} + + // Step 3: Try another query - THIS WILL HANG + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error("TIMEOUT")), 200); + }); + + try { + const result = await Promise.race([sql`SELECT COUNT(*) FROM ${sql(random_id)}`, timeoutPromise]); + expect(result[0].count).toBe("1"); + } catch (err: any) { + expect(err.message).not.toBe("TIMEOUT"); + } + } + }); + test("Connects with no options", async () => { // we need at least the usename and port await using sql = postgres({ max: 1, port: container.port, username: login.username });