feat(sql) transactions, savepoints, connection pooling and reserve (#16381)

This commit is contained in:
Ciro Spaciari
2025-01-18 16:03:42 -08:00
committed by GitHub
parent 87dedd109a
commit ba930ad54a
9 changed files with 1882 additions and 309 deletions

View File

@@ -1,4 +1,5 @@
import { postgres, sql } from "bun:sql";
import { sql } from "bun";
const postgres = (...args) => new sql(...args);
import { expect, test, mock } from "bun:test";
import { $ } from "bun";
import { bunExe, isCI, withoutAggressiveGC } from "harness";
@@ -382,168 +383,240 @@ if (!isCI && hasPsql) {
}
});
// t('Throws on illegal transactions', async() => {
// const sql = postgres({ ...options, max: 2, fetch_types: false })
// const error = await sql`begin`.catch(e => e)
// return [
// error.code,
// 'UNSAFE_TRANSACTION'
// ]
// })
test("Throws on illegal transactions", async () => {
const sql = postgres({ ...options, max: 2, fetch_types: false });
const error = await sql`begin`.catch(e => e);
return expect(error.code).toBe("ERR_POSTGRES_UNSAFE_TRANSACTION");
});
// t('Transaction throws', async() => {
// await sql`create table test (a int)`
// return ['22P02', await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql`insert into test values('hej')`
// }).catch(x => x.code), await sql`drop table test`]
// })
test("Transaction throws", async () => {
await sql`create table if not exists test (a int)`;
try {
expect(
await sql
.begin(async sql => {
await sql`insert into test values(1)`;
await sql`insert into test values('hej')`;
})
.catch(e => e.errno),
).toBe(22);
} finally {
await sql`drop table test`;
}
});
// t('Transaction rolls back', async() => {
// await sql`create table test (a int)`
// await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql`insert into test values('hej')`
// }).catch(() => { /* ignore */ })
// return [0, (await sql`select a from test`).count, await sql`drop table test`]
// })
test("Transaction rolls back", async () => {
await sql`create table if not exists test (a int)`;
// t('Transaction throws on uncaught savepoint', async() => {
// await sql`create table test (a int)`
try {
await sql
.begin(async sql => {
await sql`insert into test values(1)`;
await sql`insert into test values('hej')`;
})
.catch(() => {
/* ignore */
});
// return ['fail', (await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql.savepoint(async sql => {
// await sql`insert into test values(2)`
// throw new Error('fail')
// })
// }).catch((err) => err.message)), await sql`drop table test`]
// })
expect((await sql`select a from test`).count).toBe(0);
} finally {
await sql`drop table test`;
}
});
// t('Transaction throws on uncaught named savepoint', async() => {
// await sql`create table test (a int)`
test("Transaction throws on uncaught savepoint", async () => {
await sql`create table test (a int)`;
try {
expect(
await sql
.begin(async sql => {
await sql`insert into test values(1)`;
await sql.savepoint(async sql => {
await sql`insert into test values(2)`;
throw new Error("fail");
});
})
.catch(err => err.message),
).toBe("fail");
} finally {
await sql`drop table test`;
}
});
// return ['fail', (await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql.savepoit('watpoint', async sql => {
// await sql`insert into test values(2)`
// throw new Error('fail')
// })
// }).catch(() => 'fail')), await sql`drop table test`]
// })
test("Transaction throws on uncaught named savepoint", async () => {
await sql`create table test (a int)`;
try {
expect(
await sql
.begin(async sql => {
await sql`insert into test values(1)`;
await sql.savepoit("watpoint", async sql => {
await sql`insert into test values(2)`;
throw new Error("fail");
});
})
.catch(() => "fail"),
).toBe("fail");
} finally {
await sql`drop table test`;
}
});
// t('Transaction succeeds on caught savepoint', async() => {
// await sql`create table test (a int)`
// await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql.savepoint(async sql => {
// await sql`insert into test values(2)`
// throw new Error('please rollback')
// }).catch(() => { /* ignore */ })
// await sql`insert into test values(3)`
// })
test("Transaction succeeds on caught savepoint", async () => {
try {
await sql`create table test (a int)`;
await sql.begin(async sql => {
await sql`insert into test values(1)`;
await sql
.savepoint(async sql => {
await sql`insert into test values(2)`;
throw new Error("please rollback");
})
.catch(() => {
/* ignore */
});
await sql`insert into test values(3)`;
});
expect((await sql`select count(1) from test`)[0].count).toBe("2");
} finally {
await sql`drop table test`;
}
});
// return ['2', (await sql`select count(1) from test`)[0].count, await sql`drop table test`]
// })
test("Savepoint returns Result", async () => {
let result;
await sql.begin(async t => {
result = await t.savepoint(s => s`select 1 as x`);
});
expect(result[0]?.x).toBe(1);
});
// t('Savepoint returns Result', async() => {
// let result
// await sql.begin(async sql => {
// result = await sql.savepoint(sql =>
// sql`select 1 as x`
// )
// })
// return [1, result[0].x]
// })
// t('Prepared transaction', async() => {
// await sql`create table test (a int)`
// test("Prepared transaction", async () => {
// await sql`create table test (a int)`;
// await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql.prepare('tx1')
// })
// await sql`insert into test values(1)`;
// await sql.prepare("tx1");
// });
// await sql`commit prepared 'tx1'`
// return ['1', (await sql`select count(1) from test`)[0].count, await sql`drop table test`]
// })
// t('Transaction requests are executed implicitly', async() => {
// const sql = postgres({ debug: true, idle_timeout: 1, fetch_types: false })
// return [
// 'testing',
// (await sql.begin(sql => [
// sql`select set_config('bun_sql.test', 'testing', true)`,
// sql`select current_setting('bun_sql.test') as x`
// ]))[1][0].x
// ]
// })
// t('Uncaught transaction request errors bubbles to transaction', async() => [
// '42703',
// (await sql.begin(sql => [
// sql`select wat`,
// sql`select current_setting('bun_sql.test') as x, ${ 1 } as a`
// ]).catch(e => e.code))
// ])
// t('Fragments in transactions', async() => [
// true,
// (await sql.begin(sql => sql`select true as x where ${ sql`1=1` }`))[0].x
// ])
// t('Transaction rejects with rethrown error', async() => [
// 'WAT',
// await sql.begin(async sql => {
// try {
// await sql`select exception`
// } catch (ex) {
// throw new Error('WAT')
// }
// }).catch(e => e.message)
// ])
// t('Parallel transactions', async() => {
// await sql`create table test (a int)`
// return ['11', (await Promise.all([
// sql.begin(sql => sql`select 1`),
// sql.begin(sql => sql`select 1`)
// ])).map(x => x.count).join(''), await sql`drop table test`]
// })
// t("Many transactions at beginning of connection", async () => {
// const sql = postgres(options);
// const xs = await Promise.all(Array.from({ length: 100 }, () => sql.begin(sql => sql`select 1`)));
// return [100, xs.length];
// await sql`commit prepared 'tx1'`;
// try {
// expect((await sql`select count(1) from test`)[0].count).toBe("1");
// } finally {
// await sql`drop table test`;
// }
// });
// t('Transactions array', async() => {
// await sql`create table test (a int)`
test("Prepared transaction", async () => {
await sql`create table test (a int)`;
// return ['11', (await sql.begin(sql => [
// sql`select 1`.then(x => x),
// sql`select 1`
// ])).map(x => x.count).join(''), await sql`drop table test`]
// })
try {
await sql.beginDistributed("tx1", async sql => {
await sql`insert into test values(1)`;
});
// t('Transaction waits', async() => {
// await sql`create table test (a int)`
// await sql.begin(async sql => {
// await sql`insert into test values(1)`
// await sql.savepoint(async sql => {
// await sql`insert into test values(2)`
// throw new Error('please rollback')
// }).catch(() => { /* ignore */ })
// await sql`insert into test values(3)`
// })
await sql.commitDistributed("tx1");
expect((await sql`select count(1) from test`)[0].count).toBe("1");
} finally {
await sql`drop table test`;
}
});
// return ['11', (await Promise.all([
// sql.begin(sql => sql`select 1`),
// sql.begin(sql => sql`select 1`)
// ])).map(x => x.count).join(''), await sql`drop table test`]
// })
test("Transaction requests are executed implicitly", async () => {
const sql = postgres({ ...options, debug: true, idle_timeout: 1, fetch_types: false });
expect(
(
await sql.begin(sql => [
sql`select set_config('bun_sql.test', 'testing', true)`,
sql`select current_setting('bun_sql.test') as x`,
])
)[1][0].x,
).toBe("testing");
});
test("Uncaught transaction request errosó rs bubbles to transaction", async () => {
const sql = postgres({ ...options, debug: true, idle_timeout: 1, fetch_types: false });
expect(
await sql
.begin(sql => [sql`select wat`, sql`select current_setting('bun_sql.test') as x, ${1} as a`])
.catch(e => e.errno),
).toBe(42703);
});
// test.only("Fragments in transactions", async () => {
// const sql = postgres({ ...options, debug: true, idle_timeout: 1, fetch_types: false });
// expect((await sql.begin(sql => sql`select true as x where ${sql`1=1`}`))[0].x).toBe(true);
// });
test("Transaction rejects with rethrown error", async () => {
await using sql = postgres({ ...options });
expect(
await sql
.begin(async sql => {
try {
await sql`select exception`;
} catch (ex) {
throw new Error("WAT");
}
})
.catch(e => e.message),
).toBe("WAT");
});
test("Parallel transactions", async () => {
await sql`create table test (a int)`;
expect(
(await Promise.all([sql.begin(sql => sql`select 1 as count`), sql.begin(sql => sql`select 1 as count`)]))
.map(x => x[0].count)
.join(""),
).toBe("11");
await sql`drop table test`;
});
test("Many transactions at beginning of connection", async () => {
await using sql = postgres(options);
const xs = await Promise.all(Array.from({ length: 100 }, () => sql.begin(sql => sql`select 1`)));
return expect(xs.length).toBe(100);
});
test("Transactions array", async () => {
await using sql = postgres(options);
await sql`create table test (a int)`;
try {
expect(
(await sql.begin(sql => [sql`select 1 as count`, sql`select 1 as count`])).map(x => x[0].count).join(""),
).toBe("11");
} finally {
await sql`drop table test`;
}
});
test("Transaction waits", async () => {
await using sql = postgres({ ...options });
await sql`create table test (a int)`;
try {
await sql.begin(async sql => {
await sql`insert into test values(1)`;
await sql
.savepoint(async sql => {
await sql`insert into test values(2)`;
throw new Error("please rollback");
})
.catch(() => {
/* ignore */
});
await sql`insert into test values(3)`;
});
expect(
(await Promise.all([sql.begin(sql => sql`select 1 as count`), sql.begin(sql => sql`select 1 as count`)]))
.map(x => x[0].count)
.join(""),
).toBe("11");
} finally {
await sql`drop table test`;
}
});
// t('Helpers in Transaction', async() => {
// return ['1', (await sql.begin(async sql =>