diff --git a/kiosk/src/onvif_events.rs b/kiosk/src/onvif_events.rs index 64d60d2..2577f58 100644 --- a/kiosk/src/onvif_events.rs +++ b/kiosk/src/onvif_events.rs @@ -110,25 +110,27 @@ pub fn get_statuses() -> HashMap { STATUS.lock().unwrap().clone().unwrap_or_default() } -/// Start event subscription workers for all ONVIF cameras in the bundle. -/// Idempotent — stops old workers (via ACTIVE flag) before starting new. +/// Start event subscription workers for ONVIF cameras assigned to this kiosk. +/// Only subscribes when event_source is "auto" or "kiosk:". pub fn start( cameras: &[BundleCamera], cluster_key: Option<&str>, server_url: &str, kiosk_key: &str, ) { - // Only subscribe to cameras where event_source is "auto" or "kiosk:" - // (not "server" or another kiosk). For "auto", this kiosk subscribes because - // the server put the camera in this kiosk's bundle — meaning it's reachable. + let my_kiosk_id = crate::server::load_kiosk_id(); let onvif_cams: Vec<_> = cameras .iter() .filter(|c| { if c.cam_type != "onvif" || c.onvif_host.is_none() { return false; } match c.event_source.as_deref() { - Some("server") => false, // server handles this one - Some(s) if s.starts_with("kiosk:") => true, // pinned to a kiosk (might be us) - _ => true, // "auto" or missing → this kiosk subscribes + Some("server") => false, + Some("none") | Some("disabled") => false, + Some(s) if s.starts_with("kiosk:") => { + let assigned = &s[6..]; + my_kiosk_id.as_deref() == Some(assigned) + } + _ => true, // "auto" or missing } }) .cloned() @@ -180,30 +182,32 @@ fn run_subscription( let has_pass = !pass.is_empty(); info!("onvif-events: cam {} ({}) subscribing at {event_url} user={user} has_pass={has_pass}", cam.id, cam.name); + let mut backoff_secs: u64 = 30; loop { if generation.upgrade().is_none() { info!("onvif-events: cam {} generation expired, exiting", cam.id); return; } - // 1. CreatePullPointSubscription set_status(&cam.id, "subscribing", None); let sub = match create_pullpoint(&event_url, user, pass) { Ok(s) => s, Err(e) => { - warn!("onvif-events: cam {} CreatePullPoint failed: {e}", cam.id); + warn!("onvif-events: cam {} CreatePullPoint failed: {e} (backoff {backoff_secs}s)", cam.id); set_status(&cam.id, "failed", Some(e)); - std::thread::sleep(Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(backoff_secs)); + backoff_secs = (backoff_secs * 2).min(600); continue; } }; + backoff_secs = 30; info!("onvif-events: cam {} subscribed, address={}", cam.id, sub.address); set_status(&cam.id, "active", None); - // 2. Poll loop - let poll_interval = Duration::from_secs(3); - let renew_interval = Duration::from_secs(55); // renew before 60s timeout + let poll_interval = Duration::from_secs(10); + let renew_interval = Duration::from_secs(55); let mut since_renew = std::time::Instant::now(); + let mut consecutive_errors: u32 = 0; loop { if generation.upgrade().is_none() { @@ -224,16 +228,22 @@ fn run_subscription( match pull_messages(&sub.address, user, pass) { Ok(events) => { + consecutive_errors = 0; for evt in events { forward_event(server, kiosk_key, &cam.id, &evt, user, pass); mark_event_received(&cam.id); } } Err(e) => { - warn!("onvif-events: cam {} pull failed: {e}", cam.id); + consecutive_errors += 1; + let error_backoff = (15 * consecutive_errors as u64).min(300); + warn!("onvif-events: cam {} pull failed ({consecutive_errors}x): {e}, backoff {error_backoff}s", cam.id); set_status(&cam.id, "failed", Some(e)); - std::thread::sleep(Duration::from_secs(15)); - break; // resubscribe after backoff + if consecutive_errors >= 5 { + break; // resubscribe from scratch + } + std::thread::sleep(Duration::from_secs(error_backoff)); + continue; } } diff --git a/kiosk/src/server.rs b/kiosk/src/server.rs index 98fd779..d8cb9e5 100644 --- a/kiosk/src/server.rs +++ b/kiosk/src/server.rs @@ -162,6 +162,10 @@ pub fn load_cached_bundle() -> Option { } } +pub fn load_kiosk_id() -> Option { + load_cached_bundle().map(|b| b.kiosk_id) +} + /// Discover the BetterFrame server. pub fn discover_server(override_url: Option<&str>) -> String { if let Some(url) = override_url { diff --git a/server/src/shared/bundle.ts b/server/src/shared/bundle.ts index a311974..cb6bd7c 100644 --- a/server/src/shared/bundle.ts +++ b/server/src/shared/bundle.ts @@ -302,6 +302,16 @@ export async function generateBundle( }); } + // ONVIF event ownership: for "auto" cameras, first kiosk to fetch bundle + // takes ownership. Server writes "kiosk:" into event_source so + // subsequent kiosks see it's taken and skip. + for (const cam of cameras) { + if (cam.type === "onvif" && cam.event_source === "auto") { + await repo.updateCamera(cam.id, { event_source: `kiosk:${kioskId}` } as any); + cam.event_source = `kiosk:${kioskId}`; + } + } + const bundleCameras: BundleCamera[] = []; for (const cam of cameras) { const streams = await repo.listCameraStreams(cam.id); diff --git a/server/src/shared/db/repository.ts b/server/src/shared/db/repository.ts index d22d20c..e3cdfe2 100644 --- a/server/src/shared/db/repository.ts +++ b/server/src/shared/db/repository.ts @@ -2435,6 +2435,7 @@ export class Repository { await this._run(`DELETE FROM displays WHERE kiosk_id = ?`, [id]); await this._run(`DELETE FROM kiosk_labels WHERE kiosk_id = ?`, [id]); await this._run(`DELETE FROM kiosk_gpio_bindings WHERE kiosk_id = ?`, [id]); + await this._run(`UPDATE cameras SET event_source = 'auto' WHERE event_source = ?`, [`kiosk:${id}`]); await this._run(`DELETE FROM kiosks WHERE id = ?`, [id]); }); for (const display of displays) { @@ -2529,6 +2530,18 @@ export class Repository { // camera_event_subscriptions // =========================================================================== + async getActiveOnvifOwners(): Promise> { + const rs = await this._all<{ camera_id: string; subscribed_by_kiosk_id: string }>( + `SELECT DISTINCT camera_id, subscribed_by_kiosk_id FROM camera_event_subscriptions + WHERE subscribed_by_kiosk_id IS NOT NULL AND status = 'active'`, + ); + const map = new Map(); + for (const r of rs) { + map.set(r.camera_id, r.subscribed_by_kiosk_id); + } + return map; + } + async listEventSubscriptions(cameraId: string): Promise { const rs = await this._all( "SELECT * FROM camera_event_subscriptions WHERE camera_id = ? ORDER BY topic",