diff --git a/src/js/bun/sql.ts b/src/js/bun/sql.ts index bf5604104d..bcfa7865ed 100644 --- a/src/js/bun/sql.ts +++ b/src/js/bun/sql.ts @@ -92,6 +92,7 @@ enum SQLQueryFlags { unsafe = 1 << 1, bigint = 1 << 2, } + function getQueryHandle(query) { let handle = query[_handle]; if (!handle) { @@ -145,21 +146,27 @@ class Query extends PublicPromise { this[_flags] = allowUnsafeTransaction; } - async [_run]() { + async [_run](async: boolean) { const { [_handler]: handler, [_queryStatus]: status } = this; if (status & (QueryStatus.executed | QueryStatus.error | QueryStatus.cancelled | QueryStatus.invalidHandle)) { return; } + this[_queryStatus] |= QueryStatus.executed; const handle = getQueryHandle(this); if (!handle) return this; - this[_queryStatus] |= QueryStatus.executed; - // this avoids a infinite loop - await 1; + if (async) { + await 1; + } - return handler(this, handle); + try { + return handler(this, handle); + } catch (err) { + this[_queryStatus] |= QueryStatus.error; + this.reject(err); + } } get active() { @@ -219,7 +226,7 @@ class Query extends PublicPromise { } execute() { - this[_run](); + this[_run](false); return this; } @@ -238,21 +245,21 @@ class Query extends PublicPromise { } then() { - this[_run](); + this[_run](true); const result = super.$then.$apply(this, arguments); $markPromiseAsHandled(result); return result; } catch() { - this[_run](); + this[_run](true); const result = super.catch.$apply(this, arguments); $markPromiseAsHandled(result); return result; } finally() { - this[_run](); + this[_run](true); return super.finally.$apply(this, arguments); } } @@ -402,11 +409,7 @@ class PooledConnection { this.storedError = null; this.state = PooledConnectionState.pending; // retry connection - this.connection = createConnection( - this.connectionInfo, - this.#onConnected.bind(this, this.connectionInfo), - this.#onClose.bind(this, this.connectionInfo), - ); + this.connection = createConnection(this.connectionInfo, this.#onConnected.bind(this), this.#onClose.bind(this)); } close() { try { @@ -444,9 +447,9 @@ class PooledConnection { default: // we can retry this.#doRetry(); - return true; } } + return true; } } class ConnectionPool { @@ -783,9 +786,9 @@ class ConnectionPool { } else { this.waitingQueue.push(onConnected); } - } else { + } else if (!retry_in_progress) { // impossible to connect or retry - onConnected(storedError, null); + onConnected(storedError ?? connectionClosedError(), null); } return; } @@ -1298,6 +1301,7 @@ function SQL(o, e = {}) { pool.release(pooledConnection); // release the connection back to the pool return query.reject($ERR_POSTGRES_QUERY_CANCELLED("Query cancelled")); } + // bind close event to the query (will unbind and auto release the connection when the query is finished) pooledConnection.bindQuery(query, onQueryDisconnected.bind(query)); handle.run(pooledConnection.connection, query); @@ -1906,7 +1910,7 @@ function SQL(o, e = {}) { // mssql dont have release savepoint await run_internal_transaction_sql(`${RELEASE_SAVEPOINT_COMMAND} ${save_point_name}`); } - if (Array.isArray(result)) { + if ($isArray(result)) { result = await Promise.all(result); } return result; @@ -1952,7 +1956,7 @@ function SQL(o, e = {}) { await run_internal_transaction_sql(BEGIN_COMMAND); needs_rollback = true; let transaction_result = await callback(transaction_sql); - if (Array.isArray(transaction_result)) { + if ($isArray(transaction_result)) { transaction_result = await Promise.all(transaction_result); } // at this point we dont need to rollback anymore diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index c65375a22b..4ca76484e4 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -45,6 +45,51 @@ pub const AnyPostgresError = error{ UnsupportedIntegerSize, }; +pub fn postgresErrorToJS(globalObject: *JSC.JSGlobalObject, message: ?[]const u8, err: AnyPostgresError) JSValue { + const error_code: JSC.Error = switch (err) { + error.ConnectionClosed => JSC.Error.ERR_POSTGRES_CONNECTION_CLOSED, + error.ExpectedRequest => JSC.Error.ERR_POSTGRES_EXPECTED_REQUEST, + error.ExpectedStatement => JSC.Error.ERR_POSTGRES_EXPECTED_STATEMENT, + error.InvalidBackendKeyData => JSC.Error.ERR_POSTGRES_INVALID_BACKEND_KEY_DATA, + error.InvalidBinaryData => JSC.Error.ERR_POSTGRES_INVALID_BINARY_DATA, + error.InvalidByteSequence => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE, + error.InvalidByteSequenceForEncoding => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE_FOR_ENCODING, + error.InvalidCharacter => JSC.Error.ERR_POSTGRES_INVALID_CHARACTER, + error.InvalidMessage => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE, + error.InvalidMessageLength => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE_LENGTH, + error.InvalidQueryBinding => JSC.Error.ERR_POSTGRES_INVALID_QUERY_BINDING, + error.InvalidServerKey => JSC.Error.ERR_POSTGRES_INVALID_SERVER_KEY, + error.InvalidServerSignature => JSC.Error.ERR_POSTGRES_INVALID_SERVER_SIGNATURE, + error.MultidimensionalArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_MULTIDIMENSIONAL_ARRAY_NOT_SUPPORTED_YET, + error.NullsInArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_NULLS_IN_ARRAY_NOT_SUPPORTED_YET, + error.Overflow => JSC.Error.ERR_POSTGRES_OVERFLOW, + error.PBKDFD2 => JSC.Error.ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2, + error.SASL_SIGNATURE_MISMATCH => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_MISMATCH, + error.SASL_SIGNATURE_INVALID_BASE64 => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_INVALID_BASE64, + error.TLSNotAvailable => JSC.Error.ERR_POSTGRES_TLS_NOT_AVAILABLE, + error.TLSUpgradeFailed => JSC.Error.ERR_POSTGRES_TLS_UPGRADE_FAILED, + error.UnexpectedMessage => JSC.Error.ERR_POSTGRES_UNEXPECTED_MESSAGE, + error.UNKNOWN_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD, + error.UNSUPPORTED_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD, + error.UnsupportedByteaFormat => JSC.Error.ERR_POSTGRES_UNSUPPORTED_BYTEA_FORMAT, + error.UnsupportedIntegerSize => JSC.Error.ERR_POSTGRES_UNSUPPORTED_INTEGER_SIZE, + error.JSError => { + return globalObject.takeException(error.JSError); + }, + error.OutOfMemory => { + // TODO: add binding for creating an out of memory error? + return globalObject.takeException(globalObject.throwOutOfMemory()); + }, + error.ShortRead => { + bun.unreachablePanic("Assertion failed: ShortRead should be handled by the caller in postgres", .{}); + }, + }; + if (message) |msg| { + return error_code.fmt(globalObject, "{s}", .{msg}); + } + return error_code.fmt(globalObject, "Failed to bind query: {s}", .{@errorName(err)}); +} + pub const SSLMode = enum(u8) { disable = 0, prefer = 1, @@ -207,13 +252,71 @@ pub const PostgresSQLQueryResultMode = enum(u8) { values = 1, raw = 2, }; + +const JSRef = union(enum) { + weak: JSC.JSValue, + strong: JSC.Strong, + + pub fn initWeak(value: JSC.JSValue) @This() { + return .{ .weak = value }; + } + + pub fn initStrong(value: JSC.JSValue, globalThis: *JSC.JSGlobalObject) @This() { + return .{ .strong = JSC.Strong.create(value, globalThis) }; + } + + pub fn empty() @This() { + return .{ .weak = .zero }; + } + + pub fn get(this: *@This()) JSC.JSValue { + return switch (this.*) { + .weak => this.weak, + .strong => this.strong.get() orelse .zero, + }; + } + pub fn setWeak(this: *@This(), value: JSC.JSValue) void { + if (this == .strong) { + this.strong.deinit(); + } + this.* = .{ .weak = value }; + } + + pub fn setStrong(this: *@This(), value: JSC.JSValue, globalThis: *JSC.JSGlobalObject) void { + if (this == .strong) { + this.strong.set(globalThis, value); + return; + } + this.* = .{ .strong = JSC.Strong.create(value, globalThis) }; + } + + pub fn upgrade(this: *@This(), globalThis: *JSC.JSGlobalObject) void { + switch (this.*) { + .weak => { + bun.assert(this.weak != .zero); + this.* = .{ .strong = JSC.Strong.create(this.weak, globalThis) }; + }, + .strong => {}, + } + } + + pub fn deinit(this: *@This()) void { + switch (this.*) { + .weak => { + this.weak = .zero; + }, + .strong => { + this.strong.deinit(); + }, + } + } +}; pub const PostgresSQLQuery = struct { statement: ?*PostgresSQLStatement = null, query: bun.String = bun.String.empty, cursor_name: bun.String = bun.String.empty, - // Kept alive by being in the "queries" array from JS. - thisValue: JSValue = .undefined, + thisValue: JSRef = JSRef.empty(), status: Status = Status.pending, @@ -229,24 +332,29 @@ pub const PostgresSQLQuery = struct { pub usingnamespace JSC.Codegen.JSPostgresSQLQuery; const log = bun.Output.scoped(.PostgresSQLQuery, false); pub fn getTarget(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject) JSC.JSValue { - if (this.thisValue == .zero) { + const thisValue = this.thisValue.get(); + if (thisValue == .zero) { return .zero; } - const target = PostgresSQLQuery.targetGetCached(this.thisValue) orelse return .zero; - PostgresSQLQuery.targetSetCached(this.thisValue, globalObject, .zero); + const target = PostgresSQLQuery.targetGetCached(thisValue) orelse return .zero; + PostgresSQLQuery.targetSetCached(thisValue, globalObject, .zero); return target; } pub const Status = enum(u8) { + /// The query was just enqueued, statement status can be checked for more details pending, - written, - running, + /// The query is being bound to the statement binding, + /// The query is running + running, + /// The query was successful success, + /// The query failed fail, pub fn isRunning(this: Status) bool { - return this == .running or this == .binding; + return @intFromEnum(this) > @intFromEnum(Status.pending) and @intFromEnum(this) < @intFromEnum(Status.success); } }; @@ -255,6 +363,7 @@ pub const PostgresSQLQuery = struct { } pub fn deinit(this: *@This()) void { + this.thisValue.deinit(); if (this.statement) |statement| { statement.deref(); } @@ -265,7 +374,11 @@ pub const PostgresSQLQuery = struct { pub fn finalize(this: *@This()) void { debug("PostgresSQLQuery finalize", .{}); - this.thisValue = .zero; + if (this.thisValue == .weak) { + // clean up if is a weak reference, if is a strong reference we need to wait until the query is done + // if we are a strong reference, here is probably a bug because GC'd should not happen + this.thisValue.weak = .zero; + } this.deref(); } @@ -281,27 +394,6 @@ pub const PostgresSQLQuery = struct { bun.assert(this.ref_count.fetchAdd(1, .monotonic) > 0); } - pub fn onNoData(this: *@This(), globalObject: *JSC.JSGlobalObject, queries_array: JSValue) void { - this.status = .success; - defer this.deref(); - - const thisValue = this.thisValue; - const targetValue = this.getTarget(globalObject); - if (thisValue == .zero or targetValue == .zero) { - return; - } - - const vm = JSC.VirtualMachine.get(); - const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?; - const event_loop = vm.eventLoop(); - event_loop.runCallback(function, globalObject, thisValue, &.{ - targetValue, - this.pending_value.trySwap() orelse .undefined, - JSValue.jsNumber(0), - JSValue.jsNumber(0), - queries_array, - }); - } pub fn onWriteFail( this: *@This(), err: AnyPostgresError, @@ -309,30 +401,29 @@ pub const PostgresSQLQuery = struct { queries_array: JSValue, ) void { this.status = .fail; - const thisValue = this.thisValue; + const thisValue = this.thisValue.get(); + defer this.thisValue.deinit(); const targetValue = this.getTarget(globalObject); if (thisValue == .zero or targetValue == .zero) { return; } - const instance = globalObject.createErrorInstance("Failed to bind query: {s}", .{@errorName(err)}); - - // TODO: error handling const vm = JSC.VirtualMachine.get(); const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?; const event_loop = vm.eventLoop(); event_loop.runCallback(function, globalObject, thisValue, &.{ targetValue, - instance, + postgresErrorToJS(globalObject, null, err), queries_array, }); } - - pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void { + pub fn onJSError(this: *@This(), err: JSC.JSValue, globalObject: *JSC.JSGlobalObject) void { this.status = .fail; + this.ref(); defer this.deref(); - const thisValue = this.thisValue; + const thisValue = this.thisValue.get(); + defer this.thisValue.deinit(); const targetValue = this.getTarget(globalObject); if (thisValue == .zero or targetValue == .zero) { return; @@ -343,9 +434,12 @@ pub const PostgresSQLQuery = struct { const event_loop = vm.eventLoop(); event_loop.runCallback(function, globalObject, thisValue, &.{ targetValue, - err.toJS(globalObject), + err, }); } + pub fn onError(this: *@This(), err: PostgresSQLStatement.Error, globalObject: *JSC.JSGlobalObject) void { + this.onJSError(err.toJS(globalObject), globalObject); + } const CommandTag = union(enum) { // For an INSERT command, the tag is INSERT oid rows, where rows is the @@ -464,9 +558,11 @@ pub const PostgresSQLQuery = struct { pub fn onSuccess(this: *@This(), command_tag_str: []const u8, globalObject: *JSC.JSGlobalObject, connection: JSC.JSValue) void { this.status = .success; + this.ref(); defer this.deref(); - const thisValue = this.thisValue; + const thisValue = this.thisValue.get(); + defer this.thisValue.deinit(); const targetValue = this.getTarget(globalObject); defer allowGC(thisValue, globalObject); if (thisValue == .zero or targetValue == .zero) { @@ -533,7 +629,7 @@ pub const PostgresSQLQuery = struct { ptr.* = .{ .query = query.toBunString(globalThis), - .thisValue = this_value, + .thisValue = JSRef.initWeak(this_value), .flags = .{ .bigint = bigint, }, @@ -604,78 +700,91 @@ pub const PostgresSQLQuery = struct { }; const has_params = signature.fields.len > 0; - var did_write = false; - + var reset_timeout = false; enqueue: { if (entry.found_existing) { this.statement = entry.value_ptr.*; this.statement.?.ref(); signature.deinit(); - if (has_params and this.statement.?.status == .parsing) { + switch (this.statement.?.status) { + .failed => { + // If the statement failed, we need to throw the error + return globalObject.throwValue(this.statement.?.error_response.?.toJS(globalObject)); + }, + .prepared => { + if (!connection.hasQueryRunning()) { + this.flags.binary = this.statement.?.fields.len > 0; + log("bindAndExecute", .{}); + // bindAndExecute will bind + execute, it will change to running after binding is complete + PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| { + if (!globalObject.hasException()) + return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to bind and execute query", err)); + return error.JSError; + }; + this.status = .binding; - // if it has params, we need to wait for ParamDescription to be received before we can write the data - } else { - this.flags.binary = this.statement.?.fields.len > 0; - log("bindAndExecute", .{}); - PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| { - if (!globalObject.hasException()) - return globalObject.throwError(err, "failed to bind and execute query"); - return error.JSError; - }; - did_write = true; + reset_timeout = true; + } + }, + .parsing, .pending => {}, } break :enqueue; } - // If it does not have params, we can write and execute immediately in one go - if (!has_params) { - log("prepareAndQueryWithSignature", .{}); + const can_execute = !connection.hasQueryRunning(); - PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| { - signature.deinit(); - if (!globalObject.hasException()) - return globalObject.throwError(err, "failed to prepare and query"); - return error.JSError; - }; - did_write = true; - } else { - log("writeQuery", .{}); - - PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| { - signature.deinit(); - if (!globalObject.hasException()) - return globalObject.throwError(err, "failed to write query"); - return error.JSError; - }; - writer.write(&protocol.Sync) catch |err| { - signature.deinit(); - if (!globalObject.hasException()) - return globalObject.throwError(err, "failed to flush"); - return error.JSError; - }; + if (can_execute) { + // If it does not have params, we can write and execute immediately in one go + if (!has_params) { + log("prepareAndQueryWithSignature", .{}); + // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete + PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| { + signature.deinit(); + if (!globalObject.hasException()) + return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to prepare and query", err)); + return error.JSError; + }; + this.status = .binding; + reset_timeout = true; + } else { + log("writeQuery", .{}); + PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| { + signature.deinit(); + if (!globalObject.hasException()) + return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to write query", err)); + return error.JSError; + }; + writer.write(&protocol.Sync) catch |err| { + signature.deinit(); + if (!globalObject.hasException()) + return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to flush", err)); + return error.JSError; + }; + reset_timeout = true; + } } - { - const stmt = bun.default_allocator.create(PostgresSQLStatement) catch |err| { - return globalObject.throwError(err, "failed to allocate statement"); + const stmt = bun.default_allocator.create(PostgresSQLStatement) catch { + return globalObject.throwOutOfMemory(); }; connection.prepared_statement_id += 1; - stmt.* = .{ .signature = signature, .ref_count = 2, .status = PostgresSQLStatement.Status.parsing }; + stmt.* = .{ .signature = signature, .ref_count = 2, .status = if (can_execute) .parsing else .pending }; this.statement = stmt; entry.value_ptr.* = stmt; } } - - connection.requests.writeItem(this) catch {}; + // We need a strong reference to the query so that it doesn't get GC'd + connection.requests.writeItem(this) catch return globalObject.throwOutOfMemory(); this.ref(); - this.status = if (did_write) .binding else .pending; + this.thisValue.upgrade(globalObject); + PostgresSQLQuery.targetSetCached(this_value, globalObject, query); if (connection.is_ready_for_query) connection.flushDataAndResetTimeout() - else if (did_write) + else if (reset_timeout) connection.resetConnectionTimeout(); return .undefined; @@ -1373,7 +1482,8 @@ pub const PostgresSQLConnection = struct { this.ref(); defer this.deref(); - this.refAndClose(); + // we defer the refAndClose so the on_close will be called first before we reject the pending requests + defer this.refAndClose(value); const on_close = this.consumeOnCloseCallback(this.globalObject) orelse return; const loop = this.globalObject.bunVM().eventLoop(); @@ -1397,47 +1507,8 @@ pub const PostgresSQLConnection = struct { debug("failed: {s}: {s}", .{ message, @errorName(err) }); const globalObject = this.globalObject; - const error_code: JSC.Error = switch (err) { - error.ConnectionClosed => JSC.Error.ERR_POSTGRES_CONNECTION_CLOSED, - error.ExpectedRequest => JSC.Error.ERR_POSTGRES_EXPECTED_REQUEST, - error.ExpectedStatement => JSC.Error.ERR_POSTGRES_EXPECTED_STATEMENT, - error.InvalidBackendKeyData => JSC.Error.ERR_POSTGRES_INVALID_BACKEND_KEY_DATA, - error.InvalidBinaryData => JSC.Error.ERR_POSTGRES_INVALID_BINARY_DATA, - error.InvalidByteSequence => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE, - error.InvalidByteSequenceForEncoding => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE_FOR_ENCODING, - error.InvalidCharacter => JSC.Error.ERR_POSTGRES_INVALID_CHARACTER, - error.InvalidMessage => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE, - error.InvalidMessageLength => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE_LENGTH, - error.InvalidQueryBinding => JSC.Error.ERR_POSTGRES_INVALID_QUERY_BINDING, - error.InvalidServerKey => JSC.Error.ERR_POSTGRES_INVALID_SERVER_KEY, - error.InvalidServerSignature => JSC.Error.ERR_POSTGRES_INVALID_SERVER_SIGNATURE, - error.MultidimensionalArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_MULTIDIMENSIONAL_ARRAY_NOT_SUPPORTED_YET, - error.NullsInArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_NULLS_IN_ARRAY_NOT_SUPPORTED_YET, - error.Overflow => JSC.Error.ERR_POSTGRES_OVERFLOW, - error.PBKDFD2 => JSC.Error.ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2, - error.SASL_SIGNATURE_MISMATCH => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_MISMATCH, - error.SASL_SIGNATURE_INVALID_BASE64 => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_INVALID_BASE64, - error.TLSNotAvailable => JSC.Error.ERR_POSTGRES_TLS_NOT_AVAILABLE, - error.TLSUpgradeFailed => JSC.Error.ERR_POSTGRES_TLS_UPGRADE_FAILED, - error.UnexpectedMessage => JSC.Error.ERR_POSTGRES_UNEXPECTED_MESSAGE, - error.UNKNOWN_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD, - error.UNSUPPORTED_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD, - error.UnsupportedByteaFormat => JSC.Error.ERR_POSTGRES_UNSUPPORTED_BYTEA_FORMAT, - error.UnsupportedIntegerSize => JSC.Error.ERR_POSTGRES_UNSUPPORTED_INTEGER_SIZE, - error.JSError => { - this.failWithJSValue(globalObject.takeException(error.JSError)); - return; - }, - error.OutOfMemory => { - // TODO: add binding for creating an out of memory error? - this.failWithJSValue(globalObject.takeException(globalObject.throwOutOfMemory())); - return; - }, - error.ShortRead => { - bun.unreachablePanic("Assertion failed: ShortRead should be handled by the caller in postgres", .{}); - }, - }; - this.failWithJSValue(error_code.fmt(globalObject, "{s}", .{message})); + + this.failWithJSValue(postgresErrorToJS(globalObject, message, err)); } pub fn onClose(this: *PostgresSQLConnection) void { @@ -1460,7 +1531,6 @@ pub const PostgresSQLConnection = struct { .options = Data{ .temporary = this.options }, }; msg.writeInternal(Writer, this.writer()) catch |err| { - this.refAndClose(); this.fail("Failed to write startup message", err); }; } @@ -1937,13 +2007,46 @@ pub const PostgresSQLConnection = struct { bun.default_allocator.destroy(this); } - fn refAndClose(this: *@This()) void { + fn refAndClose(this: *@This(), js_reason: ?JSC.JSValue) void { + // refAndClose is always called when we wanna to disconnect or when we are closed + if (!this.socket.isClosed()) { // event loop need to be alive to close the socket this.poll_ref.ref(this.globalObject.bunVM()); // will unref on socket close this.socket.close(); } + + // cleanup requests + while (this.current()) |request| { + switch (request.status) { + // pending we will fail the request and the stmt will be marked as error ConnectionClosed too + .pending => { + const stmt = request.statement orelse continue; + stmt.error_response = .{ .postgres_error = AnyPostgresError.ConnectionClosed }; + stmt.status = .failed; + if (js_reason) |reason| { + request.onJSError(reason, this.globalObject); + } else { + request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject); + } + }, + // in the middle of running + .binding, + .running, + => { + if (js_reason) |reason| { + request.onJSError(reason, this.globalObject); + } else { + request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject); + } + }, + // just ignore success and fail cases + .success, .fail => {}, + } + request.deref(); + this.requests.discard(1); + } } pub fn disconnect(this: *@This()) void { @@ -1951,7 +2054,7 @@ pub const PostgresSQLConnection = struct { if (this.status == .connected) { this.status = .disconnected; - this.refAndClose(); + this.refAndClose(null); } } @@ -1963,6 +2066,10 @@ pub const PostgresSQLConnection = struct { return this.requests.peekItem(0); } + fn hasQueryRunning(this: *PostgresSQLConnection) bool { + return !this.is_ready_for_query or this.current() != null; + } + pub const Writer = struct { connection: *PostgresSQLConnection, @@ -2444,59 +2551,117 @@ pub const PostgresSQLConnection = struct { }; }; - fn advance(this: *PostgresSQLConnection) !bool { + fn advance(this: *PostgresSQLConnection) !void { defer this.updateRef(); var any = false; - + defer if (any) this.resetConnectionTimeout(); while (this.requests.readableLength() > 0) { var req: *PostgresSQLQuery = this.requests.peekItem(0); switch (req.status) { .pending => { const stmt = req.statement orelse return error.ExpectedStatement; - if (stmt.status == .failed) { - req.onError(stmt.error_response, this.globalObject); - this.requests.discard(1); - any = true; - } else { - break; - } - }, - .success, .fail => { - this.requests.discard(1); - req.deref(); - any = true; - }, - else => break, - } - } - while (this.requests.readableLength() > 0) { - var req: *PostgresSQLQuery = this.requests.peekItem(0); - const stmt = req.statement orelse return error.ExpectedStatement; - - switch (stmt.status) { - .prepared => { - if (req.status == .pending and stmt.status == .prepared) { - const binding_value = PostgresSQLQuery.bindingGetCached(req.thisValue) orelse .zero; - const columns_value = PostgresSQLQuery.columnsGetCached(req.thisValue) orelse .zero; - PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| { - req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + switch (stmt.status) { + .failed => { + bun.assert(stmt.error_response != null); + req.onError(stmt.error_response.?, this.globalObject); req.deref(); this.requests.discard(1); + + any = true; continue; - }; - req.status = .binding; - req.flags.binary = stmt.fields.len > 0; - any = true; - } else { - break; + }, + .prepared => { + const thisValue = req.thisValue.get(); + bun.assert(thisValue != .zero); + const binding_value = PostgresSQLQuery.bindingGetCached(thisValue) orelse .zero; + const columns_value = PostgresSQLQuery.columnsGetCached(thisValue) orelse .zero; + req.flags.binary = stmt.fields.len > 0; + + PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| { + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + req.deref(); + this.requests.discard(1); + + continue; + }; + req.status = .binding; + any = true; + return; + }, + .pending => { + // statement is pending, lets write/parse it + var query_str = req.query.toUTF8(bun.default_allocator); + defer query_str.deinit(); + const has_params = stmt.signature.fields.len > 0; + // If it does not have params, we can write and execute immediately in one go + if (!has_params) { + const thisValue = req.thisValue.get(); + bun.assert(thisValue != .zero); + // prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete + const binding_value = PostgresSQLQuery.bindingGetCached(thisValue) orelse .zero; + + PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| { + stmt.status = .failed; + stmt.error_response = .{ .postgres_error = err }; + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + req.deref(); + this.requests.discard(1); + + continue; + }; + req.status = .binding; + stmt.status = .parsing; + + any = true; + return; + } + const connection_writer = this.writer(); + // write query and wait for it to be prepared + PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| { + stmt.error_response = .{ .postgres_error = err }; + stmt.status = .failed; + + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + req.deref(); + this.requests.discard(1); + + continue; + }; + connection_writer.write(&protocol.Sync) catch |err| { + stmt.error_response = .{ .postgres_error = err }; + stmt.status = .failed; + + req.onWriteFail(err, this.globalObject, this.getQueriesArray()); + req.deref(); + this.requests.discard(1); + + continue; + }; + stmt.status = .parsing; + any = true; + return; + }, + .parsing => { + // we are still parsing, lets wait for it to be prepared or failed + return; + }, } }, - else => break, + + .running, .binding => { + // if we are binding it will switch to running immediately + // if we are running, we need to wait for it to be success or fail + return; + }, + .success, .fail => { + req.deref(); + this.requests.discard(1); + any = true; + continue; + }, } } - - return any; } pub fn getQueriesArray(this: *const PostgresSQLConnection) JSValue { @@ -2567,13 +2732,14 @@ pub const PostgresSQLConnection = struct { DataCell.Putter.put, ); } - - const pending_value = if (request.thisValue == .zero) .zero else PostgresSQLQuery.pendingValueGetCached(request.thisValue) orelse .zero; + const thisValue = request.thisValue.get(); + bun.assert(thisValue != .zero); + const pending_value = PostgresSQLQuery.pendingValueGetCached(thisValue) orelse .zero; pending_value.ensureStillAlive(); const result = putter.toJS(this.globalObject, pending_value, structure, statement.fields_flags, request.flags.result_mode); if (pending_value == .zero) { - PostgresSQLQuery.pendingValueSetCached(request.thisValue, this.globalObject, result); + PostgresSQLQuery.pendingValueSetCached(thisValue, this.globalObject, result); } }, .CopyData => { @@ -2602,9 +2768,9 @@ pub const PostgresSQLConnection = struct { this.is_ready_for_query = true; this.socket.setTimeout(300); - if (try this.advance() or this.is_ready_for_query) { - this.flushData(); - } + try this.advance(); + + this.flushData(); }, .CommandComplete => { var request = this.current() orelse return error.ExpectedRequest; @@ -2615,7 +2781,6 @@ pub const PostgresSQLConnection = struct { cmd.deinit(); } debug("-> {s}", .{cmd.command_tag.slice()}); - _ = this.requests.discard(1); defer this.updateRef(); request.onSuccess(cmd.command_tag.slice(), this.globalObject, this.js_value); }, @@ -2875,29 +3040,26 @@ pub const PostgresSQLConnection = struct { if (request.statement) |stmt| { if (stmt.status == PostgresSQLStatement.Status.parsing) { stmt.status = PostgresSQLStatement.Status.failed; - stmt.error_response = err; + stmt.error_response = .{ .protocol = err }; is_error_owned = false; if (this.statements.remove(bun.hash(stmt.signature.name))) { stmt.deref(); } } } - _ = this.requests.discard(1); this.updateRef(); - request.onError(err, this.globalObject); + request.onError(.{ .protocol = err }, this.globalObject); }, .PortalSuspended => { // try reader.eatMessage(&protocol.PortalSuspended); // var request = this.current() orelse return error.ExpectedRequest; // _ = request; - // _ = this.requests.discard(1); debug("TODO PortalSuspended", .{}); }, .CloseComplete => { try reader.eatMessage(protocol.CloseComplete); var request = this.current() orelse return error.ExpectedRequest; - _ = this.requests.discard(1); request.onSuccess("CLOSECOMPLETE", this.globalObject, this.getQueriesArray()); }, .CopyInResponse => { @@ -2913,7 +3075,6 @@ pub const PostgresSQLConnection = struct { .EmptyQueryResponse => { try reader.eatMessage(protocol.EmptyQueryResponse); var request = this.current() orelse return error.ExpectedRequest; - _ = this.requests.discard(1); this.updateRef(); request.onSuccess("", this.globalObject, this.getQueriesArray()); }, @@ -2967,15 +3128,38 @@ pub const PostgresSQLStatement = struct { fields: []protocol.FieldDescription = &[_]protocol.FieldDescription{}, parameters: []const int4 = &[_]int4{}, signature: Signature, - status: Status = Status.parsing, - error_response: protocol.ErrorResponse = .{}, + status: Status = Status.pending, + error_response: ?Error = null, needs_duplicate_check: bool = true, fields_flags: PostgresSQLConnection.DataCell.Flags = .{}, + pub const Error = union(enum) { + protocol: protocol.ErrorResponse, + postgres_error: AnyPostgresError, + + pub fn deinit(this: *@This()) void { + switch (this.*) { + .protocol => |*err| err.deinit(), + .postgres_error => {}, + } + } + + pub fn toJS(this: *const @This(), globalObject: *JSC.JSGlobalObject) JSValue { + return switch (this.*) { + .protocol => |err| err.toJS(globalObject), + .postgres_error => |err| postgresErrorToJS(globalObject, null, err), + }; + } + }; pub const Status = enum { + pending, parsing, prepared, failed, + + pub fn isRunning(this: @This()) bool { + return this == .parsing; + } }; pub fn ref(this: *@This()) void { bun.assert(this.ref_count > 0); @@ -3047,7 +3231,11 @@ pub const PostgresSQLStatement = struct { bun.default_allocator.free(this.fields); bun.default_allocator.free(this.parameters); this.cached_structure.deinit(); - this.error_response.deinit(); + if (this.error_response) |err| { + this.error_response = null; + var _error = err; + _error.deinit(); + } this.signature.deinit(); bun.default_allocator.destroy(this); } diff --git a/test/js/sql/docker/Dockerfile b/test/js/sql/docker/Dockerfile index 923a232e9f..210a53f847 100644 --- a/test/js/sql/docker/Dockerfile +++ b/test/js/sql/docker/Dockerfile @@ -59,8 +59,7 @@ RUN mkdir -p /etc/postgresql && touch /etc/postgresql/pg_hba.conf && \ echo "host replication all 127.0.0.1/32 trust" >> /etc/postgresql/pg_hba.conf && \ echo "host replication all ::1/128 trust" >> /etc/postgresql/pg_hba.conf RUN mkdir -p /docker-entrypoint-initdb.d && \ - echo "ALTER SYSTEM SET max_prepared_transactions = '100';" > /docker-entrypoint-initdb.d/configure-postgres.sql - + echo "ALTER SYSTEM SET max_prepared_transactions = '1000';ALTER SYSTEM SET max_connections = '2000';" > /docker-entrypoint-initdb.d/configure-postgres.sql # Set environment variables ENV POSTGRES_HOST_AUTH_METHOD=trust ENV POSTGRES_USER=postgres diff --git a/test/js/sql/sql.test.ts b/test/js/sql/sql.test.ts index f79c769278..42adec7416 100644 --- a/test/js/sql/sql.test.ts +++ b/test/js/sql/sql.test.ts @@ -61,9 +61,7 @@ async function startContainer(): Promise<{ port: number; containerName: string } } // Start the container - await execAsync( - `${dockerCLI} run -d --name ${containerName} -p ${port}:5432 custom-postgres`, - ); + await execAsync(`${dockerCLI} run -d --name ${containerName} -p ${port}:5432 custom-postgres`); // Wait for PostgreSQL to be ready await waitForPostgres(port); @@ -99,6 +97,9 @@ if (isDockerEnabled()) { afterAll(async () => { try { await execAsync(`${dockerCLI} stop -t 0 ${container.containerName}`); + } catch (error) {} + + try { await execAsync(`${dockerCLI} rm -f ${container.containerName}`); } catch (error) {} }); @@ -625,7 +626,7 @@ if (isDockerEnabled()) { }); test("Transaction requests are executed implicitly", async () => { - const sql = postgres({ ...options, debug: true, idle_timeout: 1, fetch_types: false }); + await using sql = postgres(options); expect( ( await sql.begin(sql => [ @@ -636,12 +637,21 @@ if (isDockerEnabled()) { ).toBe("testing"); }); - test("Uncaught transaction request errosó rs bubbles to transaction", async () => { - const sql = postgres({ ...options, debug: true, idle_timeout: 1, fetch_types: false }); + test("Idle timeout retry works", async () => { + await using sql = postgres({ ...options, idleTimeout: 1 }); + await sql`select 1`; + await Bun.sleep(1100); // 1.1 seconds so it should retry + await sql`select 1`; + expect().pass(); + }); + + test("Uncaught transaction request errors bubbles to transaction", async () => { + const sql = postgres(options); + process.nextTick(() => sql.close({ timeout: 1 })); expect( await sql .begin(sql => [sql`select wat`, sql`select current_setting('bun_sql.test') as x, ${1} as a`]) - .catch(e => e.errno), + .catch(e => e.errno || e), ).toBe("42703"); }); @@ -993,8 +1003,6 @@ if (isDockerEnabled()) { const sql = postgres(options); const promise = sql`select pg_sleep(0.2) as x`.execute(); - // we await 1 to start the query - await 1; await sql.end(); return expect(await promise).toEqual([{ x: "" }]); }); diff --git a/test/js/sql/tls-sql.test.ts b/test/js/sql/tls-sql.test.ts index bb573a2976..2272ef0bd1 100644 --- a/test/js/sql/tls-sql.test.ts +++ b/test/js/sql/tls-sql.test.ts @@ -152,7 +152,7 @@ if (TLS_POSTGRES_DATABASE_URL) { }); test("Transaction requests are executed implicitly", async () => { - await using sql = new SQL({ ...options, debug: true, idle_timeout: 1, fetch_types: false }); + await using sql = new SQL(options); expect( ( await sql.begin(sql => [ @@ -164,11 +164,11 @@ if (TLS_POSTGRES_DATABASE_URL) { }); test("Uncaught transaction request errors bubbles to transaction", async () => { - await using sql = new SQL({ ...options, debug: true, idle_timeout: 1, fetch_types: false, max: 10 }); + await using sql = new SQL(options); expect( await sql .begin(sql => [sql`select wat`, sql`select current_setting('bun_sql.test') as x, ${1} as a`]) - .catch(e => e.errno), + .catch(e => e.errno || e), ).toBe("42703"); }); @@ -198,9 +198,9 @@ if (TLS_POSTGRES_DATABASE_URL) { }); test("Many transactions at beginning of connection", async () => { - await using sql = new SQL({ ...options, max: 10 }); - const xs = await Promise.all(Array.from({ length: 100 }, () => sql.begin(sql => sql`select 1`))); - return expect(xs.length).toBe(100); + await using sql = new SQL({ ...options, max: 2 }); + const xs = await Promise.all(Array.from({ length: 30 }, () => sql.begin(sql => sql`select 1`))); + return expect(xs.length).toBe(30); }); test("Transactions array", async () => { @@ -212,7 +212,7 @@ if (TLS_POSTGRES_DATABASE_URL) { }); test("Transaction waits", async () => { - await using sql = new SQL({ ...options, max: 10 }); + await using sql = new SQL({ ...options, max: 2 }); await sql`CREATE TEMPORARY TABLE IF NOT EXISTS test (a int)`; await sql.begin(async sql => { await sql`insert into test values(1)`;