feat(kiosk): track main/sub pipelines independently in warm pool

Pool was keyed by camera_id, so a cell flipping M→S tore down the old
pipeline and started fresh. With (camera_id, badge) keys the main and
sub variants live alongside each other: switching badge promotes the
new one to Warm and leaves the previous one to cool down via the normal
state machine, so flipping back inside the cooldown is instant.

ensure_warm no longer touches sibling badge entries. recompute_global_
state computes warm/hot sets as (cam, badge) pairs by calling
pick_stream per cell with its area fraction, so the planner sees what
ensure_warm will actually create.
This commit is contained in:
Mitchell R 2026-05-13 13:00:35 +02:00
parent d5bd64d05c
commit 77b58c07fd

View file

@ -42,15 +42,20 @@ enum WarmthState {
struct PipelineEntry {
pipeline: gstreamer::Pipeline,
paintable: gtk::gdk::Paintable,
badge: char,
state: WarmthState,
cooling_until: Option<Instant>,
}
/// Pool key. A camera can have multiple concurrent pipelines — typically one
/// per (main, sub, other) stream — each with independent warmth state. When a
/// cell switches M↔S we promote the new variant to Warm/Hot but leave the old
/// one to cool down naturally so a quick swap back is instant.
type PoolKey = (u32, char);
thread_local! {
/// camera_id → PipelineEntry. Pool shared across all displays.
/// (camera_id, badge) → PipelineEntry. Pool shared across all displays.
/// State machine: see WarmthState. Entries dropped when state goes Cold.
static WARM_CAMERAS: RefCell<HashMap<u32, PipelineEntry>>
static WARM_CAMERAS: RefCell<HashMap<PoolKey, PipelineEntry>>
= RefCell::new(HashMap::new());
/// Most recently rendered bundle. Used for layout-switch + idle revert.
@ -690,48 +695,74 @@ fn render_layout(display_id: u32, layout_id: u32) {
const DEFAULT_COOLING_SECS: u32 = 30;
/// Walk all displays' currently-active layouts (plus any priority=hot layouts)
/// and recompute the warm/hot pool. Cameras dropped from active layouts
/// and recompute the warm/hot pool. Pool entries dropped from active layouts
/// transition to Cooling; new entries are NOT added here — `ensure_warm` does
/// that when the layout actually renders.
///
/// Pool keys are (camera_id, badge): a camera's main and sub streams are
/// tracked independently, so flipping a cell from M→S promotes the new sub
/// pipeline to Warm/Hot but leaves the existing main pipeline to cool down
/// naturally (and vice-versa).
fn recompute_global_state() {
let bundle = CURRENT_BUNDLE.with(|b| b.borrow().clone());
let Some(bundle) = bundle else { return };
let displays = bundle.normalized_displays();
let mut warm_set: std::collections::HashSet<u32> = std::collections::HashSet::new();
let mut hot_set: std::collections::HashSet<u32> = std::collections::HashSet::new();
let mut warm_set: std::collections::HashSet<PoolKey> = std::collections::HashSet::new();
let mut hot_set: std::collections::HashSet<PoolKey> = std::collections::HashSet::new();
let mut max_cooling_secs: u32 = 0;
let cam_map: HashMap<u32, &crate::bundle::BundleCamera> =
bundle.cameras.iter().map(|c| (c.id, c)).collect();
// Snapshot per-display active layout id outside any borrow of WARM_CAMERAS.
let active: Vec<(u32, Option<u32>)> = DISPLAYS.with(|ds| {
ds.borrow().iter().map(|(id, st)| (*id, st.current_layout_id)).collect()
});
// Helper: compute the pool key (camera_id, badge) for a given cell in a
// layout. Falls back to a "?" badge if pick_stream can't decide (camera
// missing or no streams).
fn cell_keys(
layout: &crate::bundle::BundleLayout,
cam_map: &HashMap<u32, &crate::bundle::BundleCamera>,
out: &mut std::collections::HashSet<PoolKey>,
) {
let total_area = (layout.grid_cols.max(1) * layout.grid_rows.max(1)) as f32;
for cell in &layout.cells {
if cell.content_type != "camera" { continue; }
let Some(cam_id) = cell.camera_id else { continue };
let Some(cam) = cam_map.get(&cam_id) else { continue };
let area = (cell.col_span * cell.row_span) as f32 / total_area;
if let Some((_, badge)) = cam.pick_stream(cell.stream_selector.as_deref(), area) {
out.insert((cam_id, badge));
}
}
// Preload cameras have no cell context — let pick_stream choose
// (typically sub). Different layouts that actually render them will
// promote whichever badge they end up using.
for cam_id in &layout.preload_camera_ids {
if let Some(cam) = cam_map.get(cam_id) {
if let Some((_, badge)) = cam.pick_stream(None, 0.0) {
out.insert((*cam_id, badge));
}
}
}
}
for bd in &displays {
// Find this display's active layout (if any) and harvest its cameras.
let active_id = active.iter().find(|(id, _)| *id == bd.id).and_then(|(_, l)| *l);
if let Some(cur_id) = active_id {
if let Some(layout) = bd.layouts.iter().find(|l| l.id == cur_id) {
for cell in &layout.cells {
if cell.content_type == "camera" {
if let Some(id) = cell.camera_id { warm_set.insert(id); }
}
}
for id in &layout.preload_camera_ids { warm_set.insert(*id); }
cell_keys(layout, &cam_map, &mut warm_set);
let t = layout.cooling_timeout_seconds.unwrap_or(0);
let t = if t == 0 { DEFAULT_COOLING_SECS } else { t };
max_cooling_secs = max_cooling_secs.max(t);
}
}
// Hot layouts keep their cameras warm even when not active.
for layout in &bd.layouts {
if layout.priority == "hot" {
for cell in &layout.cells {
if cell.content_type == "camera" {
if let Some(id) = cell.camera_id { hot_set.insert(id); }
}
}
for id in &layout.preload_camera_ids { hot_set.insert(*id); }
cell_keys(layout, &cam_map, &mut hot_set);
}
}
}
@ -743,36 +774,34 @@ fn recompute_global_state() {
/// Apply the hot/warm/cooling/cold state machine to the existing WARM_CAMERAS
/// pool. Does NOT create new entries — `ensure_warm` handles that.
///
/// - cam in hot_set → Hot (clear cooling)
/// - cam in warm_set → Warm (clear cooling)
/// - cam in neither & was Cooling → keep cooling_until unchanged
/// - cam in neither & not yet cooling → transition to Cooling
/// - key in hot_set → Hot (clear cooling)
/// - key in warm_set → Warm (clear cooling)
/// - key in neither & was Cooling → keep cooling_until unchanged
/// - key in neither & not yet cooling → transition to Cooling
/// - if max_cooling_secs == 0, remove immediately (Cold)
fn recompute_pool_states(
warm_set: &std::collections::HashSet<u32>,
hot_set: &std::collections::HashSet<u32>,
warm_set: &std::collections::HashSet<PoolKey>,
hot_set: &std::collections::HashSet<PoolKey>,
max_cooling_secs: u32,
) {
let mut to_remove: Vec<u32> = Vec::new();
let mut to_remove: Vec<PoolKey> = Vec::new();
let mut to_stop: Vec<gstreamer::Pipeline> = Vec::new();
WARM_CAMERAS.with(|w| {
let mut warm = w.borrow_mut();
for (cam_id, entry) in warm.iter_mut() {
if hot_set.contains(cam_id) {
for (key, entry) in warm.iter_mut() {
if hot_set.contains(key) {
entry.state = WarmthState::Hot;
entry.cooling_until = None;
} else if warm_set.contains(cam_id) {
} else if warm_set.contains(key) {
entry.state = WarmthState::Warm;
entry.cooling_until = None;
} else {
// Was hot/warm, no longer needed.
if entry.state == WarmthState::Cooling {
// Already cooling — leave cooling_until alone.
continue;
}
if max_cooling_secs == 0 {
to_remove.push(*cam_id);
to_remove.push(*key);
to_stop.push(entry.pipeline.clone());
} else {
entry.state = WarmthState::Cooling;
@ -780,12 +809,13 @@ fn recompute_pool_states(
Instant::now() + Duration::from_secs(max_cooling_secs as u64),
);
info!(
"camera {cam_id}: cooling for {max_cooling_secs}s before drop"
"camera {} ({}): cooling for {}s before drop",
key.0, key.1, max_cooling_secs
);
}
}
}
for id in &to_remove { warm.remove(id); }
for k in &to_remove { warm.remove(k); }
});
for pipe in to_stop {
@ -797,25 +827,25 @@ fn recompute_pool_states(
/// 1s watchdog tick.
fn expire_cooling_pipelines() {
let now = Instant::now();
let mut expired: Vec<(u32, gstreamer::Pipeline)> = Vec::new();
let mut expired: Vec<(PoolKey, gstreamer::Pipeline)> = Vec::new();
WARM_CAMERAS.with(|w| {
let mut warm = w.borrow_mut();
let ids: Vec<u32> = warm
let keys: Vec<PoolKey> = warm
.iter()
.filter(|(_, e)| {
e.state == WarmthState::Cooling
&& e.cooling_until.is_some_and(|t| now >= t)
})
.map(|(id, _)| *id)
.map(|(k, _)| *k)
.collect();
for id in ids {
if let Some(e) = warm.remove(&id) {
expired.push((id, e.pipeline));
for k in keys {
if let Some(e) = warm.remove(&k) {
expired.push((k, e.pipeline));
}
}
});
for (id, pipe) in expired {
info!("camera {id}: cooling expired → stopping pipeline");
for (key, pipe) in expired {
info!("camera {} ({}): cooling expired → stopping pipeline", key.0, key.1);
pipeline::stop(&pipe);
}
}
@ -847,9 +877,11 @@ fn should_attach_kiosk_auth(url: &str, server_url: &str) -> bool {
path.starts_with("/dash/") || path.starts_with("/in/kiosk/")
}
/// Returns (paintable, badge_char) for a camera, creating a warm pipeline if missing.
/// If cached pipeline's stream differs from what the cell needs (M↔S swap due
/// to layout change), tear down old and spin up new.
/// Returns (paintable, badge_char) for a camera, creating a warm pipeline if
/// the (cam, badge) variant isn't already in the pool. If the camera's OTHER
/// stream variant is cached (e.g. cell switched from main to sub), we leave
/// that sibling entry alone — recompute_pool_states will demote it to Cooling
/// so it can be reused if the cell flips back before the cooldown elapses.
fn ensure_warm(
cam_id: u32,
cam: &crate::bundle::BundleCamera,
@ -857,37 +889,32 @@ fn ensure_warm(
area_fraction: f32,
) -> Option<(gtk::gdk::Paintable, char)> {
let (uri, desired_badge) = cam.pick_stream(selector, area_fraction)?;
let key: PoolKey = (cam_id, desired_badge);
let cached = WARM_CAMERAS.with(|w| {
w.borrow().get(&cam_id).map(|e| (e.pipeline.clone(), e.paintable.clone(), e.badge))
w.borrow().get(&key).map(|e| (e.pipeline.clone(), e.paintable.clone()))
});
if let Some((pipe, paintable, badge)) = cached {
if badge == desired_badge {
// Promote out of Cooling if we're rendering it again.
WARM_CAMERAS.with(|w| {
if let Some(e) = w.borrow_mut().get_mut(&cam_id) {
if e.state == WarmthState::Cooling {
info!("camera {cam_id}: rescued from cooling → warm");
e.state = WarmthState::Warm;
e.cooling_until = None;
}
if let Some((_pipe, paintable)) = cached {
// Promote out of Cooling if we're rendering it again.
WARM_CAMERAS.with(|w| {
if let Some(e) = w.borrow_mut().get_mut(&key) {
if e.state == WarmthState::Cooling {
info!("camera {} ({}): rescued from cooling → warm", cam_id, desired_badge);
e.state = WarmthState::Warm;
e.cooling_until = None;
}
});
return Some((paintable, badge));
}
info!("camera {cam_id}: stream change {badge} → {desired_badge}, swapping");
pipeline::stop(&pipe);
WARM_CAMERAS.with(|w| { w.borrow_mut().remove(&cam_id); });
}
});
return Some((paintable, desired_badge));
}
let (pipe, sink) = pipeline::create_camera_pipeline(&cam.name, &uri)?;
let paintable = sink.property::<gtk::gdk::Paintable>("paintable");
pipeline::play(&pipe);
WARM_CAMERAS.with(|w| {
w.borrow_mut().insert(cam_id, PipelineEntry {
w.borrow_mut().insert(key, PipelineEntry {
pipeline: pipe,
paintable: paintable.clone(),
badge: desired_badge,
state: WarmthState::Warm,
cooling_until: None,
});