feat(preview): pull entity snapshot from active kiosk first

When admin opens an entity preview, find a kiosk whose active layout
references the camera (new repo.listKiosksRenderingCamera). Probe each
candidate's LAN snapshot endpoint with a 4s timeout. On success, stream
the bytes back with x-bf-snapshot-source: kiosk:<id>. Falls through to
the existing server-direct ffmpeg/gst pull only when no kiosk is reachable
or has the camera in its active layout.

Kiosk side adds /local/snapshot/:camera_id?key=<local_key>. Spawns a
one-shot gst-launch (rtspsrc → decodebin → jpegenc ! filesink
num-buffers=1) on a blocking worker so axum's reactor stays free.
Prefers sub stream for snapshots to keep bandwidth low. Single-frame
pipeline tears down after the first JPEG.

LAN IP picking extracted to shared/kiosk-lan.ts so route handler and
KioskLocalPanel agree on which interface to talk to (the previously-
duplicated logic in admin-pages stays for now since it also renders the
interface list).

Why a parallel pipeline instead of teeing the warm one: cross-thread
gtk4paintablesink → appsink sample extraction is non-trivial. A 1-frame
parallel pull is cheap when the kiosk's RTSP session to that camera is
already known to work (precondition: it's in the active layout).
This commit is contained in:
Mitchell R 2026-05-21 10:35:27 +02:00
parent 88bbe040e5
commit 334ee8fb93
No known key found for this signature in database
4 changed files with 237 additions and 4 deletions

View file

@ -79,6 +79,7 @@ pub fn start(state: LocalServerState) {
let app = Router::new()
.route("/local/info", get(local_info_handler))
.route("/local/layout/:id", get(local_layout_handler))
.route("/local/snapshot/:camera_id", get(local_snapshot_handler))
.route("/proxy/*path", any(proxy_handler))
.with_state(state);
@ -135,6 +136,109 @@ async fn local_layout_handler(
(StatusCode::NO_CONTENT, "").into_response()
}
/// One-shot JPEG snapshot of `camera_id` from THIS kiosk. Resolves the
/// camera's RTSP URI from the on-disk cached bundle (written by
/// server::save_bundle), then spawns a one-off gstreamer pipeline:
///
/// rtspsrc → decodebin → videoconvert → jpegenc ! filesink
///
/// Identical pattern to the server's fallback path, just running on the
/// kiosk so the admin preview hits the device closest to the camera.
/// Server-side caller selects this when a kiosk already has the camera
/// in its active layout — the assumption is the kiosk's RTSP session
/// already works, so a parallel one-frame pull is cheap. We do NOT
/// reuse the warm GTK4 paintable pipeline because cross-thread paintable
/// access + sample extraction would need significant rework; this is
/// "good enough" and isolated.
async fn local_snapshot_handler(
State(state): State<LocalServerState>,
Path(camera_id): Path<u32>,
Query(auth): Query<LocalAuth>,
) -> Response {
if !constant_time_eq(&auth.key, &state.local_key) {
return (StatusCode::UNAUTHORIZED, "bad key").into_response();
}
let Some(bundle) = crate::server::load_cached_bundle() else {
return (StatusCode::SERVICE_UNAVAILABLE, "no bundle cached yet").into_response();
};
let Some(cam) = bundle.cameras.iter().find(|c| c.id == camera_id) else {
return (StatusCode::NOT_FOUND, "camera not in bundle").into_response();
};
// Use sub stream when present (lower-bandwidth snapshot), else main.
let Some((uri, _)) = cam.pick_stream(Some("sub"), 0.0)
.or_else(|| cam.pick_stream(Some("main"), 1.0)) else {
return (StatusCode::NOT_FOUND, "no stream for camera").into_response();
};
// Blocking gst-launch on a worker thread so we don't block axum's reactor.
let jpeg = tokio::task::spawn_blocking(move || capture_jpeg_blocking(&uri)).await;
match jpeg {
Ok(Ok(bytes)) => Response::builder()
.status(StatusCode::OK)
.header("content-type", "image/jpeg")
.header("cache-control", "no-store")
.body(Body::from(bytes))
.unwrap_or_else(|_| (StatusCode::INTERNAL_SERVER_ERROR, "build").into_response()),
Ok(Err(e)) => {
warn!("local-server: snapshot for cam {camera_id} failed: {e}");
(StatusCode::BAD_GATEWAY, format!("snapshot failed: {e}")).into_response()
}
Err(e) => {
warn!("local-server: snapshot task join failed: {e}");
(StatusCode::INTERNAL_SERVER_ERROR, "task error").into_response()
}
}
}
fn capture_jpeg_blocking(rtsp_uri: &str) -> Result<Vec<u8>, String> {
use std::process::Command;
let tmp = std::env::temp_dir().join(format!(
"bf-snap-{}.jpg",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
));
// 5s ceiling: rtspsrc handshake + a couple of decoded frames. jpegenc
// emits one JPEG, filesink writes it. num-buffers=1 on filesink stops
// the pipeline after the first sample so we don't dangle.
let status = Command::new("gst-launch-1.0")
.args([
"-q",
"rtspsrc",
&format!("location={rtsp_uri}"),
"latency=200",
"protocols=tcp",
"!",
"decodebin",
"!",
"videoconvert",
"!",
"jpegenc",
"!",
"filesink",
"num-buffers=1",
&format!("location={}", tmp.display()),
])
.stderr(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.status()
.map_err(|e| format!("gst-launch-1.0 spawn: {e}"))?;
let result = if status.success() {
std::fs::read(&tmp).map_err(|e| format!("read snapshot: {e}"))
} else {
Err(format!("gst-launch-1.0 exit {status:?}"))
};
let _ = std::fs::remove_file(&tmp);
result.and_then(|bytes| {
if bytes.is_empty() {
Err("snapshot file empty".to_string())
} else {
Ok(bytes)
}
})
}
/// Forward any request under /proxy/* to the BF server. Method, query
/// string, body, and Authorization header are preserved. Kiosk adds NO auth
/// — caller must supply their own admin API key (Bearer) which server-side

View file

@ -41,6 +41,7 @@ import { captureSnapshot } from "../../shared/snapshot.js";
import { stripSecrets } from "../../shared/strip-secrets.js";
import { audit } from "../../shared/audit.js";
import { createBackup, restoreBackup } from "../../shared/backup.js";
import { pickKioskLanIp } from "../../shared/kiosk-lan.js";
interface DiscoverAddStream {
profile_name: string;
@ -700,17 +701,53 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
return new Response(null, { status: 302, headers: { location: "/admin/entities" } });
});
// Camera snapshot — pulls one frame from the entity's main stream and
// returns it as JPEG. Used by the EntityEditPage "Test" preview.
// Camera snapshot — prefer a kiosk already rendering this camera so we don't
// double the RTSP load on the source. Fall back to server-direct only when
// no kiosk currently has the camera in its active layout (or every kiosk
// attempt times out). Used by the EntityEditPage "Test" preview.
app.get("/admin/entities/:id/snapshot", async (event) => {
const id = Number(getRouterParam(event, "id"));
const ent = deps.repo.getEntityById(id);
if (!ent || ent.type !== "camera" || ent.camera_id == null) {
return new Response("Not a camera entity", { status: 404 });
}
const streams = deps.repo.listCameraStreams(ent.camera_id);
const cameraId = ent.camera_id;
// 1. Try kiosks currently rendering this camera. listKiosksRenderingCamera
// returns kiosks whose active_layout_id has at least one layout_cell
// pointing at cameraId. Filter to ones we can actually reach.
const candidates = deps.repo.listKiosksRenderingCamera(cameraId);
const STALE_MS = 2 * 60 * 1000; // kiosk silent > 2 min → don't bother
const now = Date.now();
for (const k of candidates) {
if (!k.local_port || !k.local_key) continue;
if (k.last_seen_at && now - new Date(k.last_seen_at).getTime() > STALE_MS) continue;
const ip = pickKioskLanIp(k);
if (!ip) continue;
const url = `http://${ip}:${String(k.local_port)}/local/snapshot/${String(cameraId)}?key=${encodeURIComponent(k.local_key)}`;
try {
const res = await fetch(url, { signal: AbortSignal.timeout(4000) });
if (res.ok) {
const bytes = new Uint8Array(await res.arrayBuffer());
return new Response(bytes, {
status: 200,
headers: {
"content-type": res.headers.get("content-type") ?? "image/jpeg",
"cache-control": "no-store",
"x-bf-snapshot-source": `kiosk:${String(k.id)}`,
},
});
}
} catch {
// Network error / timeout — try next kiosk.
}
}
// 2. Fall back to server-direct RTSP pull (ffmpeg/gst).
const streams = deps.repo.listCameraStreams(cameraId);
const main = streams.find((s) => s.role === "main") ?? streams[0];
const cam = deps.repo.getCameraById(ent.camera_id);
const cam = deps.repo.getCameraById(cameraId);
const rtsp = main?.rtsp_uri ?? cam?.rtsp_url ?? null;
if (!rtsp) return new Response("No RTSP URL", { status: 404 });
@ -723,6 +760,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
headers: {
"content-type": "image/jpeg",
"cache-control": "no-store",
"x-bf-snapshot-source": "server",
},
});
});

View file

@ -413,6 +413,28 @@ export class Repository {
return rs.map((r) => rowToDisplay(r as Record<string, unknown>));
}
/**
* Kiosks currently rendering this camera. Join chain:
* displays.active_layout_id == layouts.id
* layout_cells.layout_id == layouts.id
* layout_cells.camera_id == ?
* kiosks via displays.kiosk_id
* Active layout is set by kiosk's layout.changed event. Stale ones may
* still appear here caller filters by last_seen_at + local_port.
*/
listKiosksRenderingCamera(cameraId: number): Kiosk[] {
const rs = this.prep(
`SELECT DISTINCT k.*
FROM kiosks k
JOIN displays d ON d.kiosk_id = k.id
JOIN layout_cells lc ON lc.layout_id = d.active_layout_id
WHERE lc.camera_id = ?
AND d.active_layout_id IS NOT NULL
AND k.enabled = 1`,
).all(cameraId);
return rs.map((r) => rowToKiosk(r as Record<string, unknown>));
}
private nextDisplayIndexForKiosk(kioskId: number): number {
const r = this.prep('SELECT MAX("index") AS m FROM displays WHERE kiosk_id = ?').get(kioskId) as { m: number | null } | undefined;
return (r?.m ?? -1) + 1;

View file

@ -0,0 +1,69 @@
/**
* Pick the kiosk's preferred LAN IP for direct HTTP reach.
*
* Behind Docker/Angie the heartbeat source IP we see (kiosk.local_last_ip)
* is the proxy/container bridge (e.g. 172.31.0.2), not the kiosk's real LAN
* address. Kiosks report all their interfaces via the heartbeat
* (network_interfaces_json from `ip -j addr show`). Prefer the first
* non-loopback / non-link-local IP from that list; fall back to the
* heartbeat source only when we have nothing reported.
*/
import type { Kiosk } from "./types.js";
interface ReportedInterface {
name: string;
mac?: string | null;
operstate?: string | null;
ips: string[];
}
function ipWithoutCidr(ip: string): string {
return ip.includes("/") ? ip.slice(0, ip.indexOf("/")) : ip;
}
function isUsableLanIp(ip: string): boolean {
const bare = ipWithoutCidr(ip);
return bare !== "::1"
&& !bare.startsWith("127.")
&& !bare.startsWith("169.254.")
&& !bare.startsWith("fe80:");
}
function parseInterfaces(raw: string | null): ReportedInterface[] {
if (!raw) return [];
try {
const parsed = JSON.parse(raw);
if (!Array.isArray(parsed)) return [];
return parsed
.map((item) => ({
name: typeof item?.name === "string" ? item.name : "unknown",
operstate: typeof item?.operstate === "string" ? item.operstate : null,
ips: Array.isArray(item?.ips)
? item.ips.filter((ip: unknown) => typeof ip === "string")
: [],
}))
.filter((item) => item.ips.length > 0);
} catch {
return [];
}
}
/** Returns a bare IP (no CIDR) suitable for `http://<ip>:<port>/...`. */
export function pickKioskLanIp(kiosk: Kiosk): string | null {
const ifaces = parseInterfaces(kiosk.network_interfaces_json);
// Prefer interfaces marked UP, then any with usable IPs.
const sorted = [...ifaces].sort((a, b) => {
const aUp = a.operstate?.toLowerCase() === "up" ? 0 : 1;
const bUp = b.operstate?.toLowerCase() === "up" ? 0 : 1;
return aUp - bUp;
});
for (const iface of sorted) {
for (const ip of iface.ips) {
if (isUsableLanIp(ip)) return ipWithoutCidr(ip);
}
}
if (kiosk.local_last_ip && isUsableLanIp(kiosk.local_last_ip)) {
return ipWithoutCidr(kiosk.local_last_ip);
}
return null;
}