feat(events): add persistent ONVIF event topic subscriptions with status tracking

Add camera_event_subscriptions table to track per-camera per-topic
subscription state (inactive/pending/active/failed). Refresh-events
handler now merges discovered topics instead of replacing, so topics
are never lost when a camera goes temporarily offline. Admin UI shows
colored status dots and last-event timestamps per topic, with a
"subscribe all inactive" button to queue subscriptions for kiosk pickup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mitchell R 2026-05-26 02:38:43 +02:00
parent aa068a32f1
commit cce9b51887
No known key found for this signature in database
7 changed files with 214 additions and 27 deletions

View file

@ -1397,6 +1397,8 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
});
}
const eventSubscriptions = await deps.repo.listEventSubscriptions(id);
return htmlPage(CameraEditPage({
user: user.username,
camera,
@ -1404,6 +1406,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
allLabels: await deps.repo.listLabels(),
streams: await deps.repo.listCameraStreams(id),
subscriptions,
eventSubscriptions,
}));
});
@ -1516,6 +1519,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
});
// Refresh supported ONVIF event topics from the camera.
// MERGE: new topics are added to the existing list, never removed.
app.post("/admin/cameras/:id/refresh-events", async (event) => {
const id = Number(getRouterParam(event, "id"));
const cam = await deps.repo.getCameraById(id);
@ -1538,20 +1542,39 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
? kioskOnvifSoapTransport(Number(runner.slice("kiosk:".length)))
: undefined;
try {
const topics = await onvifGetEventProperties({
const discoveredTopics = await onvifGetEventProperties({
host: cam.onvif_host,
port: cam.onvif_port ?? 80,
username: cam.onvif_username ?? "",
password: cam.onvif_password ?? "",
soapTransport,
});
await deps.repo.updateCamera(id, { supported_event_topics: JSON.stringify(topics) } as any);
// Merge: keep existing topics, add new ones — never remove
const existingSet = new Set(cam.supported_event_topics);
for (const t of discoveredTopics) existingSet.add(t);
const merged = [...existingSet].sort();
await deps.repo.updateCamera(id, { supported_event_topics: JSON.stringify(merged) } as any);
// Upsert subscription rows for each discovered topic (additive only)
for (const topic of discoveredTopics) {
await deps.repo.upsertEventSubscription({ camera_id: id, topic, status: "inactive" });
}
} catch {
// Camera offline or events not supported — leave existing topics.
}
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
});
// Subscribe to all inactive event topics for this camera.
app.post("/admin/cameras/:id/subscribe-events", async (event) => {
const id = Number(getRouterParam(event, "id"));
const cam = await deps.repo.getCameraById(id);
if (!cam) {
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
}
await deps.repo.setAllEventSubscriptionsStatus(id, "inactive", "pending");
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
});
app.post("/admin/cameras/:id/delete", async (event) => {
event.context.obs?.log.info("camera delete {id} by {user}", { id: getRouterParam(event, "id") ?? "?", user: event.context.user?.username ?? "unknown" });
const id = Number(getRouterParam(event, "id"));

View file

@ -12,6 +12,7 @@ import type {
AuditEntry,
AuditResult,
Camera,
CameraEventSubscription,
CloudAccount,
CloudVendor,
CameraStream,
@ -24,6 +25,7 @@ import type {
EntityType,
EventLog,
EventSourceType,
EventSubscriptionStatus,
FirmwareChannel,
FirmwareRelease,
FirmwareRollout,
@ -472,3 +474,16 @@ export function rowToCloudAccount(r: Row): CloudAccount {
created_at: s(r["created_at"]),
};
}
export function rowToCameraEventSubscription(r: Row): CameraEventSubscription {
return {
id: n(r["id"]),
camera_id: n(r["camera_id"]),
topic: s(r["topic"]),
status: s(r["status"]) as EventSubscriptionStatus,
subscribed_by_kiosk_id: nn(r["subscribed_by_kiosk_id"]),
last_event_at: sn(r["last_event_at"]),
error_message: sn(r["error_message"]),
created_at: s(r["created_at"]),
};
}

View file

@ -465,4 +465,18 @@ export const TENANT_MIGRATIONS: readonly string[] = [
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)`,
`CREATE INDEX IF NOT EXISTS idx_cloud_accounts_vendor ON cloud_accounts(vendor)`,
// ---- camera_event_subscriptions ---------------------------------------------
`CREATE TABLE IF NOT EXISTS camera_event_subscriptions (
id SERIAL PRIMARY KEY,
camera_id INTEGER NOT NULL REFERENCES cameras(id) ON DELETE CASCADE,
topic TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'inactive' CHECK(status IN ('inactive', 'pending', 'active', 'failed')),
subscribed_by_kiosk_id INTEGER REFERENCES kiosks(id) ON DELETE SET NULL,
last_event_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(camera_id, topic)
)`,
`CREATE INDEX IF NOT EXISTS idx_camera_event_subs_camera ON camera_event_subscriptions(camera_id)`,
];

View file

@ -1074,4 +1074,18 @@ export const MIGRATIONS: readonly MigrationEntry[] = [
},
`CREATE INDEX IF NOT EXISTS idx_cameras_cloud_account ON cameras(cloud_account_id)`,
`CREATE INDEX IF NOT EXISTS idx_cameras_cloud_vendor ON cameras(cloud_account_id, cloud_vendor_camera_id)`,
// ---- camera_event_subscriptions: per-camera per-topic subscription state ---
`CREATE TABLE IF NOT EXISTS camera_event_subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_id INTEGER NOT NULL REFERENCES cameras(id) ON DELETE CASCADE,
topic TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'inactive' CHECK(status IN ('inactive', 'pending', 'active', 'failed')),
subscribed_by_kiosk_id INTEGER REFERENCES kiosks(id) ON DELETE SET NULL,
last_event_at TEXT,
error_message TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
UNIQUE(camera_id, topic)
) STRICT`,
`CREATE INDEX IF NOT EXISTS idx_camera_event_subs_camera ON camera_event_subscriptions(camera_id)`,
];

View file

@ -19,6 +19,7 @@ import type {
AuditEntry,
AuditResult,
Camera,
CameraEventSubscription,
CameraStream,
CameraType,
CloudAccount,
@ -28,6 +29,7 @@ import type {
EventLog,
EventQueryFilters,
EventSourceType,
EventSubscriptionStatus,
FirmwareChannel,
FirmwareRelease,
FirmwareRollout,
@ -61,6 +63,7 @@ import {
rowToApiKey,
rowToAuditEntry,
rowToCamera,
rowToCameraEventSubscription,
rowToCloudAccount,
rowToCameraStream,
rowToDisplay,
@ -2411,6 +2414,68 @@ export class Repository {
void this.notify("labels", "update", id);
}
// ===========================================================================
// camera_event_subscriptions
// ===========================================================================
async listEventSubscriptions(cameraId: number): Promise<CameraEventSubscription[]> {
const rs = await this._all(
"SELECT * FROM camera_event_subscriptions WHERE camera_id = ? ORDER BY topic",
[cameraId],
);
return rs.map((r) => rowToCameraEventSubscription(r as Record<string, unknown>));
}
async upsertEventSubscription(input: {
camera_id: number;
topic: string;
status?: EventSubscriptionStatus;
}): Promise<void> {
const status = input.status ?? "inactive";
await this._run(
`INSERT INTO camera_event_subscriptions (camera_id, topic, status)
VALUES (?, ?, ?)
ON CONFLICT (camera_id, topic) DO UPDATE SET status = COALESCE(NULLIF(?, ''), camera_event_subscriptions.status)`,
[input.camera_id, input.topic, status, status],
);
}
async updateEventSubscriptionStatus(
cameraId: number,
topic: string,
status: EventSubscriptionStatus,
error?: string | null,
): Promise<void> {
await this._run(
`UPDATE camera_event_subscriptions
SET status = ?, error_message = ?
WHERE camera_id = ? AND topic = ?`,
[status, error ?? null, cameraId, topic],
);
}
async markEventReceived(cameraId: number, topic: string): Promise<void> {
await this._run(
`UPDATE camera_event_subscriptions
SET last_event_at = ?, status = 'active'
WHERE camera_id = ? AND topic = ?`,
[isoNow(), cameraId, topic],
);
}
async setAllEventSubscriptionsStatus(
cameraId: number,
fromStatus: EventSubscriptionStatus,
toStatus: EventSubscriptionStatus,
): Promise<void> {
await this._run(
`UPDATE camera_event_subscriptions
SET status = ?
WHERE camera_id = ? AND status = ?`,
[toStatus, cameraId, fromStatus],
);
}
// ===========================================================================
// cloud_accounts
// ===========================================================================

View file

@ -405,6 +405,19 @@ export interface EventLog {
forwarded_to_nodered: boolean;
}
export type EventSubscriptionStatus = "inactive" | "pending" | "active" | "failed";
export interface CameraEventSubscription {
id: number;
camera_id: number;
topic: string;
status: EventSubscriptionStatus;
subscribed_by_kiosk_id: number | null;
last_event_at: string | null;
error_message: string | null;
created_at: string;
}
export interface EventQueryFilters {
topic?: string;
kiosk_id?: number;

View file

@ -6,6 +6,7 @@ import { Layout } from "./layout.js";
import type {
AuditEntry,
Camera,
CameraEventSubscription,
Display,
Entity,
FirmwareRelease,
@ -1168,6 +1169,7 @@ interface CameraEditProps {
allLabels: Label[];
streams: Array<{ id: number; role: string; name: string; rtsp_uri: string }>;
subscriptions: CameraSubscription[];
eventSubscriptions?: CameraEventSubscription[];
error?: string;
success?: string;
}
@ -1370,24 +1372,64 @@ export function CameraEditPage(props: CameraEditProps) {
</form>
</div>
{cam.type === "onvif" && (
{cam.type === "onvif" && (() => {
const subs = props.eventSubscriptions ?? [];
const subMap = new Map(subs.map((s) => [s.topic, s]));
// Merge: show all topics from subscriptions table + any in supported_event_topics not yet in DB
const allTopics = new Set([...subs.map((s) => s.topic), ...cam.supported_event_topics]);
const sortedTopics = [...allTopics].sort();
const hasInactive = subs.some((s) => s.status === "inactive");
return (
<div class="card" style="margin-bottom:1.5rem">
<h2 style="margin:0 0 1rem; font-size:1.1rem">Supported Event Topics</h2>
<h2 style="margin:0 0 1rem; font-size:1.1rem">Event Topics &amp; Subscriptions</h2>
<p style="color:#666; font-size:0.85rem; margin-bottom:0.75rem">
Topics this camera advertises via GetEventProperties. Click refresh
to re-query the camera (via the designated event source).
to re-query the camera (via the designated event source). New topics
are merged into the list and never removed.
</p>
<form method="post" action={`/admin/cameras/${cam.id}/refresh-events`} style="margin-bottom:0.75rem">
<div style="display:flex; gap:0.5rem; margin-bottom:0.75rem">
<form method="post" action={`/admin/cameras/${cam.id}/refresh-events`}>
<button type="submit" class="btn btn-sm">Refresh from camera</button>
</form>
{cam.supported_event_topics.length > 0 ? (
{hasInactive && (
<form method="post" action={`/admin/cameras/${cam.id}/subscribe-events`}>
<button type="submit" class="btn btn-sm btn-primary">Subscribe all inactive</button>
</form>
)}
</div>
{sortedTopics.length > 0 ? (
<div class="table-wrap">
<table>
<thead><tr><th>Topic</th></tr></thead>
<thead><tr><th style="width:2rem">Status</th><th>Topic</th><th>Last Event</th><th>Error</th></tr></thead>
<tbody>
{cam.supported_event_topics.map((t) => (
<tr><td><code style="font-size:0.8rem">{t}</code></td></tr>
))}
{[...sortedTopics].map((t) => {
const sub = subMap.get(t);
const status = sub?.status ?? "inactive";
const dotColor =
status === "active" ? "#22c55e" :
status === "pending" ? "#f59e0b" :
status === "failed" ? "#ef4444" :
"#9ca3af";
const dotTitle =
status === "active" ? "Active" :
status === "pending" ? "Pending" :
status === "failed" ? "Failed" :
"Inactive";
return (
<tr>
<td style="text-align:center" title={dotTitle}>
<span style={`display:inline-block; width:10px; height:10px; border-radius:50%; background:${dotColor}`}></span>
</td>
<td><code style="font-size:0.8rem">{t}</code></td>
<td style="font-size:0.8rem; white-space:nowrap; color:#666">
{sub?.last_event_at ? formatTime(sub.last_event_at) : "—"}
</td>
<td style="font-size:0.75rem; color:#ef4444; max-width:200px; overflow:hidden; text-overflow:ellipsis">
{sub?.error_message ?? ""}
</td>
</tr>
);
})}
</tbody>
</table>
</div>
@ -1395,7 +1437,8 @@ export function CameraEditPage(props: CameraEditProps) {
<p style="color:#999">No topics discovered yet. Click refresh above.</p>
)}
</div>
)}
);
})()}
<div class="card" style="margin-bottom:1.5rem">
<h2 style="margin:0 0 1rem; font-size:1.1rem">Labels</h2>