fix(onvif-events): fix generation leak + namespace Address parsing + backoff

Three bugs:
1. std::mem::forget(generation) leaked the Arc → old threads never
   stopped on bundle reload. Now stored in a static Mutex; new start()
   replaces it → old Arc drops → old Weak::upgrade() returns None.

2. CreatePullPoint Address uses namespace prefix (wsa5:Address,
   a:Address, etc.). Parser only matched plain <Address>. New
   extract_tag_ns tries common prefixes + fallback regex scan.
   Also validates address starts with "http" and logs response
   preview on failure for debugging.

3. Pull failure → immediate resubscribe with no delay → hammers camera.
   Added 15s backoff after pull failure before resubscribe.
This commit is contained in:
Mitchell R 2026-05-23 00:58:11 +02:00
parent b1e8e00eb1
commit d6e65a4168
No known key found for this signature in database

View file

@ -27,6 +27,12 @@ use crate::bundle::BundleCamera;
/// to know when to stop (camera removed from bundle / bundle changed). /// to know when to stop (camera removed from bundle / bundle changed).
static ACTIVE: Mutex<Option<HashMap<u32, ()>>> = Mutex::new(None); static ACTIVE: Mutex<Option<HashMap<u32, ()>>> = Mutex::new(None);
/// Holds the current generation Arc. When start() replaces it, the old
/// Arc drops → old threads' Weak::upgrade() returns None → they exit.
/// Previous code used std::mem::forget which leaked the Arc and kept
/// old threads alive forever.
static GENERATION: Mutex<Option<Arc<()>>> = Mutex::new(None);
/// Subscription status per camera — reported in heartbeat for admin visibility. /// Subscription status per camera — reported in heartbeat for admin visibility.
static STATUS: Mutex<Option<HashMap<u32, SubStatus>>> = Mutex::new(None); static STATUS: Mutex<Option<HashMap<u32, SubStatus>>> = Mutex::new(None);
@ -98,6 +104,9 @@ pub fn start(
drop(active); drop(active);
let generation = Arc::new(()); let generation = Arc::new(());
// Store in static — replaces old generation → old Arc drops → old
// threads' Weak::upgrade() returns None → they exit cleanly.
*GENERATION.lock().unwrap() = Some(generation.clone());
for cam in onvif_cams { for cam in onvif_cams {
let server = server_url.to_string(); let server = server_url.to_string();
@ -112,11 +121,6 @@ pub fn start(
run_subscription(cam, password.as_deref(), &server, &key, weak_gen); run_subscription(cam, password.as_deref(), &server, &key, weak_gen);
}); });
} }
// Keep the Arc alive as long as the bundle is current. When a new
// start() is called the old Arc drops and Weak::upgrade returns None,
// signalling old threads to exit.
std::mem::forget(generation);
} }
fn run_subscription( fn run_subscription(
@ -184,8 +188,9 @@ fn run_subscription(
} }
Err(e) => { Err(e) => {
warn!("onvif-events: cam {} pull failed: {e}", cam.id); warn!("onvif-events: cam {} pull failed: {e}", cam.id);
std::thread::sleep(Duration::from_secs(5)); set_status(cam.id, "failed", Some(e));
break; // resubscribe std::thread::sleep(Duration::from_secs(15));
break; // resubscribe after backoff
} }
} }
@ -265,11 +270,39 @@ fn create_pullpoint(url: &str, user: &str, pass: &str) -> Result<Subscription, S
</tev:CreatePullPointSubscription>"#, </tev:CreatePullPointSubscription>"#,
); );
let xml = soap_post(url, "http://www.onvif.org/ver10/events/wsdl/EventPortType/CreatePullPointSubscriptionRequest", &body)?; let xml = soap_post(url, "http://www.onvif.org/ver10/events/wsdl/EventPortType/CreatePullPointSubscriptionRequest", &body)?;
let address = extract_tag(&xml, "Address") // Camera may use namespaced Address: <wsa5:Address>, <a:Address>,
.ok_or_else(|| "no Address in CreatePullPointSubscription response".to_string())?; // <wsa:Address>, or plain <Address>. Try all.
let address = extract_tag_ns(&xml, "Address")
.filter(|a| !a.is_empty() && a.starts_with("http"))
.ok_or_else(|| {
// Log first 300 chars of response for debugging.
let preview: String = xml.chars().take(300).collect();
format!("no Address in CreatePullPoint response: {preview}")
})?;
Ok(Subscription { address }) Ok(Subscription { address })
} }
/// Extract tag content, trying with and without namespace prefixes.
fn extract_tag_ns(xml: &str, tag: &str) -> Option<String> {
// Try common namespace prefixes for ONVIF/WS-Addressing.
for prefix in &["", "wsa5:", "wsa:", "a:", "wsnt:", "tev:", "tt:"] {
let full = format!("{prefix}{tag}");
if let Some(val) = extract_tag(xml, &full) {
if !val.is_empty() { return Some(val); }
}
}
// Fallback: regex-style scan for any :Address> content.
let pattern = format!(":{tag}>");
if let Some(pos) = xml.find(&pattern) {
let after = &xml[pos + pattern.len()..];
if let Some(end) = after.find('<') {
let val = after[..end].trim();
if !val.is_empty() { return Some(val.to_string()); }
}
}
None
}
fn pull_messages(sub_url: &str, user: &str, pass: &str) -> Result<Vec<OnvifEvent>, String> { fn pull_messages(sub_url: &str, user: &str, pass: &str) -> Result<Vec<OnvifEvent>, String> {
let header = wsse_header(user, pass); let header = wsse_header(user, pass);
let body = soap_envelope( let body = soap_envelope(