BetterFrame/server/src/plugins/service-store/pg-adapter.ts

147 lines
4.5 KiB
TypeScript
Raw Normal View History

/**
* Postgres backend for the repository.
*
* Translates SQLite-style `?` placeholders to Postgres `$1, $2, ...` at
* execute time so the Repository code can stay dialect-neutral.
* lastInsertRowid is taken from `RETURNING id` (the caller must add
* `RETURNING id` to INSERTs that need it — the same applies to the SQLite
* path, so the SQL strings stay portable).
*
* Pool size: PG default of 10, bumpable via the BF_PG_POOL_MAX env var if needed.
*/
import { AsyncLocalStorage } from "node:async_hooks";
import { Pool, type PoolClient } from "pg";
import type { DbAdapter, RunResult, Row, SqlValue } from "./db-adapter.js";
/** Transaction bookkeeping carried through one async call chain. */
interface PgTxState {
  /** Dedicated connection on which `BEGIN` was issued. */
  client: PoolClient;
  /** Nesting level: 1 for the outermost transaction, +1 per open savepoint. */
  depth: number;
  /**
   * Cleared when the outermost transaction finishes, so work that outlives
   * the transaction (un-awaited promises started inside it) falls back to
   * the pool instead of touching a released connection.
   */
  active: boolean;
}

/**
 * Postgres implementation of `DbAdapter` on top of a `pg` connection pool.
 *
 * SQL passed to `run`/`get`/`all` uses `?` placeholders, which are rewritten
 * to `$1, $2, ...` before execution. Statements issued inside the callback
 * of `transaction()` run on that transaction's connection; everything else
 * borrows a connection from the pool for the duration of one statement.
 */
export class PgAdapter implements DbAdapter {
  /** Matches an optionally signed run of decimal digits. */
  private static readonly INTEGER_TEXT = /^-?\d+$/;

  private readonly pool: Pool;

  /**
   * Transaction state for the current async call chain. Kept in
   * AsyncLocalStorage rather than in an instance field so that concurrent
   * callers sharing this adapter never see each other's transaction.
   */
  private readonly txContext = new AsyncLocalStorage<PgTxState>();

  /**
   * @param connectionString Connection string handed to the `pg` pool.
   *
   * The pool size is read from `BF_PG_POOL_MAX`; a missing, empty,
   * non-numeric or non-positive value falls back to 10.
   */
  constructor(connectionString: string) {
    const requestedMax = Number(process.env["BF_PG_POOL_MAX"]);
    const max = Number.isInteger(requestedMax) && requestedMax > 0 ? requestedMax : 10;
    this.pool = new Pool({
      connectionString,
      max,
      idleTimeoutMillis: 30_000,
    });
  }

  /**
   * Rewrites `?` placeholders to `$1`, `$2`, ... in order of appearance.
   *
   * `?` characters are left untouched inside single-quoted literals,
   * double-quoted identifiers (both with doubled-quote escapes), `--` line
   * comments and `/* ... *\/` block comments (which may nest).
   *
   * @param sql Statement using `?` placeholders.
   * @returns The statement with numbered placeholders.
   */
  private rewriteSql(sql: string): string {
    // Nothing to rewrite: skip the scan entirely.
    if (!sql.includes("?")) return sql;

    const len = sql.length;
    let out = "";
    let placeholderCount = 0;
    let i = 0;

    while (i < len) {
      const c = sql[i]!;
      const next = sql[i + 1];

      if (c === "'" || c === '"') {
        // Copy through the matching closing quote; a doubled quote is an escape.
        let j = i + 1;
        while (j < len) {
          if (sql[j] === c) {
            if (sql[j + 1] === c) {
              j += 2;
              continue;
            }
            break;
          }
          j += 1;
        }
        // An unterminated literal swallows the rest of the statement verbatim.
        const end = Math.min(j + 1, len);
        out += sql.slice(i, end);
        i = end;
        continue;
      }

      if (c === "-" && next === "-") {
        // Line comment: copy up to (not including) the newline.
        const newline = sql.indexOf("\n", i);
        const end = newline === -1 ? len : newline;
        out += sql.slice(i, end);
        i = end;
        continue;
      }

      if (c === "/" && next === "*") {
        // Block comment: track nesting depth until it is fully closed.
        let depth = 1;
        let j = i + 2;
        while (j < len && depth > 0) {
          if (sql[j] === "/" && sql[j + 1] === "*") {
            depth += 1;
            j += 2;
          } else if (sql[j] === "*" && sql[j + 1] === "/") {
            depth -= 1;
            j += 2;
          } else {
            j += 1;
          }
        }
        const end = Math.min(j, len);
        out += sql.slice(i, end);
        i = end;
        continue;
      }

      if (c === "?") {
        placeholderCount += 1;
        out += `$${placeholderCount}`;
      } else {
        out += c;
      }
      i += 1;
    }
    return out;
  }

  /**
   * Extracts the `id` column of a returned row as a bigint.
   *
   * Accepts bigint, integer number, and integer text (node-postgres returns
   * int8 columns as strings unless a custom type parser is installed).
   *
   * @returns The id, or `0n` when the row has no usable `id`.
   */
  private static extractId(row: unknown): bigint {
    if (typeof row !== "object" || row === null || !("id" in row)) return 0n;
    const v = (row as Record<string, unknown>)["id"];
    if (typeof v === "bigint") return v;
    // BigInt() throws on non-integer numbers, so guard first.
    if (typeof v === "number") return Number.isInteger(v) ? BigInt(v) : 0n;
    if (typeof v === "string" && PgAdapter.INTEGER_TEXT.test(v)) return BigInt(v);
    return 0n;
  }

  /**
   * Runs `fn` with the active transaction's client when called from inside
   * `transaction()`, otherwise with a pooled client that is released as soon
   * as `fn` settles.
   */
  private async runner<T>(fn: (c: PoolClient) => Promise<T>): Promise<T> {
    const tx = this.txContext.getStore();
    if (tx?.active) return fn(tx.client);
    const client = await this.pool.connect();
    try {
      return await fn(client);
    } finally {
      client.release();
    }
  }

  /**
   * Executes a write statement.
   *
   * @param sql Statement using `?` placeholders.
   * @param params Values bound to the placeholders, in order.
   * @returns `changes` is the driver-reported row count; `lastInsertRowid`
   *   is the `id` of the first returned row (requires `RETURNING id` in the
   *   statement) or `0n` when no such value is available.
   */
  async run(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<RunResult> {
    const pgSql = this.rewriteSql(sql);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, params as unknown[]);
      return {
        lastInsertRowid: PgAdapter.extractId(res.rows[0]),
        changes: Number(res.rowCount ?? 0),
      };
    });
  }

  /**
   * Executes a query and returns its first row.
   *
   * @param sql Statement using `?` placeholders.
   * @param params Values bound to the placeholders, in order.
   * @returns The first row, or `undefined` when the result is empty.
   */
  async get<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T | undefined> {
    const pgSql = this.rewriteSql(sql);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, params as unknown[]);
      return res.rows[0] as T | undefined;
    });
  }

  /**
   * Executes a query and returns every row.
   *
   * @param sql Statement using `?` placeholders.
   * @param params Values bound to the placeholders, in order.
   */
  async all<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T[]> {
    const pgSql = this.rewriteSql(sql);
    return this.runner(async (c) => {
      const res = await c.query(pgSql, params as unknown[]);
      return res.rows as T[];
    });
  }

  /**
   * Executes raw SQL without parameters and without placeholder rewriting.
   */
  async exec(sql: string): Promise<void> {
    // PG accepts multi-statement strings via simple query protocol.
    await this.runner(async (c) => {
      await c.query(sql);
    });
  }

  /**
   * Runs `fn` inside a transaction.
   *
   * The outermost call takes a dedicated connection, issues `BEGIN`, and
   * commits when `fn` resolves or rolls back when it rejects. A call made
   * from inside another transaction's callback uses a savepoint (`sp_<depth>`)
   * on the same connection instead, so a failure only undoes the inner work.
   *
   * @returns Whatever `fn` resolves to.
   * @throws Re-throws the error that caused the rollback.
   */
  async transaction<T>(fn: () => Promise<T>): Promise<T> {
    const outer = this.txContext.getStore();

    if (outer?.active) {
      // Already in a transaction — use a savepoint.
      const { client } = outer;
      outer.depth += 1;
      const name = `sp_${outer.depth}`;
      try {
        // Kept outside the inner try: if the savepoint was never created
        // there is nothing to roll back to.
        await client.query(`SAVEPOINT ${name}`);
        try {
          const result = await fn();
          await client.query(`RELEASE SAVEPOINT ${name}`);
          return result;
        } catch (err) {
          try {
            await client.query(`ROLLBACK TO SAVEPOINT ${name}`);
          } catch {
            /* ignore: the original error is the one worth reporting */
          }
          throw err;
        }
      } finally {
        // Restore the depth on every exit path, including a failed SAVEPOINT.
        outer.depth -= 1;
      }
    }

    const client = await this.pool.connect();
    const state: PgTxState = { client, depth: 1, active: true };
    let rollbackFailure: Error | undefined;
    try {
      await client.query("BEGIN");
      const result = await this.txContext.run(state, fn);
      await client.query("COMMIT");
      return result;
    } catch (err) {
      try {
        await client.query("ROLLBACK");
      } catch (rollbackErr) {
        // The connection's transaction state is now unknown.
        rollbackFailure = rollbackErr instanceof Error ? rollbackErr : new Error(String(rollbackErr));
      }
      throw err;
    } finally {
      state.active = false;
      // Passing an error makes the pool discard the connection instead of reusing it.
      if (rollbackFailure) client.release(rollbackFailure);
      else client.release();
    }
  }

  /** Identifies the SQL dialect this adapter speaks. */
  dialect(): "postgres" {
    return "postgres";
  }

  /** Shuts down the connection pool. */
  async close(): Promise<void> {
    await this.pool.end();
  }
}