mirror of
https://github.com/BetterCorp/BetterFrame.git
synced 2026-05-26 23:26:34 +00:00
feat: ONVIF subscription auto-refresh on failure or 24h staleness
- Track subscribed_at timestamp per camera in SubStatus - Fix mark_event_received to use epoch seconds (was OS version string) - needs_refresh() returns true when any sub is failed/stopped or >24h old - Heartbeat loop calls maybe_refresh_onvif() every 60s — reloads cameras from cached bundle and restarts onvif_events::start() which kills old generation threads and creates fresh PullPoint subscriptions - mark_event_received called on each successful event forward Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8c59bb6b02
commit
bca3df1b83
2 changed files with 69 additions and 2 deletions
|
|
@ -38,32 +38,73 @@ static STATUS: Mutex<Option<HashMap<String, SubStatus>>> = Mutex::new(None);
|
||||||
|
|
||||||
#[derive(Clone, serde::Serialize)]
|
#[derive(Clone, serde::Serialize)]
|
||||||
pub struct SubStatus {
|
pub struct SubStatus {
|
||||||
pub state: &'static str, // "subscribing", "active", "failed", "stopped"
|
pub state: &'static str,
|
||||||
pub last_event_at: Option<String>,
|
pub last_event_at: Option<String>,
|
||||||
|
pub subscribed_at: Option<String>,
|
||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn epoch_now() -> String {
|
||||||
|
let secs = std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_secs())
|
||||||
|
.unwrap_or(0);
|
||||||
|
format!("{secs}")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn epoch_now_secs() -> u64 {
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_secs())
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
fn set_status(cam_id: &str, state: &'static str, error: Option<String>) {
|
fn set_status(cam_id: &str, state: &'static str, error: Option<String>) {
|
||||||
let mut map = STATUS.lock().unwrap();
|
let mut map = STATUS.lock().unwrap();
|
||||||
let map = map.get_or_insert_with(HashMap::new);
|
let map = map.get_or_insert_with(HashMap::new);
|
||||||
let entry = map.entry(cam_id.to_string()).or_insert_with(|| SubStatus {
|
let entry = map.entry(cam_id.to_string()).or_insert_with(|| SubStatus {
|
||||||
state: "subscribing",
|
state: "subscribing",
|
||||||
last_event_at: None,
|
last_event_at: None,
|
||||||
|
subscribed_at: None,
|
||||||
error: None,
|
error: None,
|
||||||
});
|
});
|
||||||
entry.state = state;
|
entry.state = state;
|
||||||
entry.error = error;
|
entry.error = error;
|
||||||
|
if state == "active" {
|
||||||
|
entry.subscribed_at = Some(epoch_now());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mark_event_received(cam_id: &str) {
|
fn mark_event_received(cam_id: &str) {
|
||||||
let mut map = STATUS.lock().unwrap();
|
let mut map = STATUS.lock().unwrap();
|
||||||
if let Some(map) = map.as_mut() {
|
if let Some(map) = map.as_mut() {
|
||||||
if let Some(entry) = map.get_mut(cam_id) {
|
if let Some(entry) = map.get_mut(cam_id) {
|
||||||
entry.last_event_at = Some(crate::os_update::current_os_version_public()); // reuse timestamp helper... actually just use epoch
|
entry.last_event_at = Some(epoch_now());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if any subscription needs a forced refresh (>24h since subscribe,
|
||||||
|
/// or currently in failed/stopped state).
|
||||||
|
pub fn needs_refresh() -> bool {
|
||||||
|
let map = STATUS.lock().unwrap();
|
||||||
|
let Some(map) = map.as_ref() else { return false };
|
||||||
|
let now = epoch_now_secs();
|
||||||
|
for status in map.values() {
|
||||||
|
if status.state == "failed" || status.state == "stopped" {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if let Some(ref sub_at) = status.subscribed_at {
|
||||||
|
if let Ok(ts) = sub_at.parse::<u64>() {
|
||||||
|
if now.saturating_sub(ts) > 24 * 3600 {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
/// Get current subscription statuses for all cameras. Used by heartbeat.
|
/// Get current subscription statuses for all cameras. Used by heartbeat.
|
||||||
pub fn get_statuses() -> HashMap<String, SubStatus> {
|
pub fn get_statuses() -> HashMap<String, SubStatus> {
|
||||||
STATUS.lock().unwrap().clone().unwrap_or_default()
|
STATUS.lock().unwrap().clone().unwrap_or_default()
|
||||||
|
|
@ -185,6 +226,7 @@ fn run_subscription(
|
||||||
Ok(events) => {
|
Ok(events) => {
|
||||||
for evt in events {
|
for evt in events {
|
||||||
forward_event(server, kiosk_key, &cam.id, &evt, user, pass);
|
forward_event(server, kiosk_key, &cam.id, &evt, user, pass);
|
||||||
|
mark_event_received(&cam.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -311,6 +311,7 @@ fn activate(app: &Application) {
|
||||||
}
|
}
|
||||||
maybe_apply_os_update(&server, &key, &tx_progress);
|
maybe_apply_os_update(&server, &key, &tx_progress);
|
||||||
maybe_apply_firmware_update(&server, &key, &tx_progress);
|
maybe_apply_firmware_update(&server, &key, &tx_progress);
|
||||||
|
maybe_refresh_onvif(&server, &key);
|
||||||
std::thread::sleep(std::time::Duration::from_secs(60));
|
std::thread::sleep(std::time::Duration::from_secs(60));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -624,6 +625,30 @@ fn maybe_apply_firmware_update(server_url: &str, kiosk_key: &str, tx: &mpsc::Sen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn maybe_refresh_onvif(server_url: &str, kiosk_key: &str) {
|
||||||
|
if !onvif_events::needs_refresh() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
info!("onvif: refreshing stale/failed subscriptions");
|
||||||
|
let bundle = match server::load_cached_bundle() {
|
||||||
|
Some(b) => b,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
let displays = bundle.normalized_displays();
|
||||||
|
let layout_cam_ids: std::collections::HashSet<String> = displays
|
||||||
|
.iter()
|
||||||
|
.flat_map(|d| d.layouts.iter())
|
||||||
|
.flat_map(|l| l.cells.iter())
|
||||||
|
.filter_map(|c| c.camera_id.clone())
|
||||||
|
.collect();
|
||||||
|
let layout_cameras: Vec<_> = bundle.cameras.iter()
|
||||||
|
.filter(|c| layout_cam_ids.contains(&c.id))
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
let decrypt_key = server::load_encrypt_key().or_else(|| server::load_cluster_key());
|
||||||
|
onvif_events::start(&layout_cameras, decrypt_key.as_deref(), server_url, kiosk_key);
|
||||||
|
}
|
||||||
|
|
||||||
/// Install the once-per-second watchdog that enforces idle/sleep timeouts
|
/// Install the once-per-second watchdog that enforces idle/sleep timeouts
|
||||||
/// per display. Safe to call multiple times — installs at most once.
|
/// per display. Safe to call multiple times — installs at most once.
|
||||||
fn install_idle_watchdog() {
|
fn install_idle_watchdog() {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue