From cce9b51887834e0be7f43e53990d05b39e2f59ab Mon Sep 17 00:00:00 2001 From: Mitchell R Date: Tue, 26 May 2026 02:38:43 +0200 Subject: [PATCH] feat(events): add persistent ONVIF event topic subscriptions with status tracking Add camera_event_subscriptions table to track per-camera per-topic subscription state (inactive/pending/active/failed). Refresh-events handler now merges discovered topics instead of replacing, so topics are never lost when a camera goes temporarily offline. Admin UI shows colored status dots and last-event timestamps per topic, with a "subscribe all inactive" button to queue subscriptions for kiosk pickup. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service-admin-http/routes-admin.ts | 27 +++++- server/src/shared/db/mappers.ts | 15 +++ server/src/shared/db/migrations-pg.ts | 14 +++ server/src/shared/db/migrations.ts | 14 +++ server/src/shared/db/repository.ts | 65 +++++++++++++ server/src/shared/types.ts | 13 +++ server/src/web-templates/admin-pages.tsx | 93 ++++++++++++++----- 7 files changed, 214 insertions(+), 27 deletions(-) diff --git a/server/src/plugins/service-admin-http/routes-admin.ts b/server/src/plugins/service-admin-http/routes-admin.ts index c2b6bcf..498afbe 100644 --- a/server/src/plugins/service-admin-http/routes-admin.ts +++ b/server/src/plugins/service-admin-http/routes-admin.ts @@ -1397,6 +1397,8 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { }); } + const eventSubscriptions = await deps.repo.listEventSubscriptions(id); + return htmlPage(CameraEditPage({ user: user.username, camera, @@ -1404,6 +1406,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { allLabels: await deps.repo.listLabels(), streams: await deps.repo.listCameraStreams(id), subscriptions, + eventSubscriptions, })); }); @@ -1516,6 +1519,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { }); // Refresh supported ONVIF event topics from the camera. + // MERGE: new topics are added to the existing list, never removed. app.post("/admin/cameras/:id/refresh-events", async (event) => { const id = Number(getRouterParam(event, "id")); const cam = await deps.repo.getCameraById(id); @@ -1538,20 +1542,39 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { ? kioskOnvifSoapTransport(Number(runner.slice("kiosk:".length))) : undefined; try { - const topics = await onvifGetEventProperties({ + const discoveredTopics = await onvifGetEventProperties({ host: cam.onvif_host, port: cam.onvif_port ?? 80, username: cam.onvif_username ?? "", password: cam.onvif_password ?? "", soapTransport, }); - await deps.repo.updateCamera(id, { supported_event_topics: JSON.stringify(topics) } as any); + // Merge: keep existing topics, add new ones — never remove + const existingSet = new Set(cam.supported_event_topics); + for (const t of discoveredTopics) existingSet.add(t); + const merged = [...existingSet].sort(); + await deps.repo.updateCamera(id, { supported_event_topics: JSON.stringify(merged) } as any); + // Upsert subscription rows for each discovered topic (additive only) + for (const topic of discoveredTopics) { + await deps.repo.upsertEventSubscription({ camera_id: id, topic, status: "inactive" }); + } } catch { // Camera offline or events not supported — leave existing topics. } return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } }); }); + // Subscribe to all inactive event topics for this camera. + app.post("/admin/cameras/:id/subscribe-events", async (event) => { + const id = Number(getRouterParam(event, "id")); + const cam = await deps.repo.getCameraById(id); + if (!cam) { + return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } }); + } + await deps.repo.setAllEventSubscriptionsStatus(id, "inactive", "pending"); + return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } }); + }); + app.post("/admin/cameras/:id/delete", async (event) => { event.context.obs?.log.info("camera delete {id} by {user}", { id: getRouterParam(event, "id") ?? "?", user: event.context.user?.username ?? "unknown" }); const id = Number(getRouterParam(event, "id")); diff --git a/server/src/shared/db/mappers.ts b/server/src/shared/db/mappers.ts index bfae32a..2567156 100644 --- a/server/src/shared/db/mappers.ts +++ b/server/src/shared/db/mappers.ts @@ -12,6 +12,7 @@ import type { AuditEntry, AuditResult, Camera, + CameraEventSubscription, CloudAccount, CloudVendor, CameraStream, @@ -24,6 +25,7 @@ import type { EntityType, EventLog, EventSourceType, + EventSubscriptionStatus, FirmwareChannel, FirmwareRelease, FirmwareRollout, @@ -472,3 +474,16 @@ export function rowToCloudAccount(r: Row): CloudAccount { created_at: s(r["created_at"]), }; } + +export function rowToCameraEventSubscription(r: Row): CameraEventSubscription { + return { + id: n(r["id"]), + camera_id: n(r["camera_id"]), + topic: s(r["topic"]), + status: s(r["status"]) as EventSubscriptionStatus, + subscribed_by_kiosk_id: nn(r["subscribed_by_kiosk_id"]), + last_event_at: sn(r["last_event_at"]), + error_message: sn(r["error_message"]), + created_at: s(r["created_at"]), + }; +} diff --git a/server/src/shared/db/migrations-pg.ts b/server/src/shared/db/migrations-pg.ts index 1cf5a25..3bae9df 100644 --- a/server/src/shared/db/migrations-pg.ts +++ b/server/src/shared/db/migrations-pg.ts @@ -465,4 +465,18 @@ export const TENANT_MIGRATIONS: readonly string[] = [ created_at TIMESTAMPTZ NOT NULL DEFAULT now() )`, `CREATE INDEX IF NOT EXISTS idx_cloud_accounts_vendor ON cloud_accounts(vendor)`, + + // ---- camera_event_subscriptions --------------------------------------------- + `CREATE TABLE IF NOT EXISTS camera_event_subscriptions ( + id SERIAL PRIMARY KEY, + camera_id INTEGER NOT NULL REFERENCES cameras(id) ON DELETE CASCADE, + topic TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'inactive' CHECK(status IN ('inactive', 'pending', 'active', 'failed')), + subscribed_by_kiosk_id INTEGER REFERENCES kiosks(id) ON DELETE SET NULL, + last_event_at TIMESTAMPTZ, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE(camera_id, topic) + )`, + `CREATE INDEX IF NOT EXISTS idx_camera_event_subs_camera ON camera_event_subscriptions(camera_id)`, ]; diff --git a/server/src/shared/db/migrations.ts b/server/src/shared/db/migrations.ts index 22d75da..a7c96d6 100644 --- a/server/src/shared/db/migrations.ts +++ b/server/src/shared/db/migrations.ts @@ -1074,4 +1074,18 @@ export const MIGRATIONS: readonly MigrationEntry[] = [ }, `CREATE INDEX IF NOT EXISTS idx_cameras_cloud_account ON cameras(cloud_account_id)`, `CREATE INDEX IF NOT EXISTS idx_cameras_cloud_vendor ON cameras(cloud_account_id, cloud_vendor_camera_id)`, + + // ---- camera_event_subscriptions: per-camera per-topic subscription state --- + `CREATE TABLE IF NOT EXISTS camera_event_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + camera_id INTEGER NOT NULL REFERENCES cameras(id) ON DELETE CASCADE, + topic TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'inactive' CHECK(status IN ('inactive', 'pending', 'active', 'failed')), + subscribed_by_kiosk_id INTEGER REFERENCES kiosks(id) ON DELETE SET NULL, + last_event_at TEXT, + error_message TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), + UNIQUE(camera_id, topic) + ) STRICT`, + `CREATE INDEX IF NOT EXISTS idx_camera_event_subs_camera ON camera_event_subscriptions(camera_id)`, ]; diff --git a/server/src/shared/db/repository.ts b/server/src/shared/db/repository.ts index 32a5585..a92a245 100644 --- a/server/src/shared/db/repository.ts +++ b/server/src/shared/db/repository.ts @@ -19,6 +19,7 @@ import type { AuditEntry, AuditResult, Camera, + CameraEventSubscription, CameraStream, CameraType, CloudAccount, @@ -28,6 +29,7 @@ import type { EventLog, EventQueryFilters, EventSourceType, + EventSubscriptionStatus, FirmwareChannel, FirmwareRelease, FirmwareRollout, @@ -61,6 +63,7 @@ import { rowToApiKey, rowToAuditEntry, rowToCamera, + rowToCameraEventSubscription, rowToCloudAccount, rowToCameraStream, rowToDisplay, @@ -2411,6 +2414,68 @@ export class Repository { void this.notify("labels", "update", id); } + // =========================================================================== + // camera_event_subscriptions + // =========================================================================== + + async listEventSubscriptions(cameraId: number): Promise { + const rs = await this._all( + "SELECT * FROM camera_event_subscriptions WHERE camera_id = ? ORDER BY topic", + [cameraId], + ); + return rs.map((r) => rowToCameraEventSubscription(r as Record)); + } + + async upsertEventSubscription(input: { + camera_id: number; + topic: string; + status?: EventSubscriptionStatus; + }): Promise { + const status = input.status ?? "inactive"; + await this._run( + `INSERT INTO camera_event_subscriptions (camera_id, topic, status) + VALUES (?, ?, ?) + ON CONFLICT (camera_id, topic) DO UPDATE SET status = COALESCE(NULLIF(?, ''), camera_event_subscriptions.status)`, + [input.camera_id, input.topic, status, status], + ); + } + + async updateEventSubscriptionStatus( + cameraId: number, + topic: string, + status: EventSubscriptionStatus, + error?: string | null, + ): Promise { + await this._run( + `UPDATE camera_event_subscriptions + SET status = ?, error_message = ? + WHERE camera_id = ? AND topic = ?`, + [status, error ?? null, cameraId, topic], + ); + } + + async markEventReceived(cameraId: number, topic: string): Promise { + await this._run( + `UPDATE camera_event_subscriptions + SET last_event_at = ?, status = 'active' + WHERE camera_id = ? AND topic = ?`, + [isoNow(), cameraId, topic], + ); + } + + async setAllEventSubscriptionsStatus( + cameraId: number, + fromStatus: EventSubscriptionStatus, + toStatus: EventSubscriptionStatus, + ): Promise { + await this._run( + `UPDATE camera_event_subscriptions + SET status = ? + WHERE camera_id = ? AND status = ?`, + [toStatus, cameraId, fromStatus], + ); + } + // =========================================================================== // cloud_accounts // =========================================================================== diff --git a/server/src/shared/types.ts b/server/src/shared/types.ts index 7cf04fe..b00b7a8 100644 --- a/server/src/shared/types.ts +++ b/server/src/shared/types.ts @@ -405,6 +405,19 @@ export interface EventLog { forwarded_to_nodered: boolean; } +export type EventSubscriptionStatus = "inactive" | "pending" | "active" | "failed"; + +export interface CameraEventSubscription { + id: number; + camera_id: number; + topic: string; + status: EventSubscriptionStatus; + subscribed_by_kiosk_id: number | null; + last_event_at: string | null; + error_message: string | null; + created_at: string; +} + export interface EventQueryFilters { topic?: string; kiosk_id?: number; diff --git a/server/src/web-templates/admin-pages.tsx b/server/src/web-templates/admin-pages.tsx index 228f91f..e170d57 100644 --- a/server/src/web-templates/admin-pages.tsx +++ b/server/src/web-templates/admin-pages.tsx @@ -6,6 +6,7 @@ import { Layout } from "./layout.js"; import type { AuditEntry, Camera, + CameraEventSubscription, Display, Entity, FirmwareRelease, @@ -1168,6 +1169,7 @@ interface CameraEditProps { allLabels: Label[]; streams: Array<{ id: number; role: string; name: string; rtsp_uri: string }>; subscriptions: CameraSubscription[]; + eventSubscriptions?: CameraEventSubscription[]; error?: string; success?: string; } @@ -1370,32 +1372,73 @@ export function CameraEditPage(props: CameraEditProps) { - {cam.type === "onvif" && ( -
-

Supported Event Topics

-

- Topics this camera advertises via GetEventProperties. Click refresh - to re-query the camera (via the designated event source). -

-
- -
- {cam.supported_event_topics.length > 0 ? ( -
- - - - {cam.supported_event_topics.map((t) => ( - - ))} - -
Topic
{t}
+ {cam.type === "onvif" && (() => { + const subs = props.eventSubscriptions ?? []; + const subMap = new Map(subs.map((s) => [s.topic, s])); + // Merge: show all topics from subscriptions table + any in supported_event_topics not yet in DB + const allTopics = new Set([...subs.map((s) => s.topic), ...cam.supported_event_topics]); + const sortedTopics = [...allTopics].sort(); + const hasInactive = subs.some((s) => s.status === "inactive"); + return ( +
+

Event Topics & Subscriptions

+

+ Topics this camera advertises via GetEventProperties. Click refresh + to re-query the camera (via the designated event source). New topics + are merged into the list and never removed. +

+
+
+ +
+ {hasInactive && ( +
+ +
+ )}
- ) : ( -

No topics discovered yet. Click refresh above.

- )} -
- )} + {sortedTopics.length > 0 ? ( +
+ + + + {[...sortedTopics].map((t) => { + const sub = subMap.get(t); + const status = sub?.status ?? "inactive"; + const dotColor = + status === "active" ? "#22c55e" : + status === "pending" ? "#f59e0b" : + status === "failed" ? "#ef4444" : + "#9ca3af"; + const dotTitle = + status === "active" ? "Active" : + status === "pending" ? "Pending" : + status === "failed" ? "Failed" : + "Inactive"; + return ( + + + + + + + ); + })} + +
StatusTopicLast EventError
+ + {t} + {sub?.last_event_at ? formatTime(sub.last_event_at) : "—"} + + {sub?.error_message ?? ""} +
+
+ ) : ( +

No topics discovered yet. Click refresh above.

+ )} +
+ ); + })()}

Labels