feat: ONVIF subscription auto-refresh on failure or 24h staleness

- Track subscribed_at timestamp per camera in SubStatus
- Fix mark_event_received to use epoch seconds (was OS version string)
- needs_refresh() returns true when any sub is failed/stopped or >24h old
- Heartbeat loop calls maybe_refresh_onvif() every 60s — reloads
  cameras from cached bundle and restarts onvif_events::start() which
  kills old generation threads and creates fresh PullPoint subscriptions
- mark_event_received called on each successful event forward

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mitchell R 2026-05-26 15:34:22 +02:00
parent 8c59bb6b02
commit eb8abbdff9
No known key found for this signature in database
2 changed files with 69 additions and 2 deletions

View file

@ -38,32 +38,73 @@ static STATUS: Mutex<Option<HashMap<String, SubStatus>>> = Mutex::new(None);
#[derive(Clone, serde::Serialize)]
pub struct SubStatus {
pub state: &'static str, // "subscribing", "active", "failed", "stopped"
pub state: &'static str,
pub last_event_at: Option<String>,
pub subscribed_at: Option<String>,
pub error: Option<String>,
}
fn epoch_now() -> String {
let secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
format!("{secs}")
}
fn epoch_now_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
fn set_status(cam_id: &str, state: &'static str, error: Option<String>) {
let mut map = STATUS.lock().unwrap();
let map = map.get_or_insert_with(HashMap::new);
let entry = map.entry(cam_id.to_string()).or_insert_with(|| SubStatus {
state: "subscribing",
last_event_at: None,
subscribed_at: None,
error: None,
});
entry.state = state;
entry.error = error;
if state == "active" {
entry.subscribed_at = Some(epoch_now());
}
}
fn mark_event_received(cam_id: &str) {
let mut map = STATUS.lock().unwrap();
if let Some(map) = map.as_mut() {
if let Some(entry) = map.get_mut(cam_id) {
entry.last_event_at = Some(crate::os_update::current_os_version_public()); // reuse timestamp helper... actually just use epoch
entry.last_event_at = Some(epoch_now());
}
}
}
/// Check if any subscription needs a forced refresh (>24h since subscribe,
/// or currently in failed/stopped state).
pub fn needs_refresh() -> bool {
let map = STATUS.lock().unwrap();
let Some(map) = map.as_ref() else { return false };
let now = epoch_now_secs();
for status in map.values() {
if status.state == "failed" || status.state == "stopped" {
return true;
}
if let Some(ref sub_at) = status.subscribed_at {
if let Ok(ts) = sub_at.parse::<u64>() {
if now.saturating_sub(ts) > 24 * 3600 {
return true;
}
}
}
}
false
}
/// Get current subscription statuses for all cameras. Used by heartbeat.
pub fn get_statuses() -> HashMap<String, SubStatus> {
STATUS.lock().unwrap().clone().unwrap_or_default()
@ -185,6 +226,7 @@ fn run_subscription(
Ok(events) => {
for evt in events {
forward_event(server, kiosk_key, &cam.id, &evt, user, pass);
mark_event_received(&cam.id);
}
}
Err(e) => {

View file

@ -311,6 +311,7 @@ fn activate(app: &Application) {
}
maybe_apply_os_update(&server, &key, &tx_progress);
maybe_apply_firmware_update(&server, &key, &tx_progress);
maybe_refresh_onvif(&server, &key);
std::thread::sleep(std::time::Duration::from_secs(60));
}
});
@ -624,6 +625,30 @@ fn maybe_apply_firmware_update(server_url: &str, kiosk_key: &str, tx: &mpsc::Sen
}
}
fn maybe_refresh_onvif(server_url: &str, kiosk_key: &str) {
if !onvif_events::needs_refresh() {
return;
}
info!("onvif: refreshing stale/failed subscriptions");
let bundle = match server::load_cached_bundle() {
Some(b) => b,
None => return,
};
let displays = bundle.normalized_displays();
let layout_cam_ids: std::collections::HashSet<String> = displays
.iter()
.flat_map(|d| d.layouts.iter())
.flat_map(|l| l.cells.iter())
.filter_map(|c| c.camera_id.clone())
.collect();
let layout_cameras: Vec<_> = bundle.cameras.iter()
.filter(|c| layout_cam_ids.contains(&c.id))
.cloned()
.collect();
let decrypt_key = server::load_encrypt_key().or_else(|| server::load_cluster_key());
onvif_events::start(&layout_cameras, decrypt_key.as_deref(), server_url, kiosk_key);
}
/// Install the once-per-second watchdog that enforces idle/sleep timeouts
/// per display. Safe to call multiple times — installs at most once.
fn install_idle_watchdog() {