fix: ONVIF single-kiosk ownership + rate limiting

Server-side:
- Bundle gen: when camera event_source is "auto", first kiosk to fetch
  bundle claims ownership → writes "kiosk:<id>" to camera row. Other
  kiosks see assigned owner and skip ONVIF subscription.
- Kiosk deletion resets event_source back to "auto" so next kiosk
  takes over.
- repo.getActiveOnvifOwners() for future use.

Kiosk-side:
- Only subscribe when event_source is "auto" or "kiosk:<MY_ID>".
  Skips "kiosk:<other_id>", "server", "none", "disabled".
- Poll interval: 3s → 10s (cameras were getting overwhelmed)
- CreatePullPoint backoff: exponential 30s→60s→120s...→600s max
- Pull errors: exponential 15s→30s→45s...→300s, resubscribe after 5
  consecutive failures instead of immediately.
- load_kiosk_id() helper reads from cached bundle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mitchell R 2026-05-27 01:40:44 +02:00
parent a518fe17ea
commit e1a3cd1d05
No known key found for this signature in database
4 changed files with 54 additions and 17 deletions

View file

@ -110,25 +110,27 @@ pub fn get_statuses() -> HashMap<String, SubStatus> {
STATUS.lock().unwrap().clone().unwrap_or_default() STATUS.lock().unwrap().clone().unwrap_or_default()
} }
/// Start event subscription workers for all ONVIF cameras in the bundle. /// Start event subscription workers for ONVIF cameras assigned to this kiosk.
/// Idempotent — stops old workers (via ACTIVE flag) before starting new. /// Only subscribes when event_source is "auto" or "kiosk:<this_kiosk_id>".
pub fn start( pub fn start(
cameras: &[BundleCamera], cameras: &[BundleCamera],
cluster_key: Option<&str>, cluster_key: Option<&str>,
server_url: &str, server_url: &str,
kiosk_key: &str, kiosk_key: &str,
) { ) {
// Only subscribe to cameras where event_source is "auto" or "kiosk:<this_id>" let my_kiosk_id = crate::server::load_kiosk_id();
// (not "server" or another kiosk). For "auto", this kiosk subscribes because
// the server put the camera in this kiosk's bundle — meaning it's reachable.
let onvif_cams: Vec<_> = cameras let onvif_cams: Vec<_> = cameras
.iter() .iter()
.filter(|c| { .filter(|c| {
if c.cam_type != "onvif" || c.onvif_host.is_none() { return false; } if c.cam_type != "onvif" || c.onvif_host.is_none() { return false; }
match c.event_source.as_deref() { match c.event_source.as_deref() {
Some("server") => false, // server handles this one Some("server") => false,
Some(s) if s.starts_with("kiosk:") => true, // pinned to a kiosk (might be us) Some("none") | Some("disabled") => false,
_ => true, // "auto" or missing → this kiosk subscribes Some(s) if s.starts_with("kiosk:") => {
let assigned = &s[6..];
my_kiosk_id.as_deref() == Some(assigned)
}
_ => true, // "auto" or missing
} }
}) })
.cloned() .cloned()
@ -180,30 +182,32 @@ fn run_subscription(
let has_pass = !pass.is_empty(); let has_pass = !pass.is_empty();
info!("onvif-events: cam {} ({}) subscribing at {event_url} user={user} has_pass={has_pass}", cam.id, cam.name); info!("onvif-events: cam {} ({}) subscribing at {event_url} user={user} has_pass={has_pass}", cam.id, cam.name);
let mut backoff_secs: u64 = 30;
loop { loop {
if generation.upgrade().is_none() { if generation.upgrade().is_none() {
info!("onvif-events: cam {} generation expired, exiting", cam.id); info!("onvif-events: cam {} generation expired, exiting", cam.id);
return; return;
} }
// 1. CreatePullPointSubscription
set_status(&cam.id, "subscribing", None); set_status(&cam.id, "subscribing", None);
let sub = match create_pullpoint(&event_url, user, pass) { let sub = match create_pullpoint(&event_url, user, pass) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
warn!("onvif-events: cam {} CreatePullPoint failed: {e}", cam.id); warn!("onvif-events: cam {} CreatePullPoint failed: {e} (backoff {backoff_secs}s)", cam.id);
set_status(&cam.id, "failed", Some(e)); set_status(&cam.id, "failed", Some(e));
std::thread::sleep(Duration::from_secs(30)); std::thread::sleep(Duration::from_secs(backoff_secs));
backoff_secs = (backoff_secs * 2).min(600);
continue; continue;
} }
}; };
backoff_secs = 30;
info!("onvif-events: cam {} subscribed, address={}", cam.id, sub.address); info!("onvif-events: cam {} subscribed, address={}", cam.id, sub.address);
set_status(&cam.id, "active", None); set_status(&cam.id, "active", None);
// 2. Poll loop let poll_interval = Duration::from_secs(10);
let poll_interval = Duration::from_secs(3); let renew_interval = Duration::from_secs(55);
let renew_interval = Duration::from_secs(55); // renew before 60s timeout
let mut since_renew = std::time::Instant::now(); let mut since_renew = std::time::Instant::now();
let mut consecutive_errors: u32 = 0;
loop { loop {
if generation.upgrade().is_none() { if generation.upgrade().is_none() {
@ -224,16 +228,22 @@ fn run_subscription(
match pull_messages(&sub.address, user, pass) { match pull_messages(&sub.address, user, pass) {
Ok(events) => { Ok(events) => {
consecutive_errors = 0;
for evt in events { for evt in events {
forward_event(server, kiosk_key, &cam.id, &evt, user, pass); forward_event(server, kiosk_key, &cam.id, &evt, user, pass);
mark_event_received(&cam.id); mark_event_received(&cam.id);
} }
} }
Err(e) => { Err(e) => {
warn!("onvif-events: cam {} pull failed: {e}", cam.id); consecutive_errors += 1;
let error_backoff = (15 * consecutive_errors as u64).min(300);
warn!("onvif-events: cam {} pull failed ({consecutive_errors}x): {e}, backoff {error_backoff}s", cam.id);
set_status(&cam.id, "failed", Some(e)); set_status(&cam.id, "failed", Some(e));
std::thread::sleep(Duration::from_secs(15)); if consecutive_errors >= 5 {
break; // resubscribe after backoff break; // resubscribe from scratch
}
std::thread::sleep(Duration::from_secs(error_backoff));
continue;
} }
} }

View file

@ -162,6 +162,10 @@ pub fn load_cached_bundle() -> Option<KioskBundle> {
} }
} }
pub fn load_kiosk_id() -> Option<String> {
load_cached_bundle().map(|b| b.kiosk_id)
}
/// Discover the BetterFrame server. /// Discover the BetterFrame server.
pub fn discover_server(override_url: Option<&str>) -> String { pub fn discover_server(override_url: Option<&str>) -> String {
if let Some(url) = override_url { if let Some(url) = override_url {

View file

@ -302,6 +302,16 @@ export async function generateBundle(
}); });
} }
// ONVIF event ownership: for "auto" cameras, first kiosk to fetch bundle
// takes ownership. Server writes "kiosk:<id>" into event_source so
// subsequent kiosks see it's taken and skip.
for (const cam of cameras) {
if (cam.type === "onvif" && cam.event_source === "auto") {
await repo.updateCamera(cam.id, { event_source: `kiosk:${kioskId}` } as any);
cam.event_source = `kiosk:${kioskId}`;
}
}
const bundleCameras: BundleCamera[] = []; const bundleCameras: BundleCamera[] = [];
for (const cam of cameras) { for (const cam of cameras) {
const streams = await repo.listCameraStreams(cam.id); const streams = await repo.listCameraStreams(cam.id);

View file

@ -2435,6 +2435,7 @@ export class Repository {
await this._run(`DELETE FROM displays WHERE kiosk_id = ?`, [id]); await this._run(`DELETE FROM displays WHERE kiosk_id = ?`, [id]);
await this._run(`DELETE FROM kiosk_labels WHERE kiosk_id = ?`, [id]); await this._run(`DELETE FROM kiosk_labels WHERE kiosk_id = ?`, [id]);
await this._run(`DELETE FROM kiosk_gpio_bindings WHERE kiosk_id = ?`, [id]); await this._run(`DELETE FROM kiosk_gpio_bindings WHERE kiosk_id = ?`, [id]);
await this._run(`UPDATE cameras SET event_source = 'auto' WHERE event_source = ?`, [`kiosk:${id}`]);
await this._run(`DELETE FROM kiosks WHERE id = ?`, [id]); await this._run(`DELETE FROM kiosks WHERE id = ?`, [id]);
}); });
for (const display of displays) { for (const display of displays) {
@ -2529,6 +2530,18 @@ export class Repository {
// camera_event_subscriptions // camera_event_subscriptions
// =========================================================================== // ===========================================================================
async getActiveOnvifOwners(): Promise<Map<string, string>> {
const rs = await this._all<{ camera_id: string; subscribed_by_kiosk_id: string }>(
`SELECT DISTINCT camera_id, subscribed_by_kiosk_id FROM camera_event_subscriptions
WHERE subscribed_by_kiosk_id IS NOT NULL AND status = 'active'`,
);
const map = new Map<string, string>();
for (const r of rs) {
map.set(r.camera_id, r.subscribed_by_kiosk_id);
}
return map;
}
async listEventSubscriptions(cameraId: string): Promise<CameraEventSubscription[]> { async listEventSubscriptions(cameraId: string): Promise<CameraEventSubscription[]> {
const rs = await this._all( const rs = await this._all(
"SELECT * FROM camera_event_subscriptions WHERE camera_id = ? ORDER BY topic", "SELECT * FROM camera_event_subscriptions WHERE camera_id = ? ORDER BY topic",