refactor(db): move service-store from BSB plugin to shared/db library

Each service plugin now independently initializes its own DB connection
via shared/db/init.ts instead of depending on a central service-store
plugin. This removes the inter-plugin dependency ordering and the
plugin-registry singleton, making each service self-contained.

- Move db-adapter, repository, mappers, migrations, adapters to shared/db/
- Create shared/db/config.ts (reusable dbConfigSchema) and init.ts
- Delete service-store plugin and plugin-registry
- Add db config block to each service's ConfigSchema + sec-config template
- Move event_log purge timer into service-admin-http
- Update all import paths across shared modules and plugins

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mitchell R 2026-05-24 02:48:32 +02:00
parent 4062b8bb6f
commit 0479cb7b4b
No known key found for this signature in database
23 changed files with 252 additions and 329 deletions

View file

@ -18,25 +18,20 @@ default:
plugin: events-default plugin: events-default
enabled: true enabled: true
services: services:
service-store:
package: betterframe
plugin: service-store
enabled: true
config:
driver: ${BF_DB_DRIVER}
sqlitePath: /var/lib/betterframe/betterframe.db
pgHost: ${BF_PG_HOST}
pgPort: ${BF_PG_PORT}
pgDatabase: ${BF_PG_DATABASE}
pgUser: ${BF_PG_USER}
pgPassword: ${BF_PG_PASSWORD}
pgPoolMax: ${BF_PG_POOL_MAX}
service-admin-http: service-admin-http:
package: betterframe package: betterframe
plugin: service-admin-http plugin: service-admin-http
enabled: true enabled: true
config: config:
db:
driver: ${BF_DB_DRIVER}
sqlitePath: /var/lib/betterframe/betterframe.db
pgHost: ${BF_PG_HOST}
pgPort: ${BF_PG_PORT}
pgDatabase: ${BF_PG_DATABASE}
pgUser: ${BF_PG_USER}
pgPassword: ${BF_PG_PASSWORD}
pgPoolMax: ${BF_PG_POOL_MAX}
host: 0.0.0.0 host: 0.0.0.0
port: 18080 port: 18080
dataDir: /var/lib/betterframe dataDir: /var/lib/betterframe
@ -61,6 +56,15 @@ default:
plugin: service-api-http plugin: service-api-http
enabled: true enabled: true
config: config:
db:
driver: ${BF_DB_DRIVER}
sqlitePath: /var/lib/betterframe/betterframe.db
pgHost: ${BF_PG_HOST}
pgPort: ${BF_PG_PORT}
pgDatabase: ${BF_PG_DATABASE}
pgUser: ${BF_PG_USER}
pgPassword: ${BF_PG_PASSWORD}
pgPoolMax: ${BF_PG_POOL_MAX}
host: 0.0.0.0 host: 0.0.0.0
port: 18081 port: 18081
codeTtlSeconds: 600 codeTtlSeconds: 600
@ -81,6 +85,15 @@ default:
plugin: service-coordinator-ws plugin: service-coordinator-ws
enabled: true enabled: true
config: config:
db:
driver: ${BF_DB_DRIVER}
sqlitePath: /var/lib/betterframe/betterframe.db
pgHost: ${BF_PG_HOST}
pgPort: ${BF_PG_PORT}
pgDatabase: ${BF_PG_DATABASE}
pgUser: ${BF_PG_USER}
pgPassword: ${BF_PG_PASSWORD}
pgPoolMax: ${BF_PG_POOL_MAX}
host: 0.0.0.0 host: 0.0.0.0
port: 18082 port: 18082
dataDir: /var/lib/betterframe dataDir: /var/lib/betterframe

View file

@ -89,16 +89,6 @@
"dispose": null "dispose": null
}, },
"tests": [] "tests": []
},
{
"id": "service-store",
"skip": true,
"default": {
"config": {},
"setup": null,
"dispose": null
},
"tests": []
} }
] ]
} }

View file

@ -15,14 +15,15 @@ import {
import { H3, serve } from "h3"; import { H3, serve } from "h3";
import type { Server } from "srvx"; import type { Server } from "srvx";
import { getRepo } from "../../shared/plugin-registry.js"; import { dbConfigSchema, type DbConfig } from "../../shared/db/config.js";
import { initDb } from "../../shared/db/init.js";
import type { Repository } from "../../shared/db/repository.js";
import { initSecrets, type SecretsApi } from "../../shared/secrets.js"; import { initSecrets, type SecretsApi } from "../../shared/secrets.js";
import { createAuth, type AuthApi } from "../../shared/auth.js"; import { createAuth, type AuthApi } from "../../shared/auth.js";
import { initNoderedBridge, type NoderedBridge } from "../../shared/nodered-bridge.js"; import { initNoderedBridge, type NoderedBridge } from "../../shared/nodered-bridge.js";
import { initFirmware, type FirmwareApi } from "../../shared/firmware.js"; import { initFirmware, type FirmwareApi } from "../../shared/firmware.js";
import { initOsUpdates, type OsUpdateApi } from "../../shared/os-updates.js"; import { initOsUpdates, type OsUpdateApi } from "../../shared/os-updates.js";
import { serverVersion } from "../../shared/version.js"; import { serverVersion } from "../../shared/version.js";
import type { Repository } from "../service-store/repository.js";
import { registerMiddleware } from "./middleware.js"; import { registerMiddleware } from "./middleware.js";
import { registerSetupRoutes } from "./routes-setup.js"; import { registerSetupRoutes } from "./routes-setup.js";
@ -38,6 +39,7 @@ import { registerCloudRoutes } from "./routes-cloud.js";
const ConfigSchema = av.object( const ConfigSchema = av.object(
{ {
db: dbConfigSchema,
host: av.string().default("127.0.0.1"), host: av.string().default("127.0.0.1"),
port: av.int().min(1).max(65535).default(18080), port: av.int().min(1).max(65535).default(18080),
// Secrets config (was service-secrets) // Secrets config (was service-secrets)
@ -108,11 +110,13 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
static override EventSchemas = EventSchemas; static override EventSchemas = EventSchemas;
initBeforePlugins?: string[]; initBeforePlugins?: string[];
initAfterPlugins?: string[] = ["service-store"]; initAfterPlugins?: string[];
runBeforePlugins?: string[]; runBeforePlugins?: string[];
runAfterPlugins?: string[]; runAfterPlugins?: string[];
private server?: Server; private server?: Server;
private dbClose?: () => Promise<void>;
private purgeTimer?: ReturnType<typeof setInterval>;
private cameraHealthChecker?: { stop: () => void }; private cameraHealthChecker?: { stop: () => void };
private artifactCleanup?: { stop: () => void }; private artifactCleanup?: { stop: () => void };
@ -128,7 +132,16 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
const cookieName = this.config.cookieName; const cookieName = this.config.cookieName;
const totpIssuer = this.config.totpIssuer; const totpIssuer = this.config.totpIssuer;
const repo = getRepo(); const dbResult = await initDb(
this.config.db as DbConfig,
{
info: (m) => obs.log.info(m as any, {}),
warn: (m) => obs.log.warn(m as any, {}),
},
);
const repo = dbResult.repo;
this.dbClose = dbResult.close;
const secrets = initSecrets( const secrets = initSecrets(
{ dataDir, systemdCredsName: this.config.systemdCredsName, systemdCredsDir: this.config.systemdCredsDir || undefined }, { dataDir, systemdCredsName: this.config.systemdCredsName, systemdCredsDir: this.config.systemdCredsDir || undefined },
{ info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) }, { info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) },
@ -260,9 +273,29 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
// to set server URL + API key manually. Best-effort with retries because // to set server URL + API key manually. Best-effort with retries because
// Node-RED may still be starting. // Node-RED may still be starting.
void this.provisionNoderedBridge(repo, secrets, auth, nodered, selfUrl, obs); void this.provisionNoderedBridge(repo, secrets, auth, nodered, selfUrl, obs);
// Startup purge (inherited from old service-store)
this._repo = repo;
void this.runPurge(obs);
} }
async run(_obs: Observable): Promise<void> {} private _repo?: Repository;
private async runPurge(obs: Observable): Promise<void> {
if (!this._repo) return;
const r = this._repo;
const kl = await r.purgeKioskLogs(14);
const el = await r.purgeEventLog(30, 100_000);
const al = await r.purgeAuditLog(90);
if (kl + el + al > 0) {
obs.log.info("purge: {kl} kiosk_logs, {el} event_log, {al} audit_log", { kl, el, al });
}
}
async run(obs: Observable): Promise<void> {
// Purge every 6 hours (inherited from old service-store).
this.purgeTimer = setInterval(() => this.runPurge(obs), 6 * 60 * 60 * 1000);
}
private async provisionNoderedBridge( private async provisionNoderedBridge(
repo: Repository, repo: Repository,
@ -324,10 +357,12 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
} }
async dispose(): Promise<void> { async dispose(): Promise<void> {
if (this.purgeTimer) clearInterval(this.purgeTimer);
this.cameraHealthChecker?.stop(); this.cameraHealthChecker?.stop();
this.artifactCleanup?.stop(); this.artifactCleanup?.stop();
if (this.server) { if (this.server) {
await this.server.close(); await this.server.close();
} }
await this.dbClose?.();
} }
} }

View file

@ -15,7 +15,9 @@ import {
import { H3, serve, readBody, getRequestHeader, getRouterParam, createError } from "h3"; import { H3, serve, readBody, getRequestHeader, getRouterParam, createError } from "h3";
import type { Server } from "srvx"; import type { Server } from "srvx";
import { getRepo } from "../../shared/plugin-registry.js"; import { dbConfigSchema, type DbConfig } from "../../shared/db/config.js";
import { initDb } from "../../shared/db/init.js";
import type { Repository } from "../../shared/db/repository.js";
import { initSecrets } from "../../shared/secrets.js"; import { initSecrets } from "../../shared/secrets.js";
import { createAuth } from "../../shared/auth.js"; import { createAuth } from "../../shared/auth.js";
import { initiatePairing, claimPairing } from "../../shared/pairing.js"; import { initiatePairing, claimPairing } from "../../shared/pairing.js";
@ -26,7 +28,6 @@ import { initOsUpdates, type OsUpdateApi } from "../../shared/os-updates.js";
import { createRateLimiter } from "../../shared/rate-limit.js"; import { createRateLimiter } from "../../shared/rate-limit.js";
import { initMqttBridge, type MqttBridge } from "../../shared/mqtt-bridge.js"; import { initMqttBridge, type MqttBridge } from "../../shared/mqtt-bridge.js";
import { createHash } from "node:crypto"; import { createHash } from "node:crypto";
import type { Repository } from "../service-store/repository.js";
import type { AuthApi } from "../../shared/auth.js"; import type { AuthApi } from "../../shared/auth.js";
import type { SecretsApi } from "../../shared/secrets.js"; import type { SecretsApi } from "../../shared/secrets.js";
import type { FirmwareChannel } from "../../shared/types.js"; import type { FirmwareChannel } from "../../shared/types.js";
@ -35,6 +36,7 @@ import type { FirmwareChannel } from "../../shared/types.js";
const ConfigSchema = av.object( const ConfigSchema = av.object(
{ {
db: dbConfigSchema,
host: av.string().default("127.0.0.1"), host: av.string().default("127.0.0.1"),
port: av.int().min(1).max(65535).default(18081), port: av.int().min(1).max(65535).default(18081),
codeTtlSeconds: av.int().min(60).max(3600).default(600), codeTtlSeconds: av.int().min(60).max(3600).default(600),
@ -84,11 +86,12 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
static override EventSchemas = EventSchemas; static override EventSchemas = EventSchemas;
initBeforePlugins?: string[]; initBeforePlugins?: string[];
initAfterPlugins?: string[] = ["service-store"]; initAfterPlugins?: string[];
runBeforePlugins?: string[]; runBeforePlugins?: string[];
runAfterPlugins?: string[]; runAfterPlugins?: string[];
private server?: Server; private server?: Server;
private dbClose?: () => Promise<void>;
constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) { constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) {
super(cfg); super(cfg);
@ -100,7 +103,16 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
const cookieName = this.config.cookieName; const cookieName = this.config.cookieName;
const totpIssuer = this.config.totpIssuer; const totpIssuer = this.config.totpIssuer;
const repo = getRepo(); const dbResult = await initDb(
this.config.db as DbConfig,
{
info: (m) => obs.log.info(m as any, {}),
warn: (m) => obs.log.warn(m as any, {}),
},
);
const repo = dbResult.repo;
this.dbClose = dbResult.close;
const secrets = initSecrets( const secrets = initSecrets(
{ dataDir }, { dataDir },
{ info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) }, { info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) },
@ -186,6 +198,7 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
if (this.server) { if (this.server) {
await this.server.close(); await this.server.close();
} }
await this.dbClose?.();
} }
} }

View file

@ -23,7 +23,8 @@ import { createServer, type IncomingMessage, type Server as HttpServer } from "n
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
import { WebSocketServer, WebSocket } from "ws"; import { WebSocketServer, WebSocket } from "ws";
import { getRepo } from "../../shared/plugin-registry.js"; import { dbConfigSchema, type DbConfig } from "../../shared/db/config.js";
import { initDb } from "../../shared/db/init.js";
import { initSecrets } from "../../shared/secrets.js"; import { initSecrets } from "../../shared/secrets.js";
import { createAuth } from "../../shared/auth.js"; import { createAuth } from "../../shared/auth.js";
import { setCoordinator } from "../../shared/coordinator-registry.js"; import { setCoordinator } from "../../shared/coordinator-registry.js";
@ -33,6 +34,7 @@ import { initNoderedBridge, type NoderedBridge } from "../../shared/nodered-brid
const ConfigSchema = av.object( const ConfigSchema = av.object(
{ {
db: dbConfigSchema,
host: av.string().default("127.0.0.1"), host: av.string().default("127.0.0.1"),
port: av.int().min(1).max(65535).default(18082), port: av.int().min(1).max(65535).default(18082),
noderedUrl: av.string().minLength(1).default("http://127.0.0.1:1880"), noderedUrl: av.string().minLength(1).default("http://127.0.0.1:1880"),
@ -185,7 +187,7 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
static override EventSchemas = EventSchemas; static override EventSchemas = EventSchemas;
initBeforePlugins?: string[]; initBeforePlugins?: string[];
initAfterPlugins?: string[] = ["service-store"]; initAfterPlugins?: string[];
runBeforePlugins?: string[]; runBeforePlugins?: string[];
runAfterPlugins?: string[]; runAfterPlugins?: string[];
@ -193,6 +195,7 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
private wss?: WebSocketServer; private wss?: WebSocketServer;
private pingInterval?: ReturnType<typeof setInterval>; private pingInterval?: ReturnType<typeof setInterval>;
private nodered?: NoderedBridge; private nodered?: NoderedBridge;
private dbClose?: () => Promise<void>;
constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) { constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) {
super(cfg); super(cfg);
@ -204,7 +207,16 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
const cookieName = this.config.cookieName; const cookieName = this.config.cookieName;
const totpIssuer = this.config.totpIssuer; const totpIssuer = this.config.totpIssuer;
const repo = getRepo(); const dbResult = await initDb(
this.config.db as DbConfig,
{
info: (m) => obs.log.info(m as any, {}),
warn: (m) => obs.log.warn(m as any, {}),
},
);
const repo = dbResult.repo;
this.dbClose = dbResult.close;
const secrets = initSecrets( const secrets = initSecrets(
{ dataDir }, { dataDir },
{ info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) }, { info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) },
@ -457,7 +469,7 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
try { k.ws.close(); } catch { /* ignore */ } try { k.ws.close(); } catch { /* ignore */ }
} }
connectedKiosks.clear(); connectedKiosks.clear();
return new Promise((resolve) => { await new Promise<void>((resolve) => {
if (this.wss) this.wss.close(); if (this.wss) this.wss.close();
if (this.httpServer) { if (this.httpServer) {
this.httpServer.close(() => resolve()); this.httpServer.close(() => resolve());
@ -465,5 +477,6 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
resolve(); resolve();
} }
}); });
await this.dbClose?.();
} }
} }

View file

@ -1,265 +0,0 @@
/**
* service-store the only service that opens the sqlite database.
*
* Architecture choice (v0.1):
* For now, other services hold a typed reference to this plugin's
* `Repository` instance via constructor injection (BSB plugin clients).
* We expose the high-level data API as plain methods rather than wiring
* every CRUD operation as a typed BSB event.
*
* Reason: 60+ tables × 4 operations × 2 (input + output) anyvali schemas
* would be ~2000 lines of declarative bus plumbing. The event bus pays off
* when calls cross processes; in v0.1 everything is single-process.
*
* When we scale `service-coordinator-ws` to multiple instances (one per N
* kiosks), we'll graduate the hot-path operations (bundle lookup, label
* filter) to typed returnable events and keep the rest as direct calls.
*
* To-then: emit a domain-event broadcast on every write so listeners
* (e.g. coordinator-ws notifying kiosks of bundle changes) can react.
*/
import { DatabaseSync, type StatementSync } from "node:sqlite";
import { dirname } from "node:path";
import { mkdirSync } from "node:fs";
import * as av from "@anyvali/js";
import {
BSBService,
type BSBServiceConstructor,
createConfigSchema,
createEventSchemas,
createBroadcastEvent,
type Observable,
} from "@bsb/base";
import { MIGRATIONS } from "./migrations.js";
import { Repository } from "./repository.js";
import { registerRepo } from "../../shared/plugin-registry.js";
// ---- Config -----------------------------------------------------------------
const ConfigSchema = av.object(
{
/** Backend selector: "sqlite" or "postgres". */
driver: av.enum_(["sqlite", "postgres"] as const).default("sqlite"),
/** sqlite-only: filesystem path to the .db file. */
sqlitePath: av.string().minLength(1).default("/var/lib/betterframe/betterframe.db"),
/** postgres: full libpq URL. Overrides individual pg* fields if non-empty. */
pgUrl: av.string().default(""),
/** postgres: host. */
pgHost: av.string().default("postgres"),
/** postgres: port. */
pgPort: av.int().min(1).max(65535).default(5432),
/** postgres: database name. */
pgDatabase: av.string().default("betterframe"),
/** postgres: username. */
pgUser: av.string().default("betterframe"),
/** postgres: password. */
pgPassword: av.string().default("betterframe"),
/** postgres: connection pool max size. */
pgPoolMax: av.int().min(1).max(1000).default(10),
},
{ unknownKeys: "strip" },
);
export const Config = createConfigSchema(
{
name: "service-store",
description:
"BetterFrame canonical SQLite store. The single writer in the system; " +
"all other services read/write through this plugin.",
tags: ["service", "store", "sqlite"],
},
ConfigSchema,
);
// ---- Event schemas ----------------------------------------------------------
const broadcastDomainChange = av.object(
{
table: av.string(),
op: av.enum_(["create", "update", "delete"] as const),
id: av.optional(av.union([av.string(), av.int()])),
},
{ unknownKeys: "reject" },
);
export const EventSchemas = createEventSchemas({
emitEvents: {},
onEvents: {},
emitReturnableEvents: {},
onReturnableEvents: {},
emitBroadcast: {
"store.changed": createBroadcastEvent(broadcastDomainChange, "Domain row changed"),
},
onBroadcast: {},
});
// ---- Plugin -----------------------------------------------------------------
export class Plugin extends BSBService<InstanceType<typeof Config>, typeof EventSchemas> {
static override Config = Config;
static override EventSchemas = EventSchemas;
initBeforePlugins?: string[];
initAfterPlugins?: string[];
runBeforePlugins?: string[];
runAfterPlugins?: string[];
private db?: DatabaseSync;
private _repo?: Repository;
private purgeTimer?: ReturnType<typeof setInterval>;
constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) {
super(cfg);
}
async init(obs: Observable): Promise<void> {
const driver = this.config.driver;
if (driver === "postgres") {
let pgUrl = this.config.pgUrl ?? "";
if (!pgUrl) {
const u = encodeURIComponent(this.config.pgUser);
const p = encodeURIComponent(this.config.pgPassword);
pgUrl = `postgres://${u}:${p}@${this.config.pgHost}:${this.config.pgPort}/${this.config.pgDatabase}`;
}
obs.log.info("connecting to postgres at {url}", { url: pgUrl.replace(/:[^:@]+@/, ":***@") });
const { PgAdapter } = await import("./pg-adapter.js");
const adapter = new PgAdapter(pgUrl, this.config.pgPoolMax);
// Run PG migrations. Track version in schema_migrations table.
const { TENANT_MIGRATIONS } = await import("./migrations-pg.js");
const versionRow = await adapter.get<{ version: number }>(
`SELECT COALESCE(MAX(version), 0) AS version FROM schema_migrations WHERE schema_name = 'public'`,
).catch(() => undefined);
const currentVersion = versionRow?.version ?? 0;
if (currentVersion < TENANT_MIGRATIONS.length) {
obs.log.info("running PG migrations from {from} to {to}", {
from: currentVersion,
to: TENANT_MIGRATIONS.length,
});
// Ensure schema_migrations exists (bootstrap).
await adapter.exec(`CREATE TABLE IF NOT EXISTS schema_migrations (
schema_name TEXT NOT NULL, version INTEGER NOT NULL,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (schema_name, version)
)`);
for (let i = currentVersion; i < TENANT_MIGRATIONS.length; i++) {
await adapter.exec(TENANT_MIGRATIONS[i]!);
await adapter.run(
`INSERT INTO schema_migrations (schema_name, version) VALUES ('public', ?)`,
[i + 1],
);
}
} else {
obs.log.info("PG schema up to date (version {v})", { v: currentVersion });
}
this._repo = new Repository(adapter, async (table, op, id) => {
try {
await this.events.emitBroadcast("store.changed", obs, { table, op, id });
} catch (err) {
obs.log.warn("broadcast store.changed failed: {err}", {
err: (err as Error).message,
});
}
});
} else {
// SQLite path (default).
const path = this.config.sqlitePath;
obs.log.info("opening sqlite at {path}", { path });
try {
mkdirSync(dirname(path), { recursive: true });
} catch (err) {
obs.log.warn("mkdir failed for {dir}: {err}", {
dir: dirname(path),
err: (err as Error).message,
});
}
this.db = new DatabaseSync(path);
this.db.exec("PRAGMA journal_mode = WAL");
this.db.exec("PRAGMA synchronous = NORMAL");
this.db.exec("PRAGMA foreign_keys = ON");
this.db.exec("PRAGMA busy_timeout = 10000");
const row = this.db.prepare("PRAGMA user_version").get() as { user_version: number };
const currentVersion = row.user_version;
const targetVersion = MIGRATIONS.length;
if (currentVersion < targetVersion) {
obs.log.info("running migrations from {from} to {to}", {
from: currentVersion,
to: targetVersion,
});
for (let i = currentVersion; i < targetVersion; i++) {
const entry = MIGRATIONS[i];
if (typeof entry === "string") {
this.db.exec(entry);
} else if (typeof entry === "function") {
entry(this.db);
}
}
this.db.exec(`PRAGMA user_version = ${targetVersion}`);
} else {
obs.log.info("schema up to date (version {v})", { v: currentVersion });
}
const { SqliteAdapter } = await import("./sqlite-adapter.js");
const adapter = SqliteAdapter.fromExisting(this.db);
this._repo = new Repository(adapter, async (table, op, id) => {
try {
await this.events.emitBroadcast("store.changed", obs, { table, op, id });
} catch (err) {
obs.log.warn("broadcast store.changed failed: {err}", {
err: (err as Error).message,
});
}
});
}
registerRepo(this._repo);
// Startup purge
this.runPurge(obs);
obs.log.info("store ready");
}
private async runPurge(obs: Observable): Promise<void> {
if (!this._repo) return;
const r = this._repo;
const kl = await r.purgeKioskLogs(14);
const el = await r.purgeEventLog(30, 100_000);
const al = await r.purgeAuditLog(90);
if (kl + el + al > 0) {
obs.log.info("purge: {kl} kiosk_logs, {el} event_log, {al} audit_log", { kl, el, al });
}
}
async run(obs: Observable): Promise<void> {
// Purge every 6 hours.
this.purgeTimer = setInterval(() => this.runPurge(obs), 6 * 60 * 60 * 1000);
}
async dispose(): Promise<void> {
if (this.purgeTimer) clearInterval(this.purgeTimer);
this.db?.close();
}
/**
* Public accessor for sibling services. Throws before init() completes
* services that need the repo should declare their initAfterPlugins
* dependency on `service-store`.
*/
get repo(): Repository {
if (!this._repo) {
throw new Error("service-store: repository accessed before init()");
}
return this._repo;
}
}

View file

@ -12,7 +12,7 @@
* trying to delete the same file (ENOENT is swallowed). * trying to delete the same file (ENOENT is swallowed).
*/ */
import { unlink } from "node:fs/promises"; import { unlink } from "node:fs/promises";
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
interface CleanupLog { interface CleanupLog {
info(msg: string): void; info(msg: string): void;

View file

@ -7,7 +7,7 @@
* Pulls actor + ip out of the h3 event context. Never throws logging * Pulls actor + ip out of the h3 event context. Never throws logging
* failure must not break the caller's request. * failure must not break the caller's request.
*/ */
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
import type { AuditActorType, AuditResult } from "./types.js"; import type { AuditActorType, AuditResult } from "./types.js";
interface AuditCtx { interface AuditCtx {

View file

@ -7,7 +7,7 @@ import { createHmac, randomBytes, timingSafeEqual } from "node:crypto";
import argon2 from "argon2"; import argon2 from "argon2";
import { TOTP, Secret } from "otpauth"; import { TOTP, Secret } from "otpauth";
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
import type { SecretsApi } from "./secrets.js"; import type { SecretsApi } from "./secrets.js";
import type { ApiKey, ApiKeyScope, Session, User } from "./types.js"; import type { ApiKey, ApiKeyScope, Session, User } from "./types.js";

View file

@ -5,7 +5,7 @@
* No label filtering for v0.1. * No label filtering for v0.1.
*/ */
import { createHash } from "node:crypto"; import { createHash } from "node:crypto";
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
import type { SecretsApi } from "./secrets.js"; import type { SecretsApi } from "./secrets.js";
export interface BundleCamera { export interface BundleCamera {

View file

@ -10,7 +10,7 @@
* OPTIONS, check for 200. No actual stream decode just reachability. * OPTIONS, check for 200. No actual stream decode just reachability.
*/ */
import { createConnection, type Socket } from "node:net"; import { createConnection, type Socket } from "node:net";
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
export interface CameraHealthConfig { export interface CameraHealthConfig {
intervalMs: number; intervalMs: number;

View file

@ -0,0 +1,28 @@
import * as av from "@anyvali/js";
export const dbConfigSchema = av.object(
{
driver: av.enum_(["sqlite", "postgres"] as const).default("postgres"),
sqlitePath: av.string().minLength(1).default("/var/lib/betterframe/betterframe.db"),
pgUrl: av.string().default(""),
pgHost: av.string().default("postgres"),
pgPort: av.int().min(1).max(65535).default(5432),
pgDatabase: av.string().default("betterframe"),
pgUser: av.string().default("betterframe"),
pgPassword: av.string().default("betterframe"),
pgPoolMax: av.int().min(1).max(1000).default(10),
},
{ unknownKeys: "strip" },
);
export type DbConfig = {
driver: "sqlite" | "postgres";
sqlitePath: string;
pgUrl: string;
pgHost: string;
pgPort: number;
pgDatabase: string;
pgUser: string;
pgPassword: string;
pgPoolMax: number;
};

View file

@ -0,0 +1,115 @@
/**
* initDb initialize the database from config (shared module).
*
* Replaces the init logic that was in service-store/index.ts.
* Each service plugin calls this independently with its own config.
*/
import { DatabaseSync } from "node:sqlite";
import { dirname } from "node:path";
import { mkdirSync } from "node:fs";
import { MIGRATIONS } from "./migrations.js";
import { Repository } from "./repository.js";
import type { DbConfig } from "./config.js";
interface DbLog {
info(msg: string): void;
warn(msg: string): void;
}
export async function initDb(
config: DbConfig,
log: DbLog,
notifyFn?: (table: string, op: string, id?: string | number) => void,
): Promise<{ repo: Repository; close: () => Promise<void> }> {
const driver = config.driver;
const notify = notifyFn ?? (() => {});
if (driver === "postgres") {
let pgUrl = config.pgUrl ?? "";
if (!pgUrl) {
const u = encodeURIComponent(config.pgUser);
const p = encodeURIComponent(config.pgPassword);
pgUrl = `postgres://${u}:${p}@${config.pgHost}:${config.pgPort}/${config.pgDatabase}`;
}
log.info(`connecting to postgres at ${pgUrl.replace(/:[^:@]+@/, ":***@")}`);
const { PgAdapter } = await import("./pg-adapter.js");
const adapter = new PgAdapter(pgUrl, config.pgPoolMax);
// Run PG migrations. Track version in schema_migrations table.
const { TENANT_MIGRATIONS } = await import("./migrations-pg.js");
const versionRow = await adapter.get<{ version: number }>(
`SELECT COALESCE(MAX(version), 0) AS version FROM schema_migrations WHERE schema_name = 'public'`,
).catch(() => undefined);
const currentVersion = versionRow?.version ?? 0;
if (currentVersion < TENANT_MIGRATIONS.length) {
log.info(`running PG migrations from ${currentVersion} to ${TENANT_MIGRATIONS.length}`);
// Ensure schema_migrations exists (bootstrap).
await adapter.exec(`CREATE TABLE IF NOT EXISTS schema_migrations (
schema_name TEXT NOT NULL, version INTEGER NOT NULL,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (schema_name, version)
)`);
for (let i = currentVersion; i < TENANT_MIGRATIONS.length; i++) {
await adapter.exec(TENANT_MIGRATIONS[i]!);
await adapter.run(
`INSERT INTO schema_migrations (schema_name, version) VALUES ('public', ?)`,
[i + 1],
);
}
} else {
log.info(`PG schema up to date (version ${currentVersion})`);
}
const repo = new Repository(adapter, async (table, op, id) => {
notify(table, op, id);
});
return { repo, close: () => adapter.close() };
}
// SQLite path (default).
const path = config.sqlitePath;
log.info(`opening sqlite at ${path}`);
try {
mkdirSync(dirname(path), { recursive: true });
} catch (err) {
log.warn(`mkdir failed for ${dirname(path)}: ${(err as Error).message}`);
}
const db = new DatabaseSync(path);
db.exec("PRAGMA journal_mode = WAL");
db.exec("PRAGMA synchronous = NORMAL");
db.exec("PRAGMA foreign_keys = ON");
db.exec("PRAGMA busy_timeout = 10000");
const row = db.prepare("PRAGMA user_version").get() as { user_version: number };
const currentVersion = row.user_version;
const targetVersion = MIGRATIONS.length;
if (currentVersion < targetVersion) {
log.info(`running migrations from ${currentVersion} to ${targetVersion}`);
for (let i = currentVersion; i < targetVersion; i++) {
const entry = MIGRATIONS[i];
if (typeof entry === "string") {
db.exec(entry);
} else if (typeof entry === "function") {
entry(db);
}
}
db.exec(`PRAGMA user_version = ${targetVersion}`);
} else {
log.info(`schema up to date (version ${currentVersion})`);
}
const { SqliteAdapter } = await import("./sqlite-adapter.js");
const adapter = SqliteAdapter.fromExisting(db);
const repo = new Repository(adapter, async (table, op, id) => {
notify(table, op, id);
});
return { repo, close: () => adapter.close() };
}

View file

@ -54,7 +54,7 @@ import type {
StreamSelector, StreamSelector,
User, User,
UserRole, UserRole,
} from "../../shared/types.js"; } from "../types.js";
import { b, j } from "./util.js"; import { b, j } from "./util.js";
type Row = Record<string, unknown>; type Row = Record<string, unknown>;

View file

@ -55,7 +55,7 @@ import type {
StreamRole, StreamRole,
User, User,
UserRole, UserRole,
} from "../../shared/types.js"; } from "../types.js";
import { import {
rowToApiKey, rowToApiKey,
rowToAuditEntry, rowToAuditEntry,

View file

@ -7,7 +7,7 @@
* 3. Admin enters code in UI confirmPairing creates kiosk + kiosk_key * 3. Admin enters code in UI confirmPairing creates kiosk + kiosk_key
*/ */
import { randomBytes } from "node:crypto"; import { randomBytes } from "node:crypto";
import type { Repository } from "../plugins/service-store/repository.js"; import type { Repository } from "./db/repository.js";
import type { AuthApi } from "./auth.js"; import type { AuthApi } from "./auth.js";
import type { SecretsApi } from "./secrets.js"; import type { SecretsApi } from "./secrets.js";
import type { PairingCode } from "./types.js"; import type { PairingCode } from "./types.js";

View file

@ -1,19 +0,0 @@
/**
* Module-level store registry the one cross-plugin reference needed.
*
* service-store registers its repo in init(). Downstream plugins
* (admin-http, api-http, coordinator-ws) look it up in their init().
* initAfterPlugins guarantees ordering.
*/
import type { Repository } from "../plugins/service-store/repository.js";
let _repo: Repository | undefined;
export function registerRepo(repo: Repository): void {
_repo = repo;
}
export function getRepo(): Repository {
if (!_repo) throw new Error("plugin-registry: store repo not registered (init order bug?)");
return _repo;
}