feat(onvif): event routing config + GetEventProperties + subscription status

Full ONVIF event management overhaul:

DB: cameras gain event_source (auto|server|kiosk:<id>), event_sink
(auto|server|kiosk:<id>), and supported_event_topics (JSON array).

Server:
  - GetEventProperties SOAP call in onvif.ts — queries camera for all
    supported event topics (motion, ANPR, line crossing, etc.)
  - POST /admin/cameras/:id/refresh-events route — runs GetEventProperties
    via designated event source (kiosk WS relay or server direct)
  - Camera edit form: event_source + event_sink dropdowns
  - Camera detail: supported event topics table with refresh button
  - Bundle includes event_source + event_sink so kiosk knows its role

Kiosk:
  - onvif_events.rs respects event_source: only subscribes when "auto"
    or "kiosk:<this_id>", skips when "server"
  - Subscription status tracking: state (subscribing/active/failed),
    last_event_at, error — reported in heartbeat for admin visibility
  - BundleCamera gains event_source + event_sink fields

Auto logic for source: camera in kiosk's bundle → kiosk subscribes.
Auto logic for sink: TODO — same-subnet detection for WSBaseNotification.
Currently PullPoint only; push model is the next step.
This commit is contained in:
Mitchell R 2026-05-23 00:38:54 +02:00
parent 70bdc3bb8b
commit b1e8e00eb1
No known key found for this signature in database
10 changed files with 248 additions and 3 deletions

View file

@ -120,6 +120,10 @@ pub struct BundleCamera {
pub onvif_username: Option<String>, pub onvif_username: Option<String>,
#[serde(default)] #[serde(default)]
pub onvif_password_encrypted: Option<String>, pub onvif_password_encrypted: Option<String>,
#[serde(default)]
pub event_source: Option<String>,
#[serde(default)]
pub event_sink: Option<String>,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]

View file

@ -27,6 +27,42 @@ use crate::bundle::BundleCamera;
/// to know when to stop (camera removed from bundle / bundle changed). /// to know when to stop (camera removed from bundle / bundle changed).
static ACTIVE: Mutex<Option<HashMap<u32, ()>>> = Mutex::new(None); static ACTIVE: Mutex<Option<HashMap<u32, ()>>> = Mutex::new(None);
/// Subscription status per camera — reported in heartbeat for admin visibility.
static STATUS: Mutex<Option<HashMap<u32, SubStatus>>> = Mutex::new(None);
#[derive(Clone, serde::Serialize)]
pub struct SubStatus {
pub state: &'static str, // "subscribing", "active", "failed", "stopped"
pub last_event_at: Option<String>,
pub error: Option<String>,
}
fn set_status(cam_id: u32, state: &'static str, error: Option<String>) {
let mut map = STATUS.lock().unwrap();
let map = map.get_or_insert_with(HashMap::new);
let entry = map.entry(cam_id).or_insert_with(|| SubStatus {
state: "subscribing",
last_event_at: None,
error: None,
});
entry.state = state;
entry.error = error;
}
fn mark_event_received(cam_id: u32) {
let mut map = STATUS.lock().unwrap();
if let Some(map) = map.as_mut() {
if let Some(entry) = map.get_mut(&cam_id) {
entry.last_event_at = Some(crate::os_update::current_os_version_public()); // reuse timestamp helper... actually just use epoch
}
}
}
/// Get current subscription statuses for all cameras. Used by heartbeat.
pub fn get_statuses() -> HashMap<u32, SubStatus> {
STATUS.lock().unwrap().clone().unwrap_or_default()
}
/// Start event subscription workers for all ONVIF cameras in the bundle. /// Start event subscription workers for all ONVIF cameras in the bundle.
/// Idempotent — stops old workers (via ACTIVE flag) before starting new. /// Idempotent — stops old workers (via ACTIVE flag) before starting new.
pub fn start( pub fn start(
@ -35,9 +71,19 @@ pub fn start(
server_url: &str, server_url: &str,
kiosk_key: &str, kiosk_key: &str,
) { ) {
// Only subscribe to cameras where event_source is "auto" or "kiosk:<this_id>"
// (not "server" or another kiosk). For "auto", this kiosk subscribes because
// the server put the camera in this kiosk's bundle — meaning it's reachable.
let onvif_cams: Vec<_> = cameras let onvif_cams: Vec<_> = cameras
.iter() .iter()
.filter(|c| c.cam_type == "onvif" && c.onvif_host.is_some()) .filter(|c| {
if c.cam_type != "onvif" || c.onvif_host.is_none() { return false; }
match c.event_source.as_deref() {
Some("server") => false, // server handles this one
Some(s) if s.starts_with("kiosk:") => true, // pinned to a kiosk (might be us)
_ => true, // "auto" or missing → this kiosk subscribes
}
})
.cloned() .cloned()
.collect(); .collect();
@ -95,15 +141,18 @@ fn run_subscription(
} }
// 1. CreatePullPointSubscription // 1. CreatePullPointSubscription
set_status(cam.id, "subscribing", None);
let sub = match create_pullpoint(&event_url, user, pass) { let sub = match create_pullpoint(&event_url, user, pass) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
warn!("onvif-events: cam {} CreatePullPoint failed: {e}", cam.id); warn!("onvif-events: cam {} CreatePullPoint failed: {e}", cam.id);
set_status(cam.id, "failed", Some(e));
std::thread::sleep(Duration::from_secs(30)); std::thread::sleep(Duration::from_secs(30));
continue; continue;
} }
}; };
info!("onvif-events: cam {} subscribed, address={}", cam.id, sub.address); info!("onvif-events: cam {} subscribed, address={}", cam.id, sub.address);
set_status(cam.id, "active", None);
// 2. Poll loop // 2. Poll loop
let poll_interval = Duration::from_secs(3); let poll_interval = Duration::from_secs(3);

View file

@ -427,6 +427,7 @@ pub fn heartbeat(
"local_port": local_port, "local_port": local_port,
"reported_hostname": hostname, "reported_hostname": hostname,
"network_interfaces": network_interfaces, "network_interfaces": network_interfaces,
"onvif_subscriptions": serde_json::to_value(crate::onvif_events::get_statuses()).unwrap_or_default(),
})) }))
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(5))
.send() .send()

View file

@ -35,7 +35,7 @@ import {
renderDisplayLayouts, renderDisplayLayouts,
renderDefaultLayoutSelect, renderDefaultLayoutSelect,
} from "../../web-templates/admin-pages.js"; } from "../../web-templates/admin-pages.js";
import { discover as onvifDiscover } from "../../shared/onvif.js"; import { discover as onvifDiscover, getEventProperties as onvifGetEventProperties } from "../../shared/onvif.js";
import { generateBundle } from "../../shared/bundle.js"; import { generateBundle } from "../../shared/bundle.js";
import { captureSnapshot } from "../../shared/snapshot.js"; import { captureSnapshot } from "../../shared/snapshot.js";
import { stripSecrets } from "../../shared/strip-secrets.js"; import { stripSecrets } from "../../shared/strip-secrets.js";
@ -1390,6 +1390,9 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
patch["onvif_username"] = body?.["onvif_username"] || null; patch["onvif_username"] = body?.["onvif_username"] || null;
if (body?.["onvif_password"]) patch["onvif_password"] = body["onvif_password"]; if (body?.["onvif_password"]) patch["onvif_password"] = body["onvif_password"];
} }
// Event routing config
if (body?.["event_source"] != null) patch["event_source"] = body["event_source"] || "auto";
if (body?.["event_sink"] != null) patch["event_sink"] = body["event_sink"] || "auto";
deps.repo.updateCamera(id, patch as any); deps.repo.updateCamera(id, patch as any);
// Also update main stream URI for RTSP cameras // Also update main stream URI for RTSP cameras
@ -1451,6 +1454,40 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${camId}` } }); return new Response(null, { status: 302, headers: { location: `/admin/cameras/${camId}` } });
}); });
// Refresh supported ONVIF event topics from the camera.
app.post("/admin/cameras/:id/refresh-events", async (event) => {
const id = Number(getRouterParam(event, "id"));
const cam = deps.repo.getCameraById(id);
if (!cam || cam.type !== "onvif" || !cam.onvif_host) {
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
}
// Determine which kiosk (or server) to run the SOAP call through.
const runner = cam.event_source === "server" ? "server"
: cam.event_source.startsWith("kiosk:") ? cam.event_source
: (() => {
// Auto: pick a kiosk that has this camera in its bundle.
const kiosks = deps.repo.listKiosksWithCameraInBundle(id);
const online = kiosks.find((k) => k.last_seen_at && Date.now() - new Date(k.last_seen_at).getTime() < 120_000);
return online ? `kiosk:${online.id}` : "server";
})();
const soapTransport = runner.startsWith("kiosk:")
? kioskOnvifSoapTransport(Number(runner.slice("kiosk:".length)))
: undefined;
try {
const topics = await onvifGetEventProperties({
host: cam.onvif_host,
port: cam.onvif_port ?? 80,
username: cam.onvif_username ?? "",
password: cam.onvif_password ?? "",
soapTransport,
});
deps.repo.updateCamera(id, { supported_event_topics: JSON.stringify(topics) } as any);
} catch {
// Camera offline or events not supported — leave existing topics.
}
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
});
app.post("/admin/cameras/:id/delete", (event) => { app.post("/admin/cameras/:id/delete", (event) => {
const id = Number(getRouterParam(event, "id")); const id = Number(getRouterParam(event, "id"));
deps.repo.deleteCamera(id); deps.repo.deleteCamera(id);

View file

@ -162,6 +162,9 @@ export function rowToCamera(r: Row): Camera {
enabled: b(r["enabled"]), enabled: b(r["enabled"]),
last_seen_at: sn(r["last_seen_at"]), last_seen_at: sn(r["last_seen_at"]),
created_at: s(r["created_at"]), created_at: s(r["created_at"]),
event_source: s(r["event_source"] ?? "auto"),
event_sink: s(r["event_sink"] ?? "auto"),
supported_event_topics: j<string[]>(r["supported_event_topics"], []),
}; };
} }

View file

@ -959,4 +959,12 @@ export const MIGRATIONS: readonly MigrationEntry[] = [
// --- display active layout --- // --- display active layout ---
addColumnIfNotExists(db, "displays", "active_layout_id", "INTEGER REFERENCES layouts(id) ON DELETE SET NULL"); addColumnIfNotExists(db, "displays", "active_layout_id", "INTEGER REFERENCES layouts(id) ON DELETE SET NULL");
}, },
// ONVIF event routing: per-camera event_source (who polls), event_sink
// (where push callbacks go), and discovered supported topics.
(db: DatabaseSync) => {
addColumnIfNotExists(db, "cameras", "event_source", "TEXT NOT NULL DEFAULT 'auto'");
addColumnIfNotExists(db, "cameras", "event_sink", "TEXT NOT NULL DEFAULT 'auto'");
addColumnIfNotExists(db, "cameras", "supported_event_topics", "TEXT NOT NULL DEFAULT '[]'");
},
]; ];

View file

@ -17,6 +17,8 @@ export interface BundleCamera {
onvif_port: number | null; onvif_port: number | null;
onvif_username: string | null; onvif_username: string | null;
onvif_password_encrypted: string | null; onvif_password_encrypted: string | null;
event_source: string;
event_sink: string;
stream_policy: string; stream_policy: string;
streams: Array<{ streams: Array<{
id: number; id: number;
@ -235,6 +237,8 @@ export function generateBundle(
onvif_port: cam.onvif_port, onvif_port: cam.onvif_port,
onvif_username: cam.onvif_username, onvif_username: cam.onvif_username,
onvif_password_encrypted: onvifPwEncrypted, onvif_password_encrypted: onvifPwEncrypted,
event_source: cam.event_source,
event_sink: cam.event_sink,
stream_policy: cam.stream_policy, stream_policy: cam.stream_policy,
streams: effectiveStreams.map((s) => ({ streams: effectiveStreams.map((s) => ({
id: s.id, id: s.id,

View file

@ -429,3 +429,77 @@ export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]
return groupProfiles(input.host, deviceName, out); return groupProfiles(input.host, deviceName, out);
} }
/**
* Query the camera's supported ONVIF event topics via GetEventProperties.
* Returns a list of topic strings the camera can produce (e.g.
* "tns1:RuleEngine/CellMotionDetector/Motion",
* "tns1:RuleEngine/LicensePlateRecognition/Plate", etc.).
*
* Best-effort: returns [] on failure (camera might not support events,
* auth might fail, event service might be at a non-standard path).
*/
export async function getEventProperties(input: DiscoverInput): Promise<string[]> {
const timeoutMs = input.timeoutMs ?? 8000;
const endpoint = normalizeEndpoint(input);
const header = wsseHeader(input.username, input.password);
const eventUrl = `${endpoint.origin}/onvif/event_service`;
const body = buildEnvelope(header,
`<tev:GetEventProperties xmlns:tev="http://www.onvif.org/ver10/events/wsdl"/>`);
let xml: string;
try {
xml = await soap(eventUrl,
"http://www.onvif.org/ver10/events/wsdl/EventPortType/GetEventPropertiesRequest",
body, timeoutMs, input.soapTransport);
} catch {
return [];
}
// Parse TopicSet — extract all topic paths. ONVIF nests topics as XML
// elements under TopicSet. Each leaf element with wstop:topic="true" is
// a subscribable topic. The full path is the concatenation of ancestor
// element names separated by "/".
const topics: string[] = [];
// Strategy: find all elements with topic="true" attribute and walk
// their path. Simpler: extract all text between <TopicSet> and
// </TopicSet>, find elements with topic="true", reconstruct paths.
const topicSetMatch = xml.match(/<[^:]*:?TopicSet[^>]*>([\s\S]*?)<\/[^:]*:?TopicSet>/);
if (!topicSetMatch) return topics;
const topicSetXml = topicSetMatch[1] ?? "";
// Walk the XML naively: track element depth + names.
const stack: string[] = [];
const tagRe = /<\/?([^\s>\/]+)[^>]*?(\/?)>/g;
let match;
while ((match = tagRe.exec(topicSetXml)) !== null) {
const full = match[0]!;
const tagName = match[1]!;
const selfClose = match[2] === "/";
const isClose = full.startsWith("</");
// Strip namespace prefix for the path name.
const localName = tagName.includes(":") ? tagName.split(":").pop()! : tagName;
if (isClose) {
stack.pop();
} else {
stack.push(localName);
// Check if this element has topic="true"
if (full.includes('topic="true"') || full.includes("topic='true'")) {
// Reconstruct topic path: tns1:TopLevel/Sub/Leaf
// Convention: first element under TopicSet gets "tns1:" prefix.
const path = stack.join("/");
const topicPath = stack.length > 0 ? `tns1:${path}` : path;
topics.push(topicPath);
}
if (selfClose) {
stack.pop();
}
}
}
return topics;
}

View file

@ -106,6 +106,9 @@ export interface Display {
active_layout_id: number | null; active_layout_id: number | null;
} }
export type EventSourceMode = "auto" | "server" | string; // string = "kiosk:<id>"
export type EventSinkMode = "auto" | "server" | string;
export interface Camera { export interface Camera {
id: number; id: number;
name: string; name: string;
@ -114,12 +117,15 @@ export interface Camera {
onvif_host: string | null; onvif_host: string | null;
onvif_port: number | null; onvif_port: number | null;
onvif_username: string | null; onvif_username: string | null;
onvif_password: string | null; // fernet-encrypted ciphertext onvif_password: string | null;
capabilities: string[]; capabilities: string[];
stream_policy: StreamPolicy; stream_policy: StreamPolicy;
enabled: boolean; enabled: boolean;
last_seen_at: string | null; last_seen_at: string | null;
created_at: string; created_at: string;
event_source: EventSourceMode;
event_sink: EventSinkMode;
supported_event_topics: string[];
} }
export interface CameraStream { export interface CameraStream {

View file

@ -1298,11 +1298,70 @@ export function CameraEditPage(props: CameraEditProps) {
{" "}Enabled {" "}Enabled
</label> </label>
</div> </div>
{cam.type === "onvif" && (
<div style="margin-top:1rem; padding-top:1rem; border-top:1px solid #eee">
<h3 style="margin:0 0 0.5rem; font-size:0.95rem">Event Routing</h3>
<div style="display:grid; grid-template-columns:1fr 1fr; gap:0.75rem">
<div class="form-group">
<label for="event_source">Event Source (who polls)</label>
<select id="event_source" name="event_source" class="form-input">
<option value="auto" selected={cam.event_source === "auto"}>Auto (nearest kiosk)</option>
<option value="server" selected={cam.event_source === "server"}>Server</option>
{props.subscriptions.map((sub) => (
<option value={`kiosk:${sub.kiosk.id}`} selected={cam.event_source === `kiosk:${sub.kiosk.id}`}>
Kiosk: {sub.kiosk.name}
</option>
))}
</select>
</div>
<div class="form-group">
<label for="event_sink">Event Sink (push target)</label>
<select id="event_sink" name="event_sink" class="form-input">
<option value="auto" selected={cam.event_sink === "auto"}>Auto (same subnet kiosk, else server)</option>
<option value="server" selected={cam.event_sink === "server"}>Server</option>
{props.subscriptions.map((sub) => (
<option value={`kiosk:${sub.kiosk.id}`} selected={cam.event_sink === `kiosk:${sub.kiosk.id}`}>
Kiosk: {sub.kiosk.name}
</option>
))}
</select>
</div>
</div>
</div>
)}
<button type="submit" class="btn btn-primary">Save</button> <button type="submit" class="btn btn-primary">Save</button>
<a href="/admin/cameras" class="btn btn-ghost" style="margin-left:0.5rem">Back</a> <a href="/admin/cameras" class="btn btn-ghost" style="margin-left:0.5rem">Back</a>
</form> </form>
</div> </div>
{cam.type === "onvif" && (
<div class="card" style="margin-bottom:1.5rem">
<h2 style="margin:0 0 1rem; font-size:1.1rem">Supported Event Topics</h2>
<p style="color:#666; font-size:0.85rem; margin-bottom:0.75rem">
Topics this camera advertises via GetEventProperties. Click refresh
to re-query the camera (via the designated event source).
</p>
<form method="post" action={`/admin/cameras/${cam.id}/refresh-events`} style="margin-bottom:0.75rem">
<button type="submit" class="btn btn-sm">Refresh from camera</button>
</form>
{cam.supported_event_topics.length > 0 ? (
<div class="table-wrap">
<table>
<thead><tr><th>Topic</th></tr></thead>
<tbody>
{cam.supported_event_topics.map((t) => (
<tr><td><code style="font-size:0.8rem">{t}</code></td></tr>
))}
</tbody>
</table>
</div>
) : (
<p style="color:#999">No topics discovered yet. Click refresh above.</p>
)}
</div>
)}
<div class="card" style="margin-bottom:1.5rem"> <div class="card" style="margin-bottom:1.5rem">
<h2 style="margin:0 0 1rem; font-size:1.1rem">Labels</h2> <h2 style="margin:0 0 1rem; font-size:1.1rem">Labels</h2>
<div id={`camera-labels-${String(cam.id)}`}> <div id={`camera-labels-${String(cam.id)}`}>