more(sql) type fixes and tests (#16512)

This commit is contained in:
Ciro Spaciari
2025-01-20 16:58:37 -08:00
committed by GitHub
parent cfbb62df16
commit 9bfd9db78b
12 changed files with 1473 additions and 461 deletions

View File

@@ -58,6 +58,8 @@ pub const Data = union(enum) {
temporary: []const u8,
empty: void,
pub const Empty: Data = .{ .empty = {} };
pub fn toOwned(this: @This()) !bun.ByteList {
return switch (this) {
.owned => this.owned,
@@ -202,7 +204,11 @@ pub const PostgresSQLContext = struct {
}
}
};
pub const PostgresSQLQueryResultMode = enum(u8) {
objects = 0,
values = 1,
raw = 2,
};
pub const PostgresSQLQuery = struct {
statement: ?*PostgresSQLStatement = null,
query: bun.String = bun.String.empty,
@@ -212,9 +218,15 @@ pub const PostgresSQLQuery = struct {
thisValue: JSValue = .undefined,
status: Status = Status.pending,
is_done: bool = false,
ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(1),
binary: bool = false,
flags: packed struct {
is_done: bool = false,
binary: bool = false,
bigint: bool = false,
result_mode: PostgresSQLQueryResultMode = .objects,
} = .{},
pub usingnamespace JSC.Codegen.JSPostgresSQLQuery;
const log = bun.Output.scoped(.PostgresSQLQuery, false);
@@ -474,7 +486,7 @@ pub const PostgresSQLQuery = struct {
consumePendingValue(thisValue, globalObject) orelse .undefined,
tag.toJSTag(globalObject),
tag.toJSNumber(),
PostgresSQLConnection.queriesGetCached(connection) orelse .undefined,
if (connection == .zero) .undefined else PostgresSQLConnection.queriesGetCached(connection) orelse .undefined,
});
}
@@ -489,7 +501,7 @@ pub const PostgresSQLQuery = struct {
}
pub fn call(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const arguments = callframe.arguments_old(4).slice();
const arguments = callframe.arguments_old(5).slice();
var args = JSC.Node.ArgumentsSlice.init(globalThis.bunVM(), arguments);
defer args.deinit();
const query = args.nextEat() orelse {
@@ -509,6 +521,8 @@ pub const PostgresSQLQuery = struct {
const pending_value = args.nextEat() orelse .undefined;
const columns = args.nextEat() orelse .undefined;
const js_bigint = args.nextEat() orelse .false;
const bigint = js_bigint.isBoolean() and js_bigint.asBoolean();
if (!pending_value.jsType().isArrayLike()) {
return globalThis.throwInvalidArgumentType("query", "pendingValue", "Array");
@@ -522,6 +536,9 @@ pub const PostgresSQLQuery = struct {
ptr.* = .{
.query = query.toBunString(globalThis),
.thisValue = this_value,
.flags = .{
.bigint = bigint,
},
};
ptr.query.ref();
@@ -541,16 +558,28 @@ pub const PostgresSQLQuery = struct {
pub fn doDone(this: *@This(), globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSValue {
_ = globalObject;
this.is_done = true;
this.flags.is_done = true;
return .undefined;
}
pub fn setMode(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
const js_mode = callframe.argument(0);
if (js_mode.isEmptyOrUndefinedOrNull() or !js_mode.isNumber()) {
return globalObject.throwInvalidArgumentType("setMode", "mode", "Number");
}
const mode = js_mode.coerce(i32, globalObject);
this.flags.result_mode = std.meta.intToEnum(PostgresSQLQueryResultMode, mode) catch {
return globalObject.throwInvalidArgumentTypeValue("mode", "Number", js_mode);
};
return .undefined;
}
pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
var arguments_ = callframe.arguments_old(2);
const arguments = arguments_.slice();
const connection: *PostgresSQLConnection = arguments[0].as(PostgresSQLConnection) orelse {
return globalObject.throw("connection must be a PostgresSQLConnection", .{});
};
connection.poll_ref.ref(globalObject.bunVM());
var query = arguments[1];
if (!query.isObject()) {
@@ -563,14 +592,13 @@ pub const PostgresSQLQuery = struct {
defer query_str.deinit();
const columns_value = PostgresSQLQuery.columnsGetCached(this_value) orelse .undefined;
var signature = Signature.generate(globalObject, query_str.slice(), binding_value, columns_value) catch |err| {
var signature = Signature.generate(globalObject, query_str.slice(), binding_value, columns_value, connection.prepared_statement_id) catch |err| {
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to generate signature");
return error.JSError;
};
var writer = connection.writer();
const entry = connection.statements.getOrPut(bun.default_allocator, bun.hash(signature.name)) catch |err| {
signature.deinit();
return globalObject.throwError(err, "failed to allocate statement");
@@ -589,7 +617,7 @@ pub const PostgresSQLQuery = struct {
// if it has params, we need to wait for ParamDescription to be received before we can write the data
} else {
this.binary = this.statement.?.fields.len > 0;
this.flags.binary = this.statement.?.fields.len > 0;
log("bindAndExecute", .{});
PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, columns_value, PostgresSQLConnection.Writer, writer) catch |err| {
if (!globalObject.hasException())
@@ -604,6 +632,8 @@ pub const PostgresSQLQuery = struct {
// If it does not have params, we can write and execute immediately in one go
if (!has_params) {
log("prepareAndQueryWithSignature", .{});
PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| {
signature.deinit();
if (!globalObject.hasException())
@@ -612,7 +642,9 @@ pub const PostgresSQLQuery = struct {
};
did_write = true;
} else {
PostgresRequest.writeQuery(query_str.slice(), signature.name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| {
log("writeQuery", .{});
PostgresRequest.writeQuery(query_str.slice(), signature.prepared_statement_name, signature.fields, PostgresSQLConnection.Writer, writer) catch |err| {
signature.deinit();
if (!globalObject.hasException())
return globalObject.throwError(err, "failed to write query");
@@ -630,7 +662,7 @@ pub const PostgresSQLQuery = struct {
const stmt = bun.default_allocator.create(PostgresSQLStatement) catch |err| {
return globalObject.throwError(err, "failed to allocate statement");
};
connection.prepared_statement_id += 1;
stmt.* = .{ .signature = signature, .ref_count = 2, .status = PostgresSQLStatement.Status.parsing };
this.statement = stmt;
entry.value_ptr.* = stmt;
@@ -868,11 +900,11 @@ pub const PostgresRequest = struct {
writer: protocol.NewWriter(Context),
signature: *Signature,
) AnyPostgresError!void {
try writeQuery(query, signature.name, signature.fields, Context, writer);
try writeBind(signature.name, bun.String.empty, globalObject, array_value, .zero, &.{}, &.{}, Context, writer);
try writeQuery(query, signature.prepared_statement_name, signature.fields, Context, writer);
try writeBind(signature.prepared_statement_name, bun.String.empty, globalObject, array_value, .zero, &.{}, &.{}, Context, writer);
var exec = protocol.Execute{
.p = .{
.prepared_statement = signature.name,
.prepared_statement = signature.prepared_statement_name,
},
};
try exec.writeInternal(Context, writer);
@@ -889,10 +921,10 @@ pub const PostgresRequest = struct {
comptime Context: type,
writer: protocol.NewWriter(Context),
) !void {
try writeBind(statement.signature.name, bun.String.empty, globalObject, array_value, columns_value, statement.parameters, statement.fields, Context, writer);
try writeBind(statement.signature.prepared_statement_name, bun.String.empty, globalObject, array_value, columns_value, statement.parameters, statement.fields, Context, writer);
var exec = protocol.Execute{
.p = .{
.prepared_statement = statement.signature.name,
.prepared_statement = statement.signature.prepared_statement_name,
},
};
try exec.writeInternal(Context, writer);
@@ -979,6 +1011,7 @@ pub const PostgresSQLConnection = struct {
globalObject: *JSC.JSGlobalObject,
statements: PreparedStatementsMap,
prepared_statement_id: u64 = 0,
pending_activity_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
js_value: JSValue = JSValue.undefined,
@@ -1343,7 +1376,7 @@ pub const PostgresSQLConnection = struct {
this.ref();
defer this.deref();
if (!this.socket.isClosed()) this.socket.close();
this.refAndClose();
const on_close = this.consumeOnCloseCallback(this.globalObject) orelse return;
const loop = this.globalObject.bunVM().eventLoop();
@@ -1415,6 +1448,8 @@ pub const PostgresSQLConnection = struct {
const loop = vm.eventLoop();
loop.enter();
defer loop.exit();
this.poll_ref.unref(this.globalObject.bunVM());
this.fail("Connection closed", error.ConnectionClosed);
}
@@ -1428,7 +1463,7 @@ pub const PostgresSQLConnection = struct {
.options = Data{ .temporary = this.options },
};
msg.writeInternal(Writer, this.writer()) catch |err| {
this.socket.close();
this.refAndClose();
this.fail("Failed to write startup message", err);
};
}
@@ -1906,13 +1941,21 @@ pub const PostgresSQLConnection = struct {
bun.default_allocator.destroy(this);
}
fn refAndClose(this: *@This()) void {
if (!this.socket.isClosed()) {
// event loop need to be alive to close the socket
this.poll_ref.ref(this.globalObject.bunVM());
// will unref on socket close
this.socket.close();
}
}
pub fn disconnect(this: *@This()) void {
this.stopTimers();
if (this.status == .connected) {
this.status = .disconnected;
this.poll_ref.disable();
this.socket.close();
this.refAndClose();
}
}
@@ -2019,11 +2062,12 @@ pub const PostgresSQLConnection = struct {
json = 9,
array = 10,
typed_array = 11,
raw = 12,
};
pub const Value = extern union {
null: u8,
string: bun.WTF.StringImpl,
string: ?bun.WTF.StringImpl,
float8: f64,
int4: i32,
int8: i64,
@@ -2031,9 +2075,10 @@ pub const PostgresSQLConnection = struct {
date: f64,
date_with_time_zone: f64,
bytea: [2]usize,
json: bun.WTF.StringImpl,
json: ?bun.WTF.StringImpl,
array: Array,
typed_array: TypedArray,
raw: Raw,
};
pub const Array = extern struct {
@@ -2045,6 +2090,10 @@ pub const PostgresSQLConnection = struct {
return ptr[0..this.len];
}
};
pub const Raw = extern struct {
ptr: ?[*]const u8 = null,
len: u64,
};
pub const TypedArray = extern struct {
head_ptr: ?[*]u8 = null,
ptr: ?[*]u8 = null,
@@ -2068,10 +2117,14 @@ pub const PostgresSQLConnection = struct {
switch (this.tag) {
.string => {
this.value.string.deref();
if (this.value.string) |str| {
str.deref();
}
},
.json => {
this.value.json.deref();
if (this.value.json) |str| {
str.deref();
}
},
.bytea => {
if (this.value.bytea[1] == 0) return;
@@ -2091,8 +2144,21 @@ pub const PostgresSQLConnection = struct {
else => {},
}
}
pub fn fromBytes(binary: bool, oid: int4, bytes: []const u8, globalObject: *JSC.JSGlobalObject) !DataCell {
pub fn raw(optional_bytes: ?*Data) DataCell {
if (optional_bytes) |bytes| {
const bytes_slice = bytes.slice();
return DataCell{
.tag = .raw,
.value = .{ .raw = .{ .ptr = @ptrCast(bytes_slice.ptr), .len = bytes_slice.len } },
};
}
// TODO: check empty and null fields
return DataCell{
.tag = .null,
.value = .{ .null = 0 },
};
}
pub fn fromBytes(binary: bool, bigint: bool, oid: int4, bytes: []const u8, globalObject: *JSC.JSGlobalObject) !DataCell {
switch (@as(types.Tag, @enumFromInt(@as(short, @intCast(oid))))) {
// TODO: .int2_array, .float8_array
inline .int4_array, .float4_array => |tag| {
@@ -2121,13 +2187,13 @@ pub const PostgresSQLConnection = struct {
.ptr = null,
.len = 0,
.byte_len = 0,
.type = tag.toJSTypedArrayType(),
.type = try tag.toJSTypedArrayType(),
},
},
};
}
const elements = tag.pgArrayType().init(bytes).slice();
const elements = (try tag.pgArrayType()).init(bytes).slice();
return DataCell{
.tag = .typed_array,
@@ -2137,13 +2203,13 @@ pub const PostgresSQLConnection = struct {
.ptr = if (elements.len > 0) @ptrCast(elements.ptr) else null,
.len = @truncate(elements.len),
.byte_len = @truncate(bytes.len),
.type = tag.toJSTypedArrayType(),
.type = try tag.toJSTypedArrayType(),
},
},
};
} else {
// TODO:
return fromBytes(false, @intFromEnum(types.Tag.bytea), bytes, globalObject);
return fromBytes(false, bigint, @intFromEnum(types.Tag.bytea), bytes, globalObject);
}
},
.int4 => {
@@ -2153,6 +2219,15 @@ pub const PostgresSQLConnection = struct {
return DataCell{ .tag = .int4, .value = .{ .int4 = bun.fmt.parseInt(i32, bytes, 0) catch 0 } };
}
},
// postgres when reading bigint as int8 it returns a string unless type: { bigint: postgres.BigInt is set
.int8 => {
if (bigint) {
// .int8 is a 64-bit integer always string
return DataCell{ .tag = .int8, .value = .{ .int8 = bun.fmt.parseInt(i64, bytes, 0) catch 0 } };
} else {
return DataCell{ .tag = .string, .value = .{ .string = if (bytes.len > 0) bun.String.createUTF8(bytes).value.WTFStringImpl else null }, .free_value = 1 };
}
},
.float8 => {
if (binary and bytes.len == 8) {
return DataCell{ .tag = .float8, .value = .{ .float8 = try parseBinary(.float8, f64, bytes) } };
@@ -2170,7 +2245,7 @@ pub const PostgresSQLConnection = struct {
}
},
.jsonb, .json => {
return DataCell{ .tag = .json, .value = .{ .json = String.createUTF8(bytes).value.WTFStringImpl }, .free_value = 1 };
return DataCell{ .tag = .json, .value = .{ .json = if (bytes.len > 0) String.createUTF8(bytes).value.WTFStringImpl else null }, .free_value = 1 };
},
.bool => {
if (binary) {
@@ -2218,7 +2293,7 @@ pub const PostgresSQLConnection = struct {
}
},
else => {
return DataCell{ .tag = .string, .value = .{ .string = bun.String.createUTF8(bytes).value.WTFStringImpl }, .free_value = 1 };
return DataCell{ .tag = .string, .value = .{ .string = if (bytes.len > 0) bun.String.createUTF8(bytes).value.WTFStringImpl else null }, .free_value = 1 };
},
}
}
@@ -2309,6 +2384,7 @@ pub const PostgresSQLConnection = struct {
list: []DataCell,
fields: []const protocol.FieldDescription,
binary: bool = false,
bigint: bool = false,
count: usize = 0,
globalObject: *JSC.JSGlobalObject,
@@ -2319,26 +2395,31 @@ pub const PostgresSQLConnection = struct {
[*]DataCell,
u32,
Flags,
u8, // result_mode
) JSValue;
pub fn toJS(this: *Putter, globalObject: *JSC.JSGlobalObject, array: JSValue, structure: JSValue, flags: Flags) JSValue {
return JSC__constructObjectFromDataCell(globalObject, array, structure, this.list.ptr, @truncate(this.fields.len), flags);
pub fn toJS(this: *Putter, globalObject: *JSC.JSGlobalObject, array: JSValue, structure: JSValue, flags: Flags, result_mode: PostgresSQLQueryResultMode) JSValue {
return JSC__constructObjectFromDataCell(globalObject, array, structure, this.list.ptr, @truncate(this.fields.len), flags, @intFromEnum(result_mode));
}
pub fn put(this: *Putter, index: u32, optional_bytes: ?*Data) !bool {
fn putImpl(this: *Putter, index: u32, optional_bytes: ?*Data, comptime is_raw: bool) !bool {
const field = &this.fields[index];
const oid = field.type_oid;
debug("index: {d}, oid: {d}", .{ index, oid });
const cell: *DataCell = &this.list[index];
cell.* = if (optional_bytes) |data|
try DataCell.fromBytes(this.binary, oid, data.slice(), this.globalObject)
else
DataCell{
.tag = .null,
.value = .{
.null = 0,
},
};
if (is_raw) {
cell.* = DataCell.raw(optional_bytes);
} else {
cell.* = if (optional_bytes) |data|
try DataCell.fromBytes(this.binary, this.bigint, oid, data.slice(), this.globalObject)
else
DataCell{
.tag = .null,
.value = .{
.null = 0,
},
};
}
this.count += 1;
cell.index = switch (field.name_or_index) {
// The indexed columns can be out of order.
@@ -2357,6 +2438,13 @@ pub const PostgresSQLConnection = struct {
};
return true;
}
pub fn putRaw(this: *Putter, index: u32, optional_bytes: ?*Data) !bool {
return this.putImpl(index, optional_bytes, true);
}
pub fn put(this: *Putter, index: u32, optional_bytes: ?*Data) !bool {
return this.putImpl(index, optional_bytes, false);
}
};
};
@@ -2402,7 +2490,7 @@ pub const PostgresSQLConnection = struct {
continue;
};
req.status = .binding;
req.binary = stmt.fields.len > 0;
req.flags.binary = stmt.fields.len > 0;
any = true;
} else {
break;
@@ -2429,12 +2517,24 @@ pub const PostgresSQLConnection = struct {
.DataRow => {
const request = this.current() orelse return error.ExpectedRequest;
var statement = request.statement orelse return error.ExpectedStatement;
statement.checkForDuplicateFields();
var structure: JSValue = .undefined;
// explict use switch without else so if new modes are added, we don't forget to check for duplicate fields
switch (request.flags.result_mode) {
.objects => {
// check for duplicate fields
statement.checkForDuplicateFields();
structure = statement.structure(this.js_value, this.globalObject);
},
.raw, .values => {
// no need to check for duplicate fields or structure
},
}
var putter = DataCell.Putter{
.list = &.{},
.fields = statement.fields,
.binary = request.binary,
.binary = request.flags.binary,
.bigint = request.flags.bigint,
.globalObject = this.globalObject,
};
@@ -2452,18 +2552,29 @@ pub const PostgresSQLConnection = struct {
cells = try bun.default_allocator.alloc(DataCell, statement.fields.len);
free_cells = true;
}
// make sure all cells are reseted if reader short breaks the fields will just be null with is better than undefined behavior
@memset(cells, DataCell{ .tag = .null, .value = .{ .null = 0 } });
putter.list = cells;
try protocol.DataRow.decode(
&putter,
Context,
reader,
DataCell.Putter.put,
);
if (request.flags.result_mode == .raw) {
try protocol.DataRow.decode(
&putter,
Context,
reader,
DataCell.Putter.putRaw,
);
} else {
try protocol.DataRow.decode(
&putter,
Context,
reader,
DataCell.Putter.put,
);
}
const pending_value = if (request.thisValue == .zero) .zero else PostgresSQLQuery.pendingValueGetCached(request.thisValue) orelse .zero;
pending_value.ensureStillAlive();
const result = putter.toJS(this.globalObject, pending_value, statement.structure(this.js_value, this.globalObject), statement.fields_flags);
const result = putter.toJS(this.globalObject, pending_value, structure, statement.fields_flags, request.flags.result_mode);
if (pending_value == .zero) {
PostgresSQLQuery.pendingValueSetCached(request.thisValue, this.globalObject, result);
@@ -2837,13 +2948,18 @@ pub const PostgresSQLConnection = struct {
}
pub fn consumeOnConnectCallback(this: *const PostgresSQLConnection, globalObject: *JSC.JSGlobalObject) ?JSC.JSValue {
debug("consumeOnConnectCallback", .{});
const on_connect = PostgresSQLConnection.onconnectGetCached(this.js_value) orelse return null;
debug("consumeOnConnectCallback exists", .{});
PostgresSQLConnection.onconnectSetCached(this.js_value, globalObject, .zero);
return on_connect;
}
pub fn consumeOnCloseCallback(this: *const PostgresSQLConnection, globalObject: *JSC.JSGlobalObject) ?JSC.JSValue {
debug("consumeOnCloseCallback", .{});
const on_close = PostgresSQLConnection.oncloseGetCached(this.js_value) orelse return null;
debug("consumeOnCloseCallback exists", .{});
PostgresSQLConnection.oncloseSetCached(this.js_value, globalObject, .zero);
return on_close;
}
@@ -3102,8 +3218,11 @@ const Signature = struct {
fields: []const int4,
name: []const u8,
query: []const u8,
prepared_statement_name: []const u8,
const log = bun.Output.scoped(.PostgresSignature, false);
pub fn deinit(this: *Signature) void {
bun.default_allocator.free(this.prepared_statement_name);
bun.default_allocator.free(this.fields);
bun.default_allocator.free(this.name);
bun.default_allocator.free(this.query);
@@ -3116,7 +3235,7 @@ const Signature = struct {
return hasher.final();
}
pub fn generate(globalObject: *JSC.JSGlobalObject, query: []const u8, array_value: JSValue, columns: JSValue) !Signature {
pub fn generate(globalObject: *JSC.JSGlobalObject, query: []const u8, array_value: JSValue, columns: JSValue, prepared_statement_id: u64) !Signature {
var fields = std.ArrayList(int4).init(bun.default_allocator);
var name = try std.ArrayList(u8).initCapacity(bun.default_allocator, query.len);
@@ -3170,8 +3289,11 @@ const Signature = struct {
if (iter.anyFailed()) {
return error.InvalidQueryBinding;
}
// max u64 length is 20, max prepared_statement_name length is 63
const prepared_statement_name = try bun.fmt.allocPrint(bun.default_allocator, "P{s}${d}", .{ name.items[0..@min(40, name.items.len)], prepared_statement_id });
return Signature{
.prepared_statement_name = prepared_statement_name,
.name = name.items,
.fields = fields.items,
.query = try bun.default_allocator.dupe(u8, query),