/**
 * Postgres backend for the repository.
 *
 * Translates SQLite-style `?` placeholders to Postgres `$1, $2, ...` at
 * execute time so the Repository code can stay dialect-neutral. `RETURNING
 * id` captures lastInsertRowid (the caller must add `RETURNING id` to INSERTs
 * that need it — same for the SQLite path, so the SQL strings are portable).
 *
 * Pool size: default of 10 — bumpable via the BF_PG_POOL_MAX env variable.
 */
import { AsyncLocalStorage } from "node:async_hooks";

import { Pool, type PoolClient } from "pg";

import type { DbAdapter, RunResult, Row, SqlValue } from "./db-adapter.js";

/** Pool size used when BF_PG_POOL_MAX is unset or not a positive integer. */
const DEFAULT_POOL_MAX = 10;

/** Matches the text form of a (possibly negative) integer. */
const INTEGER_TEXT = /^-?\d+$/;

/** Transaction state bound to the async call chain that opened it. */
interface TxContext {
  /** Client that issued BEGIN; every statement of the transaction uses it. */
  readonly client: PoolClient;
  /** Current nesting level: 1 for the outer transaction, +1 per savepoint. */
  depth: number;
}

/**
 * Parses the pool-size override.
 *
 * @param raw Raw environment value, if any.
 * @returns The parsed size, or {@link DEFAULT_POOL_MAX} when `raw` is
 *   missing, empty, non-numeric, fractional or not positive.
 */
function resolvePoolMax(raw: string | undefined): number {
  if (raw === undefined) return DEFAULT_POOL_MAX;
  const parsed = Number(raw);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : DEFAULT_POOL_MAX;
}

/**
 * Converts a returned `id` column value to a bigint row id.
 *
 * node-postgres hands BIGINT (int8) columns back as strings by default, so
 * integer-looking strings are accepted alongside numbers and bigints.
 *
 * @returns The row id, or `undefined` when the value is not an integer
 *   (for example a UUID or a fractional number).
 */
function toRowId(value: unknown): bigint | undefined {
  if (typeof value === "bigint") return value;
  if (typeof value === "number") {
    return Number.isInteger(value) ? BigInt(value) : undefined;
  }
  if (typeof value === "string" && INTEGER_TEXT.test(value)) {
    return BigInt(value);
  }
  return undefined;
}

/**
 * {@link DbAdapter} implementation backed by a node-postgres connection pool.
 */
export class PgAdapter implements DbAdapter {
  private readonly pool: Pool;

  /**
   * Per-async-context transaction state. Only code running inside the
   * callback passed to transaction() sees the transaction's client, so
   * unrelated concurrent work on the same adapter keeps using the pool.
   */
  private readonly txStorage = new AsyncLocalStorage<TxContext>();

  /**
   * @param connectionString Postgres connection URI handed to the pool.
   */
  constructor(connectionString: string) {
    this.pool = new Pool({
      connectionString,
      max: resolvePoolMax(process.env["BF_PG_POOL_MAX"]),
      idleTimeoutMillis: 30_000,
    });
  }

  /**
   * Rewrites `?` placeholders to `$1`, `$2`, ... in order of appearance.
   *
   * `?` characters inside single- or double-quoted sections are left alone;
   * a doubled quote character inside such a section is treated as an escape.
   * SQL comments, dollar-quoted strings and operators that contain `?` are
   * not recognised, so a `?` appearing there is still rewritten.
   */
  private rewriteSql(sql: string): string {
    let out = "";
    let placeholderCount = 0;
    // Quote character of the section currently being copied, if any.
    let quote: string | null = null;

    for (let i = 0; i < sql.length; i += 1) {
      const c = sql.charAt(i);

      if (quote !== null) {
        out += c;
        if (c === quote) {
          if (sql.charAt(i + 1) === quote) {
            // Doubled quote ('' or "") is an escape: copy it, stay inside.
            out += quote;
            i += 1;
          } else {
            quote = null;
          }
        }
        continue;
      }

      if (c === "'" || c === '"') {
        quote = c;
        out += c;
        continue;
      }

      if (c === "?") {
        placeholderCount += 1;
        out += `$${placeholderCount}`;
        continue;
      }

      out += c;
    }

    return out;
  }

  /**
   * Produces the mutable parameter array handed to the driver.
   *
   * Values are passed through unchanged. In particular 0/1 are NOT turned
   * into booleans: the driver sends a JS boolean as the text `true`/`false`,
   * which Postgres rejects wherever an integer is expected (`WHERE id = ?`,
   * `LIMIT ?`), whereas the text `0`/`1` is valid input for both integer and
   * boolean parameters.
   */
  private coerceParams(params: ReadonlyArray<SqlValue>): unknown[] {
    return [...params];
  }

  /**
   * Runs `fn` with the ambient transaction's client when one is active in
   * the current async context; otherwise with a pooled client that is
   * released once `fn` settles.
   */
  private async runner<T>(fn: (c: PoolClient) => Promise<T>): Promise<T> {
    const tx = this.txStorage.getStore();
    if (tx) return fn(tx.client);

    const client = await this.pool.connect();
    try {
      return await fn(client);
    } finally {
      client.release();
    }
  }

  /**
   * Executes a statement.
   *
   * @returns `changes` is the driver's row count. `lastInsertRowid` is the
   *   `id` column of the first returned row when the statement returned one
   *   with an integer value (i.e. the caller added `RETURNING id`), else 0n.
   */
  async run(
    sql: string,
    params: ReadonlyArray<SqlValue> = [],
  ): Promise<RunResult> {
    const pgSql = this.rewriteSql(sql);
    const pgParams = this.coerceParams(params);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, pgParams);
      const first: unknown = res.rows[0];
      let lastInsertRowid = 0n;
      // If the caller added RETURNING id, pluck it.
      if (typeof first === "object" && first !== null && "id" in first) {
        const id = (first as Record<string, unknown>)["id"];
        lastInsertRowid = toRowId(id) ?? 0n;
      }
      return { lastInsertRowid, changes: Number(res.rowCount ?? 0) };
    });
  }

  /**
   * Executes a query and returns its first row, or `undefined` when the
   * query returned no rows.
   */
  async get<T = Row>(
    sql: string,
    params: ReadonlyArray<SqlValue> = [],
  ): Promise<T | undefined> {
    const pgSql = this.rewriteSql(sql);
    const pgParams = this.coerceParams(params);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, pgParams);
      return res.rows[0] as T | undefined;
    });
  }

  /** Executes a query and returns all of its rows. */
  async all<T = Row>(
    sql: string,
    params: ReadonlyArray<SqlValue> = [],
  ): Promise<T[]> {
    const pgSql = this.rewriteSql(sql);
    const pgParams = this.coerceParams(params);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, pgParams);
      return res.rows as T[];
    });
  }

  /**
   * Executes raw SQL without parameters and without placeholder rewriting.
   */
  async exec(sql: string): Promise<void> {
    // PG accepts multi-statement strings via simple query protocol.
    await this.runner(async (c) => {
      await c.query(sql);
    });
  }

  /**
   * Runs `fn` inside a transaction.
   *
   * Called outside a transaction, this checks out a dedicated client, issues
   * BEGIN, and COMMITs when `fn` resolves or ROLLBACKs when it rejects (the
   * original error is rethrown). Called from within `fn` of an enclosing
   * transaction, it wraps the nested work in a savepoint instead.
   *
   * Adapter calls made from `fn`'s async call chain use the transaction's
   * client; calls made from other async contexts are unaffected.
   */
  async transaction<T>(fn: () => Promise<T>): Promise<T> {
    const ambient = this.txStorage.getStore();
    if (ambient) return this.runInSavepoint(ambient, fn);

    const client = await this.pool.connect();
    let rollbackFailed = false;
    try {
      await client.query("BEGIN");
      const result = await this.txStorage.run({ client, depth: 1 }, fn);
      await client.query("COMMIT");
      return result;
    } catch (err) {
      try {
        await client.query("ROLLBACK");
      } catch {
        // Best effort: the original error is the one worth surfacing.
        rollbackFailed = true;
      }
      throw err;
    } finally {
      if (rollbackFailed) {
        // The connection's state is unknown; destroy it rather than hand a
        // possibly still-open transaction to the next pool user.
        client.release(true);
      } else {
        client.release();
      }
    }
  }

  /**
   * Runs `fn` inside a savepoint of the ambient transaction.
   *
   * Savepoint names are derived from the nesting depth, so nested
   * transactions sharing one context are expected to be awaited one at a
   * time rather than started concurrently.
   */
  private async runInSavepoint<T>(
    tx: TxContext,
    fn: () => Promise<T>,
  ): Promise<T> {
    const depth = tx.depth + 1;
    const name = `sp_${depth}`;
    await tx.client.query(`SAVEPOINT ${name}`);
    // Only record the new depth once the savepoint actually exists.
    tx.depth = depth;
    try {
      const result = await fn();
      await tx.client.query(`RELEASE SAVEPOINT ${name}`);
      return result;
    } catch (err) {
      try {
        await tx.client.query(`ROLLBACK TO SAVEPOINT ${name}`);
      } catch {
        /* ignore */
      }
      throw err;
    } finally {
      tx.depth = depth - 1;
    }
  }

  /** Identifies the SQL dialect this adapter speaks. */
  dialect(): "postgres" {
    return "postgres";
  }

  /** Shuts down the connection pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }
}