feat: live updates via WebSocket — server pushes, kiosk reloads

Server side:
- service-coordinator-ws: full WS implementation using ws package
- Auth via ?token=<kiosk_key> query param
- Coordinator registry for cross-plugin notification
- Admin mutations call notifyKiosks() → server pushes reload-bundle
- 30s ping/pong heartbeat

Kiosk side:
- Rust ws_client with tokio runtime + tokio-tungstenite
- Auto-reconnect with exponential backoff (1s → 60s cap)
- On reload-bundle: re-fetches bundle, re-renders layout
- Pong replies to server pings

Also fix: auto-suffix kiosk name on UNIQUE collision (re-pair with
same hostname no longer fails).
This commit is contained in:
Mitchell R 2026-05-10 22:15:58 +02:00
parent 2398be6853
commit 16ab165b06
10 changed files with 372 additions and 21 deletions

View file

@ -29,3 +29,6 @@ dirs = "6"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
hostname = "0.4"
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
futures-util = "0.3"
url = "2"

View file

@ -2,6 +2,11 @@ mod server;
mod bundle;
mod pipeline;
mod ui;
mod ws_client;
pub enum ServerMsg {
ReloadBundle,
}
use gtk4::prelude::{ApplicationExt, ApplicationExtManual};
use gstreamer::prelude::PluginFeatureExtManual;

View file

@ -9,6 +9,8 @@ use tracing::{info, warn};
use crate::bundle::KioskBundle;
use crate::pipeline;
use crate::server;
use crate::ws_client;
use crate::ServerMsg;
const APP_ID: &str = "dev.betterframe.kiosk";
@ -61,6 +63,32 @@ fn activate(app: &Application) {
info!("bundle: {} cameras, {} layouts", bundle.cameras.len(), bundle.layouts.len());
let _ = tx.send(WorkerMsg::RenderBundle(bundle));
// Spawn WS client in a separate thread for live updates
let server_ws = server.clone();
let key_ws = key.clone();
let (ws_tx, ws_rx) = mpsc::channel::<ServerMsg>();
let tx_for_reload = tx.clone();
let server_for_reload = server.clone();
let key_for_reload = key.clone();
std::thread::spawn(move || {
ws_client::run(&server_ws, &key_ws, ws_tx);
});
// Listen for WS messages and re-fetch bundle on reload
std::thread::spawn(move || {
for msg in ws_rx {
match msg {
ServerMsg::ReloadBundle => {
info!("reloading bundle");
let bundle = server::fetch_bundle(&server_for_reload, &key_for_reload);
let _ = tx_for_reload.send(WorkerMsg::RenderBundle(bundle));
}
}
}
});
// Heartbeat loop
loop {
std::thread::sleep(std::time::Duration::from_secs(60));
server::heartbeat(&server, &key);

106
kiosk/src/ws_client.rs Normal file
View file

@ -0,0 +1,106 @@
use std::sync::mpsc::Sender;
use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{info, warn};
use crate::ServerMsg;
/// Run the WebSocket client in a tokio runtime. Blocks the calling thread.
/// Reconnects on disconnect with exponential backoff.
pub fn run(server_url: &str, kiosk_key: &str, tx: Sender<ServerMsg>) {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
warn!("ws: failed to build runtime: {e}");
return;
}
};
let ws_url = build_ws_url(server_url, kiosk_key);
info!("ws: connecting to {ws_url}");
rt.block_on(async {
let mut backoff = 1u64;
loop {
match connect_async(&ws_url).await {
Ok((mut ws, _resp)) => {
info!("ws: connected");
backoff = 1;
while let Some(msg) = ws.next().await {
match msg {
Ok(Message::Text(text)) => {
if text.contains("\"type\":\"ping\"") {
let _ = ws.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await;
} else if text.contains("\"type\":\"reload-bundle\"") {
info!("ws: reload-bundle received");
let _ = tx.send(ServerMsg::ReloadBundle);
} else {
info!("ws: msg: {text}");
}
}
Ok(Message::Close(_)) => {
info!("ws: server closed connection");
break;
}
Err(e) => {
warn!("ws: error: {e}");
break;
}
_ => {}
}
}
}
Err(e) => {
warn!("ws: connect failed: {e}");
}
}
info!("ws: reconnecting in {backoff}s");
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(60);
}
});
}
fn build_ws_url(http_url: &str, token: &str) -> String {
// Replace http:// → ws://, https:// → wss://. Strip any trailing path.
let base = if let Some(rest) = http_url.strip_prefix("https://") {
format!("wss://{}", rest.split('/').next().unwrap_or(rest))
} else if let Some(rest) = http_url.strip_prefix("http://") {
format!("ws://{}", rest.split('/').next().unwrap_or(rest))
} else {
format!("ws://{http_url}")
};
// coordinator-ws runs on a different port (18082 vs api-http on 18081)
let base_port = base.rsplit(':').next().unwrap_or("");
let base = if base_port == "18081" {
base.replace(":18081", ":18082")
} else if !base.contains(':') {
format!("{base}:18082")
} else {
base
};
format!("{base}/ws/kiosk?token={}", urlencoding(token))
}
fn urlencoding(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for ch in s.chars() {
if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.' | '~') {
out.push(ch);
} else {
for b in ch.to_string().bytes() {
out.push_str(&format!("%{b:02X}"));
}
}
}
out
}

36
package-lock.json generated
View file

@ -524,12 +524,20 @@
"version": "25.6.2",
"resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.2.tgz",
"integrity": "sha512-sokuT28dxf9JT5Kady1fsXOvI4HVpjZa95NKT5y9PNTIrs2AsobR4GFAA90ZG8M+nxVRLysCXsVj6eGC7Vbrlw==",
"dev": true,
"license": "MIT",
"dependencies": {
"undici-types": "~7.19.0"
}
},
"node_modules/@types/ws": {
"version": "8.18.1",
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz",
"integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==",
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/argon2": {
"version": "0.44.0",
"resolved": "https://registry.npmjs.org/argon2/-/argon2-0.44.0.tgz",
@ -851,7 +859,6 @@
"version": "7.19.2",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.19.2.tgz",
"integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==",
"dev": true,
"license": "MIT"
},
"node_modules/uuid": {
@ -882,6 +889,27 @@
"node": ">= 8"
}
},
"node_modules/ws": {
"version": "8.20.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz",
"integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/yaml": {
"version": "2.8.4",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.4.tgz",
@ -904,10 +932,12 @@
"dependencies": {
"@anyvali/js": "^0.2.0",
"@bsb/base": "^9.1.11",
"@types/ws": "^8.18.1",
"argon2": "^0.44.0",
"h3": "^2.0.1-rc.22",
"jsx-htmx": "^2.0.2",
"otpauth": "^9.5.1"
"otpauth": "^9.5.1",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/node": "^25.0.0",

View file

@ -23,10 +23,12 @@
"dependencies": {
"@anyvali/js": "^0.2.0",
"@bsb/base": "^9.1.11",
"@types/ws": "^8.18.1",
"argon2": "^0.44.0",
"h3": "^2.0.1-rc.22",
"jsx-htmx": "^2.0.2",
"otpauth": "^9.5.1"
"otpauth": "^9.5.1",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/node": "^25.0.0",

View file

@ -5,6 +5,7 @@ import { type H3, readBody, getRouterParam, getQuery } from "h3";
import { htmlPage } from "./html-response.js";
import type { AdminDeps } from "./index.js";
import { confirmPairing } from "../../shared/pairing.js";
import { getCoordinator } from "../../shared/coordinator-registry.js";
import {
OverviewPage,
CamerasPage,
@ -20,6 +21,10 @@ import {
DisplayEditPage,
} from "../../web-templates/admin-pages.js";
function notifyKiosks(): void {
try { getCoordinator().notifyBundleChanged(); } catch { /* ignore */ }
}
function sanitizeRtspUrl(raw: string): string {
const match = raw.match(/^(rtsp:\/\/)([^@]+)@(.+)$/);
if (!match) return raw;
@ -275,6 +280,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
cooling_timeout_seconds: coolingTimeout,
resets_idle_timer: body?.["resets_idle_timer"] === "1",
});
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/layouts/${id}` } });
});
@ -343,6 +349,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
content_type: "html",
html_content: null,
});
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } });
});
@ -363,6 +370,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
web_url: contentType === "web" ? (body?.["web_url"] ?? null) : null,
html_content: contentType === "html" ? (body?.["html_content"] ?? null) : null,
});
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } });
});
@ -371,12 +379,14 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
const layoutId = Number(getRouterParam(event, "id"));
const cellId = Number(getRouterParam(event, "cellId"));
deps.repo.deleteLayoutCell(cellId);
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } });
});
app.post("/admin/layouts/:id/delete", (event) => {
const id = Number(getRouterParam(event, "id"));
deps.repo.deleteLayout(id);
notifyKiosks();
return new Response(null, { status: 302, headers: { location: "/admin/layouts" } });
});
@ -429,6 +439,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
idle_timeout_seconds: parseInt(body?.["idle_timeout_seconds"] ?? "0", 10),
sleep_timeout_seconds: parseInt(body?.["sleep_timeout_seconds"] ?? "0", 10),
} as any);
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/displays/${id}` } });
});
@ -439,6 +450,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
const layoutId = body?.["layout_id"] ? Number(body["layout_id"]) : null;
if (layoutId && Number.isFinite(layoutId)) {
deps.repo.attachLayoutToDisplay(displayId, layoutId);
notifyKiosks();
}
return new Response(null, { status: 302, headers: { location: `/admin/displays/${displayId}` } });
});
@ -448,6 +460,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
const displayId = Number(getRouterParam(event, "id"));
const layoutId = Number(getRouterParam(event, "layoutId"));
deps.repo.detachLayoutFromDisplay(displayId, layoutId);
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/displays/${displayId}` } });
});
@ -543,6 +556,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
deps.repo.updateCameraStream(mainStream.id, { rtsp_uri: rtspUrl });
}
}
notifyKiosks();
return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } });
});
@ -574,6 +588,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
app.post("/admin/cameras/:id/delete", (event) => {
const id = Number(getRouterParam(event, "id"));
deps.repo.deleteCamera(id);
notifyKiosks();
return new Response(null, { status: 302, headers: { location: "/admin/cameras" } });
});

View file

@ -1,11 +1,15 @@
/**
* service-coordinator-ws WebSocket hub for live kiosk channel.
*
* Uses raw Node.js WebSocket server (ws package via h3's optional crossws).
* For v0.1, uses a standalone HTTP server + ws upgrade.
*
* Kiosks connect with ?token=<kiosk_key>. Server pushes:
* - layout-switch, power, reload-bundle, ping
* - reload-bundle: kiosk should re-fetch bundle
* - layout-switch: change active layout (future)
* - power: CEC commands (future)
* - ping: keepalive
*
* Kiosks send:
* - pong: keepalive reply
* - status: current state
*/
import * as av from "@anyvali/js";
import {
@ -15,13 +19,13 @@ import {
createEventSchemas,
type Observable,
} from "@bsb/base";
import { createServer } from "node:http";
import type { IncomingMessage } from "node:http";
import type { Duplex } from "node:stream";
import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http";
import { WebSocketServer, WebSocket } from "ws";
import { getRepo } from "../../shared/plugin-registry.js";
import { initSecrets } from "../../shared/secrets.js";
import { createAuth, type AuthApi } from "../../shared/auth.js";
import { createAuth } from "../../shared/auth.js";
import { setCoordinator } from "../../shared/coordinator-registry.js";
// ---- Config -----------------------------------------------------------------
@ -62,6 +66,30 @@ export const EventSchemas = createEventSchemas({
onBroadcast: {},
});
// ---- Connected kiosks -------------------------------------------------------
interface ConnectedKiosk {
id: number;
name: string;
ws: WebSocket;
}
const connectedKiosks = new Map<number, ConnectedKiosk>();
function sendToKiosk(kioskId: number, message: object): boolean {
const k = connectedKiosks.get(kioskId);
if (!k || k.ws.readyState !== WebSocket.OPEN) return false;
k.ws.send(JSON.stringify(message));
return true;
}
function broadcastAll(message: object): void {
const payload = JSON.stringify(message);
for (const k of connectedKiosks.values()) {
if (k.ws.readyState === WebSocket.OPEN) k.ws.send(payload);
}
}
// ---- Plugin -----------------------------------------------------------------
export class Plugin extends BSBService<InstanceType<typeof Config>, typeof EventSchemas> {
@ -73,7 +101,8 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
runBeforePlugins?: string[];
runAfterPlugins?: string[];
private httpServer?: ReturnType<typeof createServer>;
private httpServer?: HttpServer;
private wss?: WebSocketServer;
private pingInterval?: ReturnType<typeof setInterval>;
constructor(cfg: BSBServiceConstructor<InstanceType<typeof Config>, typeof EventSchemas>) {
@ -81,34 +110,131 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
}
async init(obs: Observable): Promise<void> {
// Placeholder — full WS implementation requires 'ws' package or crossws.
// For now, start a basic HTTP server that responds to health checks.
// WS upgrade will be added when crossws or ws is installed.
const server = createServer((req, res) => {
const repo = getRepo();
const secrets = initSecrets(
{ dataDir: this.config.dataDir },
{ info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) },
);
const auth = createAuth(repo, secrets, {
sessionIdleSeconds: this.config.sessionIdleSeconds,
sessionMaxSeconds: this.config.sessionMaxSeconds,
loginLockoutThreshold: this.config.loginLockoutThreshold,
loginLockoutSeconds: this.config.loginLockoutSeconds,
argon2Memory: this.config.argon2Memory,
argon2TimeCost: this.config.argon2TimeCost,
argon2Parallelism: this.config.argon2Parallelism,
totpIssuer: this.config.totpIssuer,
cookieName: this.config.cookieName,
});
const httpServer = createServer((req, res) => {
if (req.url === "/healthz") {
res.writeHead(200, { "content-type": "application/json" });
res.end(JSON.stringify({ status: "ok" }));
res.end(JSON.stringify({ status: "ok", connected_kiosks: connectedKiosks.size }));
return;
}
res.writeHead(404);
res.end();
});
server.listen(this.config.port, this.config.host, () => {
const wss = new WebSocketServer({ noServer: true });
httpServer.on("upgrade", async (req: IncomingMessage, socket, head) => {
const url = new URL(req.url ?? "/", `http://${req.headers.host}`);
if (url.pathname !== "/ws/kiosk") {
socket.write("HTTP/1.1 404 Not Found\r\n\r\n");
socket.destroy();
return;
}
const token = url.searchParams.get("token");
if (!token) {
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
try {
const kiosk = await auth.verifyKioskKey(token);
if (!kiosk) {
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
const kioskData = repo.getKioskById(kiosk.id);
if (!kioskData) {
socket.write("HTTP/1.1 404 Not Found\r\n\r\n");
socket.destroy();
return;
}
wss.handleUpgrade(req, socket, head, (ws) => {
connectedKiosks.set(kiosk.id, { id: kiosk.id, name: kioskData.name, ws });
obs.log.info("kiosk connected: {name}", { name: kioskData.name });
ws.send(JSON.stringify({ type: "connected", kiosk_id: kiosk.id }));
ws.on("message", (data) => {
try {
const msg = JSON.parse(data.toString());
if (msg.type === "pong") return;
if (msg.type === "status") {
obs.log.info("kiosk status: {data}", { data: data.toString() });
}
} catch {
// ignore malformed
}
});
ws.on("close", () => {
connectedKiosks.delete(kiosk.id);
obs.log.info("kiosk disconnected: {name}", { name: kioskData.name });
});
});
} catch (err) {
obs.log.warn("ws upgrade error: {err}", { err: (err as Error).message });
socket.destroy();
}
});
httpServer.listen(this.config.port, this.config.host, () => {
obs.log.info("coordinator-ws listening on {host}:{port}", {
host: this.config.host,
port: this.config.port,
});
});
this.httpServer = server;
// Register coordinator API for other plugins to use
setCoordinator({
sendToKiosk,
broadcastAll,
notifyBundleChanged: () => broadcastAll({ type: "reload-bundle" }),
notifyKioskBundleChanged: (kioskId: number) =>
sendToKiosk(kioskId, { type: "reload-bundle" }),
});
this.httpServer = httpServer;
this.wss = wss;
// Ping connected kiosks every 30s
this.pingInterval = setInterval(() => {
const payload = JSON.stringify({ type: "ping", t: Date.now() });
for (const k of connectedKiosks.values()) {
try {
if (k.ws.readyState === WebSocket.OPEN) k.ws.send(payload);
} catch {
// ignore
}
}
}, 30_000);
}
async run(_obs: Observable): Promise<void> {}
async dispose(): Promise<void> {
if (this.pingInterval) clearInterval(this.pingInterval);
for (const k of connectedKiosks.values()) {
try { k.ws.close(); } catch { /* ignore */ }
}
connectedKiosks.clear();
return new Promise((resolve) => {
if (this.wss) this.wss.close();
if (this.httpServer) {
this.httpServer.close(() => resolve());
} else {

View file

@ -0,0 +1,27 @@
/**
* Coordinator registry admin-http calls these to notify kiosks of changes.
* service-coordinator-ws sets the implementation in its init().
*/
export interface CoordinatorApi {
sendToKiosk(kioskId: number, message: object): boolean;
broadcastAll(message: object): void;
notifyBundleChanged(): void;
notifyKioskBundleChanged(kioskId: number): void;
}
const noop: CoordinatorApi = {
sendToKiosk: () => false,
broadcastAll: () => {},
notifyBundleChanged: () => {},
notifyKioskBundleChanged: () => {},
};
let _coordinator: CoordinatorApi = noop;
export function setCoordinator(c: CoordinatorApi): void {
_coordinator = c;
}
export function getCoordinator(): CoordinatorApi {
return _coordinator;
}

View file

@ -118,7 +118,16 @@ export async function confirmPairing(
if (pc.consumed_at) throw new Error("pairing code already used");
if (new Date(pc.expires_at) < new Date()) throw new Error("pairing code expired");
const kioskName = input.nameOverride || pc.kiosk_proposed_name || `kiosk-${input.code.toLowerCase()}`;
const baseName = input.nameOverride || pc.kiosk_proposed_name || `kiosk-${input.code.toLowerCase()}`;
// Auto-suffix if name collides (kiosks.name is UNIQUE)
let kioskName = baseName;
let suffix = 2;
while (repo.getKioskByName(kioskName)) {
kioskName = `${baseName}-${suffix}`;
suffix++;
if (suffix > 100) throw new Error("could not generate unique kiosk name");
}
const kioskKeyPlaintext = `bf-${randomBytes(24).toString("base64url")}`;
const kioskKeyHash = await auth.hashPassword(kioskKeyPlaintext);
const kioskKeyPrefix = kioskKeyPlaintext.slice(0, 8);