fix(sql) fix execution queue (#16854)

This commit is contained in:
Ciro Spaciari
2025-01-29 23:52:19 -08:00
committed by GitHub
parent 574a41b03f
commit 892764ec43
5 changed files with 422 additions and 223 deletions

View File

@@ -45,6 +45,51 @@ pub const AnyPostgresError = error{
UnsupportedIntegerSize,
};
pub fn postgresErrorToJS(globalObject: *JSC.JSGlobalObject, message: ?[]const u8, err: AnyPostgresError) JSValue {
const error_code: JSC.Error = switch (err) {
error.ConnectionClosed => JSC.Error.ERR_POSTGRES_CONNECTION_CLOSED,
error.ExpectedRequest => JSC.Error.ERR_POSTGRES_EXPECTED_REQUEST,
error.ExpectedStatement => JSC.Error.ERR_POSTGRES_EXPECTED_STATEMENT,
error.InvalidBackendKeyData => JSC.Error.ERR_POSTGRES_INVALID_BACKEND_KEY_DATA,
error.InvalidBinaryData => JSC.Error.ERR_POSTGRES_INVALID_BINARY_DATA,
error.InvalidByteSequence => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE,
error.InvalidByteSequenceForEncoding => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE_FOR_ENCODING,
error.InvalidCharacter => JSC.Error.ERR_POSTGRES_INVALID_CHARACTER,
error.InvalidMessage => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE,
error.InvalidMessageLength => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE_LENGTH,
error.InvalidQueryBinding => JSC.Error.ERR_POSTGRES_INVALID_QUERY_BINDING,
error.InvalidServerKey => JSC.Error.ERR_POSTGRES_INVALID_SERVER_KEY,
error.InvalidServerSignature => JSC.Error.ERR_POSTGRES_INVALID_SERVER_SIGNATURE,
error.MultidimensionalArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_MULTIDIMENSIONAL_ARRAY_NOT_SUPPORTED_YET,
error.NullsInArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_NULLS_IN_ARRAY_NOT_SUPPORTED_YET,
error.Overflow => JSC.Error.ERR_POSTGRES_OVERFLOW,
error.PBKDFD2 => JSC.Error.ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2,
error.SASL_SIGNATURE_MISMATCH => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_MISMATCH,
error.SASL_SIGNATURE_INVALID_BASE64 => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_INVALID_BASE64,
error.TLSNotAvailable => JSC.Error.ERR_POSTGRES_TLS_NOT_AVAILABLE,
error.TLSUpgradeFailed => JSC.Error.ERR_POSTGRES_TLS_UPGRADE_FAILED,
error.UnexpectedMessage => JSC.Error.ERR_POSTGRES_UNEXPECTED_MESSAGE,
error.UNKNOWN_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD,
error.UNSUPPORTED_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD,
error.UnsupportedByteaFormat => JSC.Error.ERR_POSTGRES_UNSUPPORTED_BYTEA_FORMAT,
error.UnsupportedIntegerSize => JSC.Error.ERR_POSTGRES_UNSUPPORTED_INTEGER_SIZE,
error.JSError => {
return globalObject.takeException(error.JSError);
},
error.OutOfMemory => {
// TODO: add binding for creating an out of memory error?
return globalObject.takeException(globalObject.throwOutOfMemory());
},
error.ShortRead => {
bun.unreachablePanic("Assertion failed: ShortRead should be handled by the caller in postgres", .{});
},
};
if (message) |msg| {
return error_code.fmt(globalObject, "{s}", .{msg});
}
return error_code.fmt(globalObject, "Failed to bind query: {s}", .{@errorName(err)});
}
pub const SSLMode = enum(u8) {
disable = 0,
prefer = 1,
@@ -207,13 +252,71 @@ pub const PostgresSQLQueryResultMode = enum(u8) {
values = 1,
raw = 2,
};
const JSRef = union(enum) {
weak: JSC.JSValue,
strong: JSC.Strong,
pub fn initWeak(value: JSC.JSValue) @This() {
return .{ .weak = value };
}
pub fn initStrong(value: JSC.JSValue, globalThis: *JSC.JSGlobalObject) @This() {
return .{ .strong = JSC.Strong.create(value, globalThis) };
}
pub fn empty() @This() {
return .{ .weak = .zero };
}
pub fn get(this: *@This()) JSC.JSValue {
return switch (this.*) {
.weak => this.weak,
.strong => this.strong.get() orelse .zero,
};
}
pub fn setWeak(this: *@This(), value: JSC.JSValue) void {
if (this == .strong) {
this.strong.deinit();
}
this.* = .{ .weak = value };
}
pub fn setStrong(this: *@This(), value: JSC.JSValue, globalThis: *JSC.JSGlobalObject) void {
if (this == .strong) {
this.strong.set(globalThis, value);
return;
}
this.* = .{ .strong = JSC.Strong.create(value, globalThis) };
}
pub fn upgrade(this: *@This(), globalThis: *JSC.JSGlobalObject) void {
switch (this.*) {
.weak => {
bun.assert(this.weak != .zero);
this.* = .{ .strong = JSC.Strong.create(this.weak, globalThis) };
},
.strong => {},
}
}
pub fn deinit(this: *@This()) void {
switch (this.*) {
.weak => {
this.weak = .zero;
},
.strong => {
this.strong.deinit();
},
}
}
};
pub const PostgresSQLQuery = struct {
statement: ?*PostgresSQLStatement = null,
query: bun.String = bun.String.empty,
cursor_name: bun.String = bun.String.empty,
// Kept alive by being in the "queries" array from JS.
thisValue: JSValue = .undefined,
thisValue: JSRef = JSRef.empty(),
status: Status = Status.pending,
@@ -229,24 +332,29 @@ pub const PostgresSQLQuery = struct {
pub usingnamespace JSC.Codegen.JSPostgresSQLQuery;
const log = bun.Output.scoped(.PostgresSQLQuery, false);
pub fn getTarget(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject) JSC.JSValue {
if (this.thisValue == .zero) {
const thisValue = this.thisValue.get();
if (thisValue == .zero) {
return .zero;
}
const target = PostgresSQLQuery.targetGetCached(this.thisValue) orelse return .zero;
PostgresSQLQuery.targetSetCached(this.thisValue, globalObject, .zero);
const target = PostgresSQLQuery.targetGetCached(thisValue) orelse return .zero;
PostgresSQLQuery.targetSetCached(thisValue, globalObject, .zero);
return target;
}
pub const Status = enum(u8) {
/// The query was just enqueued, statement status can be checked for more details
pending,
written,
running,
/// The query is being bound to the statement
binding,
/// The query is running
running,
/// The query was successful
success,
/// The query failed
fail,
pub fn isRunning(this: Status) bool {
return this == .running or this == .binding;
return @intFromEnum(this) > @intFromEnum(Status.pending) and @intFromEnum(this) < @intFromEnum(Status.success);
}
};
@@ -255,6 +363,7 @@ pub const PostgresSQLQuery = struct {
}
pub fn deinit(this: *@This()) void {
this.thisValue.deinit();
if (this.statement) |statement| {
statement.deref();
}
@@ -265,7 +374,11 @@ pub const PostgresSQLQuery = struct {
pub fn finalize(this: *@This()) void {
debug("PostgresSQLQuery finalize", .{});
this.thisValue = .zero;
if (this.thisValue == .weak) {
// clean up if is a weak reference, if is a strong reference we need to wait until the query is done
// if we are a strong reference, here is probably a bug because GC'd should not happen
this.thisValue.weak = .zero;
}
this.deref();
}
@@ -281,27 +394,6 @@ pub const PostgresSQLQuery = struct {
bun.assert(this.ref_count.fetchAdd(1, .monotonic) > 0);
}
pub fn onNoData(this: *@This(), globalObject: *JSC.JSGlobalObject, queries_array: JSValue) void {
this.status = .success;
defer this.deref();
const thisValue = this.thisValue;
const targetValue = this.getTarget(globalObject);
if (thisValue == .zero or targetValue == .zero) {
return;
}
const vm = JSC.VirtualMachine.get();
const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?;
const event_loop = vm.eventLoop();
event_loop.runCallback(function, globalObject, thisValue, &.{
targetValue,
this.pending_value.trySwap() orelse .undefined,
JSValue.jsNumber(0),
JSValue.jsNumber(0),
queries_array,
});
}
pub fn onWriteFail(
this: *@This(),
err: AnyPostgresError,
@@ -309,30 +401,29 @@ pub const PostgresSQLQuery = struct {
queries_array: JSValue,
) void {
this.status = .fail;
const thisValue = this.thisValue;
const thisValue = this.thisValue.get();
defer this.thisValue.deinit();
const targetValue = this.getTarget(globalObject);
if (thisValue == .zero or targetValue == .zero) {
return;
}
const instance = globalObject.createErrorInstance("Failed to bind query: {s}", .{@errorName(err)});
// TODO: error handling
const vm = JSC.VirtualMachine.get();
const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?;
const event_loop = vm.eventLoop();
event_loop.runCallback(function, globalObject, thisValue, &.{
targetValue,
instance,
postgresErrorToJS(globalObject, null, err),
queries_array,
});
}
pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void {
pub fn onJSError(this: *@This(), err: JSC.JSValue, globalObject: *JSC.JSGlobalObject) void {
this.status = .fail;
this.ref();
defer this.deref();
const thisValue = this.thisValue;
const thisValue = this.thisValue.get();
defer this.thisValue.deinit();
const targetValue = this.getTarget(globalObject);
if (thisValue == .zero or targetValue == .zero) {
return;
@@ -343,9 +434,12 @@ pub const PostgresSQLQuery = struct {
const event_loop = vm.eventLoop();
event_loop.runCallback(function, globalObject, thisValue, &.{
targetValue,
err.toJS(globalObject),
err,
});
}
pub fn onError(this: *@This(), err: PostgresSQLStatement.Error, globalObject: *JSC.JSGlobalObject) void {
this.onJSError(err.toJS(globalObject), globalObject);
}
const CommandTag = union(enum) {
// For an INSERT command, the tag is INSERT oid rows, where rows is the
@@ -464,9 +558,11 @@ pub const PostgresSQLQuery = struct {
pub fn onSuccess(this: *@This(), command_tag_str: []const u8, globalObject: *JSC.JSGlobalObject, connection: JSC.JSValue) void {
this.status = .success;
this.ref();
defer this.deref();
const thisValue = this.thisValue;
const thisValue = this.thisValue.get();
defer this.thisValue.deinit();
const targetValue = this.getTarget(globalObject);
defer allowGC(thisValue, globalObject);
if (thisValue == .zero or targetValue == .zero) {
@@ -533,7 +629,7 @@ pub const PostgresSQLQuery = struct {
ptr.* = .{
.query = query.toBunString(globalThis),
.thisValue = this_value,
.thisValue = JSRef.initWeak(this_value),
.flags = .{
.bigint = bigint,
},
@@ -604,78 +700,91 @@ pub const PostgresSQLQuery = struct {
};
const has_params = signature.fields.len > 0;
var did_write = false;
var reset_timeout = false;
enqueue: {
if (entry.found_existing) {
this.statement = entry.value_ptr.*;
this.statement.?.ref();
signature.deinit();
if (has_params and this.statement.?.status == .parsing) {
switch (this.statement.?.status) {
.failed => {
// If the statement failed, we need to throw the error
return globalObject.throwValue(this.statement.?.error_response.?.toJS(globalObject));
},
.prepared => {
if (!connection.hasQueryRunning()) {
this.flags.binary = this.statement.?.fields.len > 0;
log("bindAndExecute", .{});
// bindAndExecute will bind + execute, it will change to running after binding is complete
PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| {
if (!globalObject.hasException())
return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to bind and execute query", err));
return error.JSError;
};
this.status = .binding;
// if it has params, we need to wait for ParamDescription to be received before we can write the data
} else {
this.flags.binary = this.statement.?.fields.len > 0;
log("bindAndExecute", .{});
PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| {
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to bind and execute query");
return error.JSError;
};
did_write = true;
reset_timeout = true;
}
},
.parsing, .pending => {},
}
break :enqueue;
}
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
log("prepareAndQueryWithSignature", .{});
const can_execute = !connection.hasQueryRunning();
PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to prepare and query");
return error.JSError;
};
did_write = true;
} else {
log("writeQuery", .{});
PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to write query");
return error.JSError;
};
writer.write(&protocol.Sync) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to flush");
return error.JSError;
};
if (can_execute) {
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
log("prepareAndQueryWithSignature", .{});
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to prepare and query", err));
return error.JSError;
};
this.status = .binding;
reset_timeout = true;
} else {
log("writeQuery", .{});
PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to write query", err));
return error.JSError;
};
writer.write(&protocol.Sync) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwValue(postgresErrorToJS(globalObject, "failed to flush", err));
return error.JSError;
};
reset_timeout = true;
}
}
{
const stmt = bun.default_allocator.create(PostgresSQLStatement) catch |err| {
return globalObject.throwError(err, "failed to allocate statement");
const stmt = bun.default_allocator.create(PostgresSQLStatement) catch {
return globalObject.throwOutOfMemory();
};
connection.prepared_statement_id += 1;
stmt.* = .{ .signature = signature, .ref_count = 2, .status = PostgresSQLStatement.Status.parsing };
stmt.* = .{ .signature = signature, .ref_count = 2, .status = if (can_execute) .parsing else .pending };
this.statement = stmt;
entry.value_ptr.* = stmt;
}
}
connection.requests.writeItem(this) catch {};
// We need a strong reference to the query so that it doesn't get GC'd
connection.requests.writeItem(this) catch return globalObject.throwOutOfMemory();
this.ref();
this.status = if (did_write) .binding else .pending;
this.thisValue.upgrade(globalObject);
PostgresSQLQuery.targetSetCached(this_value, globalObject, query);
if (connection.is_ready_for_query)
connection.flushDataAndResetTimeout()
else if (did_write)
else if (reset_timeout)
connection.resetConnectionTimeout();
return .undefined;
@@ -1373,7 +1482,8 @@ pub const PostgresSQLConnection = struct {
this.ref();
defer this.deref();
this.refAndClose();
// we defer the refAndClose so the on_close will be called first before we reject the pending requests
defer this.refAndClose(value);
const on_close = this.consumeOnCloseCallback(this.globalObject) orelse return;
const loop = this.globalObject.bunVM().eventLoop();
@@ -1397,47 +1507,8 @@ pub const PostgresSQLConnection = struct {
debug("failed: {s}: {s}", .{ message, @errorName(err) });
const globalObject = this.globalObject;
const error_code: JSC.Error = switch (err) {
error.ConnectionClosed => JSC.Error.ERR_POSTGRES_CONNECTION_CLOSED,
error.ExpectedRequest => JSC.Error.ERR_POSTGRES_EXPECTED_REQUEST,
error.ExpectedStatement => JSC.Error.ERR_POSTGRES_EXPECTED_STATEMENT,
error.InvalidBackendKeyData => JSC.Error.ERR_POSTGRES_INVALID_BACKEND_KEY_DATA,
error.InvalidBinaryData => JSC.Error.ERR_POSTGRES_INVALID_BINARY_DATA,
error.InvalidByteSequence => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE,
error.InvalidByteSequenceForEncoding => JSC.Error.ERR_POSTGRES_INVALID_BYTE_SEQUENCE_FOR_ENCODING,
error.InvalidCharacter => JSC.Error.ERR_POSTGRES_INVALID_CHARACTER,
error.InvalidMessage => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE,
error.InvalidMessageLength => JSC.Error.ERR_POSTGRES_INVALID_MESSAGE_LENGTH,
error.InvalidQueryBinding => JSC.Error.ERR_POSTGRES_INVALID_QUERY_BINDING,
error.InvalidServerKey => JSC.Error.ERR_POSTGRES_INVALID_SERVER_KEY,
error.InvalidServerSignature => JSC.Error.ERR_POSTGRES_INVALID_SERVER_SIGNATURE,
error.MultidimensionalArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_MULTIDIMENSIONAL_ARRAY_NOT_SUPPORTED_YET,
error.NullsInArrayNotSupportedYet => JSC.Error.ERR_POSTGRES_NULLS_IN_ARRAY_NOT_SUPPORTED_YET,
error.Overflow => JSC.Error.ERR_POSTGRES_OVERFLOW,
error.PBKDFD2 => JSC.Error.ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2,
error.SASL_SIGNATURE_MISMATCH => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_MISMATCH,
error.SASL_SIGNATURE_INVALID_BASE64 => JSC.Error.ERR_POSTGRES_SASL_SIGNATURE_INVALID_BASE64,
error.TLSNotAvailable => JSC.Error.ERR_POSTGRES_TLS_NOT_AVAILABLE,
error.TLSUpgradeFailed => JSC.Error.ERR_POSTGRES_TLS_UPGRADE_FAILED,
error.UnexpectedMessage => JSC.Error.ERR_POSTGRES_UNEXPECTED_MESSAGE,
error.UNKNOWN_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD,
error.UNSUPPORTED_AUTHENTICATION_METHOD => JSC.Error.ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD,
error.UnsupportedByteaFormat => JSC.Error.ERR_POSTGRES_UNSUPPORTED_BYTEA_FORMAT,
error.UnsupportedIntegerSize => JSC.Error.ERR_POSTGRES_UNSUPPORTED_INTEGER_SIZE,
error.JSError => {
this.failWithJSValue(globalObject.takeException(error.JSError));
return;
},
error.OutOfMemory => {
// TODO: add binding for creating an out of memory error?
this.failWithJSValue(globalObject.takeException(globalObject.throwOutOfMemory()));
return;
},
error.ShortRead => {
bun.unreachablePanic("Assertion failed: ShortRead should be handled by the caller in postgres", .{});
},
};
this.failWithJSValue(error_code.fmt(globalObject, "{s}", .{message}));
this.failWithJSValue(postgresErrorToJS(globalObject, message, err));
}
pub fn onClose(this: *PostgresSQLConnection) void {
@@ -1460,7 +1531,6 @@ pub const PostgresSQLConnection = struct {
.options = Data{ .temporary = this.options },
};
msg.writeInternal(Writer, this.writer()) catch |err| {
this.refAndClose();
this.fail("Failed to write startup message", err);
};
}
@@ -1937,13 +2007,46 @@ pub const PostgresSQLConnection = struct {
bun.default_allocator.destroy(this);
}
fn refAndClose(this: *@This()) void {
fn refAndClose(this: *@This(), js_reason: ?JSC.JSValue) void {
// refAndClose is always called when we wanna to disconnect or when we are closed
if (!this.socket.isClosed()) {
// event loop need to be alive to close the socket
this.poll_ref.ref(this.globalObject.bunVM());
// will unref on socket close
this.socket.close();
}
// cleanup requests
while (this.current()) |request| {
switch (request.status) {
// pending we will fail the request and the stmt will be marked as error ConnectionClosed too
.pending => {
const stmt = request.statement orelse continue;
stmt.error_response = .{ .postgres_error = AnyPostgresError.ConnectionClosed };
stmt.status = .failed;
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
}
},
// in the middle of running
.binding,
.running,
=> {
if (js_reason) |reason| {
request.onJSError(reason, this.globalObject);
} else {
request.onError(.{ .postgres_error = AnyPostgresError.ConnectionClosed }, this.globalObject);
}
},
// just ignore success and fail cases
.success, .fail => {},
}
request.deref();
this.requests.discard(1);
}
}
pub fn disconnect(this: *@This()) void {
@@ -1951,7 +2054,7 @@ pub const PostgresSQLConnection = struct {
if (this.status == .connected) {
this.status = .disconnected;
this.refAndClose();
this.refAndClose(null);
}
}
@@ -1963,6 +2066,10 @@ pub const PostgresSQLConnection = struct {
return this.requests.peekItem(0);
}
fn hasQueryRunning(this: *PostgresSQLConnection) bool {
return !this.is_ready_for_query or this.current() != null;
}
pub const Writer = struct {
connection: *PostgresSQLConnection,
@@ -2444,59 +2551,117 @@ pub const PostgresSQLConnection = struct {
};
};
fn advance(this: *PostgresSQLConnection) !bool {
fn advance(this: *PostgresSQLConnection) !void {
defer this.updateRef();
var any = false;
defer if (any) this.resetConnectionTimeout();
while (this.requests.readableLength() > 0) {
var req: *PostgresSQLQuery = this.requests.peekItem(0);
switch (req.status) {
.pending => {
const stmt = req.statement orelse return error.ExpectedStatement;
if (stmt.status == .failed) {
req.onError(stmt.error_response, this.globalObject);
this.requests.discard(1);
any = true;
} else {
break;
}
},
.success, .fail => {
this.requests.discard(1);
req.deref();
any = true;
},
else => break,
}
}
while (this.requests.readableLength() > 0) {
var req: *PostgresSQLQuery = this.requests.peekItem(0);
const stmt = req.statement orelse return error.ExpectedStatement;
switch (stmt.status) {
.prepared => {
if (req.status == .pending and stmt.status == .prepared) {
const binding_value = PostgresSQLQuery.bindingGetCached(req.thisValue) orelse .zero;
const columns_value = PostgresSQLQuery.columnsGetCached(req.thisValue) orelse .zero;
PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
switch (stmt.status) {
.failed => {
bun.assert(stmt.error_response != null);
req.onError(stmt.error_response.?, this.globalObject);
req.deref();
this.requests.discard(1);
any = true;
continue;
};
req.status = .binding;
req.flags.binary = stmt.fields.len > 0;
any = true;
} else {
break;
},
.prepared => {
const thisValue = req.thisValue.get();
bun.assert(thisValue != .zero);
const binding_value = PostgresSQLQuery.bindingGetCached(thisValue) orelse .zero;
const columns_value = PostgresSQLQuery.columnsGetCached(thisValue) orelse .zero;
req.flags.binary = stmt.fields.len > 0;
PostgresRequest.bindAndExecute(this.globalObject, stmt, binding_value, columns_value, PostgresSQLConnection.Writer, this.writer()) catch |err| {
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
continue;
};
req.status = .binding;
any = true;
return;
},
.pending => {
// statement is pending, lets write/parse it
var query_str = req.query.toUTF8(bun.default_allocator);
defer query_str.deinit();
const has_params = stmt.signature.fields.len > 0;
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
const thisValue = req.thisValue.get();
bun.assert(thisValue != .zero);
// prepareAndQueryWithSignature will write + bind + execute, it will change to running after binding is complete
const binding_value = PostgresSQLQuery.bindingGetCached(thisValue) orelse .zero;
PostgresRequest.prepareAndQueryWithSignature(this.globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, this.writer(), &stmt.signature) catch |err| {
stmt.status = .failed;
stmt.error_response = .{ .postgres_error = err };
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
continue;
};
req.status = .binding;
stmt.status = .parsing;
any = true;
return;
}
const connection_writer = this.writer();
// write query and wait for it to be prepared
PostgresRequest.writeQuery(query_str.slice(), stmt.signature.prepared_statement_name, stmt.signature.fields, PostgresSQLConnection.Writer, connection_writer) catch |err| {
stmt.error_response = .{ .postgres_error = err };
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
continue;
};
connection_writer.write(&protocol.Sync) catch |err| {
stmt.error_response = .{ .postgres_error = err };
stmt.status = .failed;
req.onWriteFail(err, this.globalObject, this.getQueriesArray());
req.deref();
this.requests.discard(1);
continue;
};
stmt.status = .parsing;
any = true;
return;
},
.parsing => {
// we are still parsing, lets wait for it to be prepared or failed
return;
},
}
},
else => break,
.running, .binding => {
// if we are binding it will switch to running immediately
// if we are running, we need to wait for it to be success or fail
return;
},
.success, .fail => {
req.deref();
this.requests.discard(1);
any = true;
continue;
},
}
}
return any;
}
pub fn getQueriesArray(this: *const PostgresSQLConnection) JSValue {
@@ -2567,13 +2732,14 @@ pub const PostgresSQLConnection = struct {
DataCell.Putter.put,
);
}
const pending_value = if (request.thisValue == .zero) .zero else PostgresSQLQuery.pendingValueGetCached(request.thisValue) orelse .zero;
const thisValue = request.thisValue.get();
bun.assert(thisValue != .zero);
const pending_value = PostgresSQLQuery.pendingValueGetCached(thisValue) orelse .zero;
pending_value.ensureStillAlive();
const result = putter.toJS(this.globalObject, pending_value, structure, statement.fields_flags, request.flags.result_mode);
if (pending_value == .zero) {
PostgresSQLQuery.pendingValueSetCached(request.thisValue, this.globalObject, result);
PostgresSQLQuery.pendingValueSetCached(thisValue, this.globalObject, result);
}
},
.CopyData => {
@@ -2602,9 +2768,9 @@ pub const PostgresSQLConnection = struct {
this.is_ready_for_query = true;
this.socket.setTimeout(300);
if (try this.advance() or this.is_ready_for_query) {
this.flushData();
}
try this.advance();
this.flushData();
},
.CommandComplete => {
var request = this.current() orelse return error.ExpectedRequest;
@@ -2615,7 +2781,6 @@ pub const PostgresSQLConnection = struct {
cmd.deinit();
}
debug("-> {s}", .{cmd.command_tag.slice()});
_ = this.requests.discard(1);
defer this.updateRef();
request.onSuccess(cmd.command_tag.slice(), this.globalObject, this.js_value);
},
@@ -2875,29 +3040,26 @@ pub const PostgresSQLConnection = struct {
if (request.statement) |stmt| {
if (stmt.status == PostgresSQLStatement.Status.parsing) {
stmt.status = PostgresSQLStatement.Status.failed;
stmt.error_response = err;
stmt.error_response = .{ .protocol = err };
is_error_owned = false;
if (this.statements.remove(bun.hash(stmt.signature.name))) {
stmt.deref();
}
}
}
_ = this.requests.discard(1);
this.updateRef();
request.onError(err, this.globalObject);
request.onError(.{ .protocol = err }, this.globalObject);
},
.PortalSuspended => {
// try reader.eatMessage(&protocol.PortalSuspended);
// var request = this.current() orelse return error.ExpectedRequest;
// _ = request;
// _ = this.requests.discard(1);
debug("TODO PortalSuspended", .{});
},
.CloseComplete => {
try reader.eatMessage(protocol.CloseComplete);
var request = this.current() orelse return error.ExpectedRequest;
_ = this.requests.discard(1);
request.onSuccess("CLOSECOMPLETE", this.globalObject, this.getQueriesArray());
},
.CopyInResponse => {
@@ -2913,7 +3075,6 @@ pub const PostgresSQLConnection = struct {
.EmptyQueryResponse => {
try reader.eatMessage(protocol.EmptyQueryResponse);
var request = this.current() orelse return error.ExpectedRequest;
_ = this.requests.discard(1);
this.updateRef();
request.onSuccess("", this.globalObject, this.getQueriesArray());
},
@@ -2967,15 +3128,38 @@ pub const PostgresSQLStatement = struct {
fields: []protocol.FieldDescription = &[_]protocol.FieldDescription{},
parameters: []const int4 = &[_]int4{},
signature: Signature,
status: Status = Status.parsing,
error_response: protocol.ErrorResponse = .{},
status: Status = Status.pending,
error_response: ?Error = null,
needs_duplicate_check: bool = true,
fields_flags: PostgresSQLConnection.DataCell.Flags = .{},
pub const Error = union(enum) {
protocol: protocol.ErrorResponse,
postgres_error: AnyPostgresError,
pub fn deinit(this: *@This()) void {
switch (this.*) {
.protocol => |*err| err.deinit(),
.postgres_error => {},
}
}
pub fn toJS(this: *const @This(), globalObject: *JSC.JSGlobalObject) JSValue {
return switch (this.*) {
.protocol => |err| err.toJS(globalObject),
.postgres_error => |err| postgresErrorToJS(globalObject, null, err),
};
}
};
pub const Status = enum {
pending,
parsing,
prepared,
failed,
pub fn isRunning(this: @This()) bool {
return this == .parsing;
}
};
pub fn ref(this: *@This()) void {
bun.assert(this.ref_count > 0);
@@ -3047,7 +3231,11 @@ pub const PostgresSQLStatement = struct {
bun.default_allocator.free(this.fields);
bun.default_allocator.free(this.parameters);
this.cached_structure.deinit();
this.error_response.deinit();
if (this.error_response) |err| {
this.error_response = null;
var _error = err;
_error.deinit();
}
this.signature.deinit();
bun.default_allocator.destroy(this);
}