feat(store): Postgres adapter foundation + BF_DB selector (phase 1)

Lays groundwork for sqlite|postgres backend selection without yet
converting Repository. Adds:

- db-adapter.ts: async DbAdapter interface (run/get/all/exec/transaction)
- sqlite-adapter.ts: wraps node:sqlite sync API in Promise-returning shape,
  caches prepared statements
- pg-adapter.ts: pg Pool + ? → $N placeholder rewrite + RETURNING-id
  capture + savepoint-nested transactions
- service-store config: driver (sqlite|postgres), pgUrl
- BF_DB env override, plumbed via envStr

Selecting BF_DB=postgres throws at init() until the Repository is
converted off DatabaseSync. This commit ships the foundation only.

Next phases (separate commits):
  2. Convert Repository methods sync → async via DbAdapter
  3. Update every caller to await
  4. Split MIGRATIONS into sqlite + portable / pg-specific sets
  5. UUIDv7 IDs for new tables on PG path

Adds deps: pg ^8.13.1, uuidv7 ^1.0.2, @types/pg ^8.20.0
This commit is contained in:
Mitchell R 2026-05-18 22:50:48 +02:00
parent 8082571b03
commit 936e6170a6
No known key found for this signature in database
6 changed files with 466 additions and 0 deletions

161
package-lock.json generated
View file

@ -542,6 +542,18 @@
"undici-types": "~7.19.0"
}
},
"node_modules/@types/pg": {
"version": "8.20.0",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.20.0.tgz",
"integrity": "sha512-bEPFOaMAHTEP1EzpvHTbmwR8UsFyHSKsRisLIHVMXnpNefSbGA1bD6CVy+qKjGSqmZqNqBDV2azOBo8TgkcVow==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*",
"pg-protocol": "*",
"pg-types": "^2.2.0"
}
},
"node_modules/@types/readable-stream": {
"version": "4.0.23",
"resolved": "https://registry.npmjs.org/@types/readable-stream/-/readable-stream-4.0.23.tgz",
@ -1087,6 +1099,134 @@
"node": ">=8"
}
},
"node_modules/pg": {
"version": "8.21.0",
"resolved": "https://registry.npmjs.org/pg/-/pg-8.21.0.tgz",
"integrity": "sha512-AUP1EYJuHraQGsVoCQVIcM7TEJVGtDzxWtGFZd8rds9d+CCXlU5Js1rYgfLNvxy9iJrpHjGrRjoi/3BT9fRyiA==",
"license": "MIT",
"dependencies": {
"pg-connection-string": "^2.13.0",
"pg-pool": "^3.14.0",
"pg-protocol": "^1.14.0",
"pg-types": "2.2.0",
"pgpass": "1.0.5"
},
"engines": {
"node": ">= 16.0.0"
},
"optionalDependencies": {
"pg-cloudflare": "^1.4.0"
},
"peerDependencies": {
"pg-native": ">=3.0.1"
},
"peerDependenciesMeta": {
"pg-native": {
"optional": true
}
}
},
"node_modules/pg-cloudflare": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.4.0.tgz",
"integrity": "sha512-Vo7z/6rrQYxpNRylp4Tlob2elzbh+N/MOQbxFVWCxS7oEx6jF53GTJFxK2WWpKuBRkmiin4Mt+xofFDjx09R0A==",
"license": "MIT",
"optional": true
},
"node_modules/pg-connection-string": {
"version": "2.13.0",
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.13.0.tgz",
"integrity": "sha512-EMnU9E2fSULdsbErBbMaXJvFeD9B4+nPcM3f+4lsiCR0BHLPrLVjv3DbyM2hgQQviKJaTWIRRTjKjWlHg3p2ig==",
"license": "MIT"
},
"node_modules/pg-int8": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
"integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==",
"license": "ISC",
"engines": {
"node": ">=4.0.0"
}
},
"node_modules/pg-pool": {
"version": "3.14.0",
"resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.14.0.tgz",
"integrity": "sha512-gKtPkFdQPU3DksooVLi9LsjZxrsBUZIpa+7aVx+LV5pNh0KzP4Zleud2po+ConrxbuXGBJ6Hfer6hdgpIBpBaw==",
"license": "MIT",
"peerDependencies": {
"pg": ">=8.0"
}
},
"node_modules/pg-protocol": {
"version": "1.14.0",
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.14.0.tgz",
"integrity": "sha512-n5taZ1kO3s9ngDTVxsEznOqCyToTgz0FLuPq0B33COy5pPpuWJpY3/2oRBVETuOgzdqRXfWpM9HIhp2LBBT1BA==",
"license": "MIT"
},
"node_modules/pg-types": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",
"integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==",
"license": "MIT",
"dependencies": {
"pg-int8": "1.0.1",
"postgres-array": "~2.0.0",
"postgres-bytea": "~1.0.0",
"postgres-date": "~1.0.4",
"postgres-interval": "^1.1.0"
},
"engines": {
"node": ">=4"
}
},
"node_modules/pgpass": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz",
"integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==",
"license": "MIT",
"dependencies": {
"split2": "^4.1.0"
}
},
"node_modules/postgres-array": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz",
"integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==",
"license": "MIT",
"engines": {
"node": ">=4"
}
},
"node_modules/postgres-bytea": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.1.tgz",
"integrity": "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/postgres-date": {
"version": "1.0.7",
"resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz",
"integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/postgres-interval": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz",
"integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==",
"license": "MIT",
"dependencies": {
"xtend": "^4.0.0"
},
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/process": {
"version": "0.11.10",
"resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz",
@ -1328,6 +1468,15 @@
"uuid": "dist-node/bin/uuid"
}
},
"node_modules/uuidv7": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/uuidv7/-/uuidv7-1.2.1.tgz",
"integrity": "sha512-4kPkK3/XTQW9Hbm4CaqfICn+kY9LJtDVEOfgsRRra/+n2Ofg4NqzRFceAkxvQ/Ud/6BpHOPzj8cirqM7TzTN5Q==",
"license": "Apache-2.0",
"bin": {
"uuidv7": "cli.js"
}
},
"node_modules/which": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",
@ -1433,6 +1582,15 @@
"node": ">=4.0"
}
},
"node_modules/xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",
"integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==",
"license": "MIT",
"engines": {
"node": ">=0.4"
}
},
"node_modules/yaml": {
"version": "2.8.4",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.4.tgz",
@ -1467,10 +1625,13 @@
"mqtt": "^5.10.4",
"onvif": "^0.8.1",
"otpauth": "^9.5.1",
"pg": "^8.13.1",
"uuidv7": "^1.0.2",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/node": "^25.0.0",
"@types/pg": "^8.20.0",
"tsx": "^4.21.0",
"typescript": "^6.0.3"
},

View file

@ -30,10 +30,13 @@
"mqtt": "^5.10.4",
"onvif": "^0.8.1",
"otpauth": "^9.5.1",
"pg": "^8.13.1",
"uuidv7": "^1.0.2",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/node": "^25.0.0",
"@types/pg": "^8.20.0",
"tsx": "^4.21.0",
"typescript": "^6.0.3"
},

View file

@ -0,0 +1,53 @@
/**
* Backend-agnostic DB adapter. Repository talks to this; concrete adapters
* (sqlite, postgres) implement it.
*
* Design choices:
* - All methods return Promises so the Postgres path can use real async I/O.
* The SQLite adapter wraps node:sqlite's synchronous calls in
* Promise.resolve to keep the same interface.
* - `?` is the canonical placeholder in SQL strings. The Postgres adapter
* rewrites them to `$1, $2, ...` at execute time so repository code stays
* dialect-neutral.
* - INSERTs that need to return the new row id must use `... RETURNING id`
* explicitly. Both SQLite (3.35+) and Postgres support it.
*
* Migrations and DDL fragments still differ between dialects (AUTOINCREMENT
* vs SERIAL, STRICT vs nothing, strftime vs now()), so each backend ships
* its own migration set rather than trying to abstract DDL.
*/
export type SqlValue = string | number | bigint | boolean | null | Uint8Array;
export type Row = Record<string, unknown>;
export interface RunResult {
/** New row id when the statement used `RETURNING id`, else 0n. */
lastInsertRowid: bigint;
/** Rows affected (approximate for some Postgres queries). */
changes: number;
}
export interface DbAdapter {
/** Execute a write statement (INSERT / UPDATE / DELETE). */
run(sql: string, params?: ReadonlyArray<SqlValue>): Promise<RunResult>;
/** Single-row query. Undefined if no row. */
get<T = Row>(sql: string, params?: ReadonlyArray<SqlValue>): Promise<T | undefined>;
/** Multi-row query. */
all<T = Row>(sql: string, params?: ReadonlyArray<SqlValue>): Promise<T[]>;
/** Execute multi-statement DDL (no params, no result). */
exec(sql: string): Promise<void>;
/** Run a callback inside a transaction. Rolls back on throw. */
transaction<T>(fn: () => Promise<T>): Promise<T>;
/** Identifies the backend. */
dialect(): "sqlite" | "postgres";
/** Release the connection / pool. */
close(): Promise<void>;
}
export interface DbAdapterConfig {
driver: "sqlite" | "postgres";
/** SQLite-only: filesystem path. */
sqlitePath?: string;
/** Postgres-only: connection string (postgres://user:pass@host:port/db). */
pgUrl?: string;
}

View file

@ -41,7 +41,12 @@ import { envStr } from "../../shared/env-overrides.js";
const ConfigSchema = av.object(
{
/** Backend selector. Override at runtime via BF_DB env. */
driver: av.enum_(["sqlite", "postgres"] as const).default("sqlite"),
/** sqlite-only: filesystem path to the .db file. */
sqlitePath: av.string().minLength(1).default("/var/lib/betterframe/betterframe.db"),
/** postgres-only: full libpq URL. Override via BF_PG_URL env. */
pgUrl: av.string().default(""),
},
{ unknownKeys: "strip" },
);
@ -100,6 +105,19 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
}
async init(obs: Observable): Promise<void> {
const driver = envStr("BF_DB", this.config.driver) as "sqlite" | "postgres";
if (driver === "postgres") {
// Repository conversion to the async DbAdapter interface is in progress.
// Until that lands, refuse to start under postgres rather than corrupt
// data via half-converted code paths. See db-adapter.ts / pg-adapter.ts
// for the foundation already in place.
throw new Error(
"BF_DB=postgres: foundation present (pg-adapter.ts) but Repository " +
"is still on the sync sqlite path. Pending refactor — keep BF_DB " +
"unset (defaults to sqlite) or set BF_DB=sqlite explicitly.",
);
}
const path = envStr("BF_SQLITE_PATH", this.config.sqlitePath);
obs.log.info("opening sqlite at {path}", { path });

View file

@ -0,0 +1,146 @@
/**
* Postgres backend for the repository.
*
* Translates SQLite-style `?` placeholders to Postgres `$1, $2, ...` at
* execute time so the Repository code can stay dialect-neutral. RETURNING
* id captures lastInsertRowid (caller must add `RETURNING id` to INSERTs
* that need it same for SQLite path so the SQL strings are portable).
*
* Pool size: PG default of 10 bumpable via BF_PG_POOL_MAX env if needed.
*/
import { Pool, type PoolClient } from "pg";
import type { DbAdapter, RunResult, Row, SqlValue } from "./db-adapter.js";
export class PgAdapter implements DbAdapter {
private readonly pool: Pool;
/** Per-async-context client when inside transaction(). */
private currentTxClient: PoolClient | null = null;
private txDepth = 0;
constructor(connectionString: string) {
this.pool = new Pool({
connectionString,
max: Number(process.env["BF_PG_POOL_MAX"] ?? 10),
idleTimeoutMillis: 30_000,
});
}
private rewriteSql(sql: string): string {
// `?` → `$1`, `$2`, ... Skips `?` characters inside string literals.
let out = "";
let n = 0;
let inString = false;
let stringChar = "";
for (let i = 0; i < sql.length; i += 1) {
const c = sql[i]!;
if (inString) {
out += c;
if (c === stringChar) {
// Handle '' escape.
if (sql[i + 1] === stringChar) { out += sql[i + 1]; i += 1; }
else inString = false;
}
continue;
}
if (c === "'" || c === '"') {
inString = true;
stringChar = c;
out += c;
continue;
}
if (c === "?") {
n += 1;
out += `$${n}`;
continue;
}
out += c;
}
return out;
}
private async runner<T>(fn: (c: PoolClient) => Promise<T>): Promise<T> {
if (this.currentTxClient) return fn(this.currentTxClient);
const client = await this.pool.connect();
try { return await fn(client); }
finally { client.release(); }
}
async run(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<RunResult> {
const pgSql = this.rewriteSql(sql);
return this.runner(async (c) => {
const res = await c.query(pgSql, params as unknown[]);
let lastInsertRowid = 0n;
// If the caller added RETURNING id, pluck it.
if (res.rows.length > 0 && res.rows[0] && "id" in res.rows[0]) {
const v = (res.rows[0] as Record<string, unknown>)["id"];
if (typeof v === "number" || typeof v === "bigint") {
lastInsertRowid = BigInt(v);
}
}
return { lastInsertRowid, changes: Number(res.rowCount ?? 0) };
});
}
async get<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T | undefined> {
const pgSql = this.rewriteSql(sql);
return this.runner(async (c) => {
const res = await c.query(pgSql, params as unknown[]);
return (res.rows[0] as T | undefined);
});
}
async all<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T[]> {
const pgSql = this.rewriteSql(sql);
return this.runner(async (c) => {
const res = await c.query(pgSql, params as unknown[]);
return res.rows as T[];
});
}
async exec(sql: string): Promise<void> {
// PG accepts multi-statement strings via simple query protocol.
await this.runner(async (c) => { await c.query(sql); });
}
async transaction<T>(fn: () => Promise<T>): Promise<T> {
if (this.currentTxClient) {
// Already in a transaction — use a savepoint.
this.txDepth += 1;
const name = `sp_${this.txDepth}`;
await this.currentTxClient.query(`SAVEPOINT ${name}`);
try {
const result = await fn();
await this.currentTxClient.query(`RELEASE SAVEPOINT ${name}`);
this.txDepth -= 1;
return result;
} catch (err) {
try { await this.currentTxClient.query(`ROLLBACK TO SAVEPOINT ${name}`); } catch { /* ignore */ }
this.txDepth -= 1;
throw err;
}
}
const client = await this.pool.connect();
this.currentTxClient = client;
this.txDepth = 1;
try {
await client.query("BEGIN");
const result = await fn();
await client.query("COMMIT");
return result;
} catch (err) {
try { await client.query("ROLLBACK"); } catch { /* ignore */ }
throw err;
} finally {
this.currentTxClient = null;
this.txDepth = 0;
client.release();
}
}
dialect(): "postgres" { return "postgres"; }
async close(): Promise<void> {
await this.pool.end();
}
}

View file

@ -0,0 +1,85 @@
/**
* SQLite backend for the repository. Wraps node:sqlite (sync API) in
* Promise-returning methods so the Repository can stay async-uniform across
* both backends.
*
* Prepared statements are cached per-SQL for perf parity with the
* old direct-DatabaseSync code path.
*/
import { DatabaseSync, type StatementSync } from "node:sqlite";
import type { DbAdapter, RunResult, Row, SqlValue } from "./db-adapter.js";
export class SqliteAdapter implements DbAdapter {
private readonly db: DatabaseSync;
private readonly stmts = new Map<string, StatementSync>();
private txDepth = 0;
constructor(path: string) {
this.db = new DatabaseSync(path);
this.db.exec("PRAGMA journal_mode = WAL");
this.db.exec("PRAGMA foreign_keys = ON");
this.db.exec("PRAGMA synchronous = NORMAL");
}
private prep(sql: string): StatementSync {
let s = this.stmts.get(sql);
if (!s) {
s = this.db.prepare(sql);
this.stmts.set(sql, s);
}
return s;
}
async run(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<RunResult> {
const stmt = this.prep(sql);
const r = stmt.run(...(params as any[]));
return {
lastInsertRowid:
typeof r.lastInsertRowid === "bigint" ? r.lastInsertRowid : BigInt(r.lastInsertRowid),
changes: Number(r.changes),
};
}
async get<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T | undefined> {
const stmt = this.prep(sql);
const r = stmt.get(...(params as any[]));
return r as T | undefined;
}
async all<T = Row>(sql: string, params: ReadonlyArray<SqlValue> = []): Promise<T[]> {
const stmt = this.prep(sql);
return stmt.all(...(params as any[])) as T[];
}
async exec(sql: string): Promise<void> {
this.db.exec(sql);
}
async transaction<T>(fn: () => Promise<T>): Promise<T> {
if (this.txDepth === 0) this.db.exec("BEGIN");
this.txDepth += 1;
try {
const result = await fn();
this.txDepth -= 1;
if (this.txDepth === 0) this.db.exec("COMMIT");
return result;
} catch (err) {
this.txDepth -= 1;
if (this.txDepth === 0) {
try { this.db.exec("ROLLBACK"); } catch { /* ignore */ }
}
throw err;
}
}
dialect(): "sqlite" { return "sqlite"; }
async close(): Promise<void> {
this.db.close();
}
/** Expose raw DB for migrations that need fine control (idempotent
* ALTER TABLE, PRAGMA inspection, etc). Sqlite-only. */
rawSync(): DatabaseSync { return this.db; }
}