diff --git a/kiosk/src/onvif_events.rs b/kiosk/src/onvif_events.rs index a49ee98..aef9804 100644 --- a/kiosk/src/onvif_events.rs +++ b/kiosk/src/onvif_events.rs @@ -38,32 +38,73 @@ static STATUS: Mutex>> = Mutex::new(None); #[derive(Clone, serde::Serialize)] pub struct SubStatus { - pub state: &'static str, // "subscribing", "active", "failed", "stopped" + pub state: &'static str, pub last_event_at: Option, + pub subscribed_at: Option, pub error: Option, } +fn epoch_now() -> String { + let secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + format!("{secs}") +} + +fn epoch_now_secs() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + fn set_status(cam_id: &str, state: &'static str, error: Option) { let mut map = STATUS.lock().unwrap(); let map = map.get_or_insert_with(HashMap::new); let entry = map.entry(cam_id.to_string()).or_insert_with(|| SubStatus { state: "subscribing", last_event_at: None, + subscribed_at: None, error: None, }); entry.state = state; entry.error = error; + if state == "active" { + entry.subscribed_at = Some(epoch_now()); + } } fn mark_event_received(cam_id: &str) { let mut map = STATUS.lock().unwrap(); if let Some(map) = map.as_mut() { if let Some(entry) = map.get_mut(cam_id) { - entry.last_event_at = Some(crate::os_update::current_os_version_public()); // reuse timestamp helper... actually just use epoch + entry.last_event_at = Some(epoch_now()); } } } +/// Check if any subscription needs a forced refresh (>24h since subscribe, +/// or currently in failed/stopped state). +pub fn needs_refresh() -> bool { + let map = STATUS.lock().unwrap(); + let Some(map) = map.as_ref() else { return false }; + let now = epoch_now_secs(); + for status in map.values() { + if status.state == "failed" || status.state == "stopped" { + return true; + } + if let Some(ref sub_at) = status.subscribed_at { + if let Ok(ts) = sub_at.parse::() { + if now.saturating_sub(ts) > 24 * 3600 { + return true; + } + } + } + } + false +} + /// Get current subscription statuses for all cameras. Used by heartbeat. pub fn get_statuses() -> HashMap { STATUS.lock().unwrap().clone().unwrap_or_default() @@ -185,6 +226,7 @@ fn run_subscription( Ok(events) => { for evt in events { forward_event(server, kiosk_key, &cam.id, &evt, user, pass); + mark_event_received(&cam.id); } } Err(e) => { diff --git a/kiosk/src/ui.rs b/kiosk/src/ui.rs index 94d7dc5..16d00a9 100644 --- a/kiosk/src/ui.rs +++ b/kiosk/src/ui.rs @@ -311,6 +311,7 @@ fn activate(app: &Application) { } maybe_apply_os_update(&server, &key, &tx_progress); maybe_apply_firmware_update(&server, &key, &tx_progress); + maybe_refresh_onvif(&server, &key); std::thread::sleep(std::time::Duration::from_secs(60)); } }); @@ -624,6 +625,30 @@ fn maybe_apply_firmware_update(server_url: &str, kiosk_key: &str, tx: &mpsc::Sen } } +fn maybe_refresh_onvif(server_url: &str, kiosk_key: &str) { + if !onvif_events::needs_refresh() { + return; + } + info!("onvif: refreshing stale/failed subscriptions"); + let bundle = match server::load_cached_bundle() { + Some(b) => b, + None => return, + }; + let displays = bundle.normalized_displays(); + let layout_cam_ids: std::collections::HashSet = displays + .iter() + .flat_map(|d| d.layouts.iter()) + .flat_map(|l| l.cells.iter()) + .filter_map(|c| c.camera_id.clone()) + .collect(); + let layout_cameras: Vec<_> = bundle.cameras.iter() + .filter(|c| layout_cam_ids.contains(&c.id)) + .cloned() + .collect(); + let decrypt_key = server::load_encrypt_key().or_else(|| server::load_cluster_key()); + onvif_events::start(&layout_cameras, decrypt_key.as_deref(), server_url, kiosk_key); +} + /// Install the once-per-second watchdog that enforces idle/sleep timeouts /// per display. Safe to call multiple times — installs at most once. fn install_idle_watchdog() {