diff --git a/kiosk/Cargo.toml b/kiosk/Cargo.toml index cfec0fa..026a588 100644 --- a/kiosk/Cargo.toml +++ b/kiosk/Cargo.toml @@ -29,3 +29,6 @@ dirs = "6" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } hostname = "0.4" +tokio-tungstenite = { version = "0.24", features = ["native-tls"] } +futures-util = "0.3" +url = "2" diff --git a/kiosk/src/main.rs b/kiosk/src/main.rs index a489b3c..e098181 100644 --- a/kiosk/src/main.rs +++ b/kiosk/src/main.rs @@ -2,6 +2,11 @@ mod server; mod bundle; mod pipeline; mod ui; +mod ws_client; + +pub enum ServerMsg { + ReloadBundle, +} use gtk4::prelude::{ApplicationExt, ApplicationExtManual}; use gstreamer::prelude::PluginFeatureExtManual; diff --git a/kiosk/src/ui.rs b/kiosk/src/ui.rs index a8e3767..cc4d0a2 100644 --- a/kiosk/src/ui.rs +++ b/kiosk/src/ui.rs @@ -9,6 +9,8 @@ use tracing::{info, warn}; use crate::bundle::KioskBundle; use crate::pipeline; use crate::server; +use crate::ws_client; +use crate::ServerMsg; const APP_ID: &str = "dev.betterframe.kiosk"; @@ -61,6 +63,32 @@ fn activate(app: &Application) { info!("bundle: {} cameras, {} layouts", bundle.cameras.len(), bundle.layouts.len()); let _ = tx.send(WorkerMsg::RenderBundle(bundle)); + // Spawn WS client in a separate thread for live updates + let server_ws = server.clone(); + let key_ws = key.clone(); + let (ws_tx, ws_rx) = mpsc::channel::(); + let tx_for_reload = tx.clone(); + let server_for_reload = server.clone(); + let key_for_reload = key.clone(); + + std::thread::spawn(move || { + ws_client::run(&server_ws, &key_ws, ws_tx); + }); + + // Listen for WS messages and re-fetch bundle on reload + std::thread::spawn(move || { + for msg in ws_rx { + match msg { + ServerMsg::ReloadBundle => { + info!("reloading bundle"); + let bundle = server::fetch_bundle(&server_for_reload, &key_for_reload); + let _ = tx_for_reload.send(WorkerMsg::RenderBundle(bundle)); + } + } + } + }); + + // Heartbeat loop loop { std::thread::sleep(std::time::Duration::from_secs(60)); server::heartbeat(&server, &key); diff --git a/kiosk/src/ws_client.rs b/kiosk/src/ws_client.rs new file mode 100644 index 0000000..627c50f --- /dev/null +++ b/kiosk/src/ws_client.rs @@ -0,0 +1,106 @@ +use std::sync::mpsc::Sender; +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{info, warn}; + +use crate::ServerMsg; + +/// Run the WebSocket client in a tokio runtime. Blocks the calling thread. +/// Reconnects on disconnect with exponential backoff. +pub fn run(server_url: &str, kiosk_key: &str, tx: Sender) { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + warn!("ws: failed to build runtime: {e}"); + return; + } + }; + + let ws_url = build_ws_url(server_url, kiosk_key); + info!("ws: connecting to {ws_url}"); + + rt.block_on(async { + let mut backoff = 1u64; + loop { + match connect_async(&ws_url).await { + Ok((mut ws, _resp)) => { + info!("ws: connected"); + backoff = 1; + + while let Some(msg) = ws.next().await { + match msg { + Ok(Message::Text(text)) => { + if text.contains("\"type\":\"ping\"") { + let _ = ws.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await; + } else if text.contains("\"type\":\"reload-bundle\"") { + info!("ws: reload-bundle received"); + let _ = tx.send(ServerMsg::ReloadBundle); + } else { + info!("ws: msg: {text}"); + } + } + Ok(Message::Close(_)) => { + info!("ws: server closed connection"); + break; + } + Err(e) => { + warn!("ws: error: {e}"); + break; + } + _ => {} + } + } + } + Err(e) => { + warn!("ws: connect failed: {e}"); + } + } + + info!("ws: reconnecting in {backoff}s"); + tokio::time::sleep(Duration::from_secs(backoff)).await; + backoff = (backoff * 2).min(60); + } + }); +} + +fn build_ws_url(http_url: &str, token: &str) -> String { + // Replace http:// → ws://, https:// → wss://. Strip any trailing path. + let base = if let Some(rest) = http_url.strip_prefix("https://") { + format!("wss://{}", rest.split('/').next().unwrap_or(rest)) + } else if let Some(rest) = http_url.strip_prefix("http://") { + format!("ws://{}", rest.split('/').next().unwrap_or(rest)) + } else { + format!("ws://{http_url}") + }; + + // coordinator-ws runs on a different port (18082 vs api-http on 18081) + let base_port = base.rsplit(':').next().unwrap_or(""); + let base = if base_port == "18081" { + base.replace(":18081", ":18082") + } else if !base.contains(':') { + format!("{base}:18082") + } else { + base + }; + + format!("{base}/ws/kiosk?token={}", urlencoding(token)) +} + +fn urlencoding(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for ch in s.chars() { + if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.' | '~') { + out.push(ch); + } else { + for b in ch.to_string().bytes() { + out.push_str(&format!("%{b:02X}")); + } + } + } + out +} diff --git a/package-lock.json b/package-lock.json index 7650184..d034010 100644 --- a/package-lock.json +++ b/package-lock.json @@ -524,12 +524,20 @@ "version": "25.6.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.2.tgz", "integrity": "sha512-sokuT28dxf9JT5Kady1fsXOvI4HVpjZa95NKT5y9PNTIrs2AsobR4GFAA90ZG8M+nxVRLysCXsVj6eGC7Vbrlw==", - "dev": true, "license": "MIT", "dependencies": { "undici-types": "~7.19.0" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/argon2": { "version": "0.44.0", "resolved": "https://registry.npmjs.org/argon2/-/argon2-0.44.0.tgz", @@ -851,7 +859,6 @@ "version": "7.19.2", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.19.2.tgz", "integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==", - "dev": true, "license": "MIT" }, "node_modules/uuid": { @@ -882,6 +889,27 @@ "node": ">= 8" } }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/yaml": { "version": "2.8.4", "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.4.tgz", @@ -904,10 +932,12 @@ "dependencies": { "@anyvali/js": "^0.2.0", "@bsb/base": "^9.1.11", + "@types/ws": "^8.18.1", "argon2": "^0.44.0", "h3": "^2.0.1-rc.22", "jsx-htmx": "^2.0.2", - "otpauth": "^9.5.1" + "otpauth": "^9.5.1", + "ws": "^8.20.0" }, "devDependencies": { "@types/node": "^25.0.0", diff --git a/server/package.json b/server/package.json index 269e8f9..0ab7d24 100644 --- a/server/package.json +++ b/server/package.json @@ -23,10 +23,12 @@ "dependencies": { "@anyvali/js": "^0.2.0", "@bsb/base": "^9.1.11", + "@types/ws": "^8.18.1", "argon2": "^0.44.0", "h3": "^2.0.1-rc.22", "jsx-htmx": "^2.0.2", - "otpauth": "^9.5.1" + "otpauth": "^9.5.1", + "ws": "^8.20.0" }, "devDependencies": { "@types/node": "^25.0.0", diff --git a/server/src/plugins/service-admin-http/routes-admin.ts b/server/src/plugins/service-admin-http/routes-admin.ts index c71496f..079a40a 100644 --- a/server/src/plugins/service-admin-http/routes-admin.ts +++ b/server/src/plugins/service-admin-http/routes-admin.ts @@ -5,6 +5,7 @@ import { type H3, readBody, getRouterParam, getQuery } from "h3"; import { htmlPage } from "./html-response.js"; import type { AdminDeps } from "./index.js"; import { confirmPairing } from "../../shared/pairing.js"; +import { getCoordinator } from "../../shared/coordinator-registry.js"; import { OverviewPage, CamerasPage, @@ -20,6 +21,10 @@ import { DisplayEditPage, } from "../../web-templates/admin-pages.js"; +function notifyKiosks(): void { + try { getCoordinator().notifyBundleChanged(); } catch { /* ignore */ } +} + function sanitizeRtspUrl(raw: string): string { const match = raw.match(/^(rtsp:\/\/)([^@]+)@(.+)$/); if (!match) return raw; @@ -275,6 +280,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { cooling_timeout_seconds: coolingTimeout, resets_idle_timer: body?.["resets_idle_timer"] === "1", }); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/layouts/${id}` } }); }); @@ -343,6 +349,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { content_type: "html", html_content: null, }); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } }); }); @@ -363,6 +370,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { web_url: contentType === "web" ? (body?.["web_url"] ?? null) : null, html_content: contentType === "html" ? (body?.["html_content"] ?? null) : null, }); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } }); }); @@ -371,12 +379,14 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { const layoutId = Number(getRouterParam(event, "id")); const cellId = Number(getRouterParam(event, "cellId")); deps.repo.deleteLayoutCell(cellId); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/layouts/${layoutId}` } }); }); app.post("/admin/layouts/:id/delete", (event) => { const id = Number(getRouterParam(event, "id")); deps.repo.deleteLayout(id); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: "/admin/layouts" } }); }); @@ -429,6 +439,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { idle_timeout_seconds: parseInt(body?.["idle_timeout_seconds"] ?? "0", 10), sleep_timeout_seconds: parseInt(body?.["sleep_timeout_seconds"] ?? "0", 10), } as any); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/displays/${id}` } }); }); @@ -439,6 +450,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { const layoutId = body?.["layout_id"] ? Number(body["layout_id"]) : null; if (layoutId && Number.isFinite(layoutId)) { deps.repo.attachLayoutToDisplay(displayId, layoutId); + notifyKiosks(); } return new Response(null, { status: 302, headers: { location: `/admin/displays/${displayId}` } }); }); @@ -448,6 +460,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { const displayId = Number(getRouterParam(event, "id")); const layoutId = Number(getRouterParam(event, "layoutId")); deps.repo.detachLayoutFromDisplay(displayId, layoutId); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/displays/${displayId}` } }); }); @@ -543,6 +556,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { deps.repo.updateCameraStream(mainStream.id, { rtsp_uri: rtspUrl }); } } + notifyKiosks(); return new Response(null, { status: 302, headers: { location: `/admin/cameras/${id}` } }); }); @@ -574,6 +588,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { app.post("/admin/cameras/:id/delete", (event) => { const id = Number(getRouterParam(event, "id")); deps.repo.deleteCamera(id); + notifyKiosks(); return new Response(null, { status: 302, headers: { location: "/admin/cameras" } }); }); diff --git a/server/src/plugins/service-coordinator-ws/index.ts b/server/src/plugins/service-coordinator-ws/index.ts index f42180b..24fa542 100644 --- a/server/src/plugins/service-coordinator-ws/index.ts +++ b/server/src/plugins/service-coordinator-ws/index.ts @@ -1,11 +1,15 @@ /** * service-coordinator-ws — WebSocket hub for live kiosk channel. * - * Uses raw Node.js WebSocket server (ws package via h3's optional crossws). - * For v0.1, uses a standalone HTTP server + ws upgrade. - * * Kiosks connect with ?token=. Server pushes: - * - layout-switch, power, reload-bundle, ping + * - reload-bundle: kiosk should re-fetch bundle + * - layout-switch: change active layout (future) + * - power: CEC commands (future) + * - ping: keepalive + * + * Kiosks send: + * - pong: keepalive reply + * - status: current state */ import * as av from "@anyvali/js"; import { @@ -15,13 +19,13 @@ import { createEventSchemas, type Observable, } from "@bsb/base"; -import { createServer } from "node:http"; -import type { IncomingMessage } from "node:http"; -import type { Duplex } from "node:stream"; +import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http"; +import { WebSocketServer, WebSocket } from "ws"; import { getRepo } from "../../shared/plugin-registry.js"; import { initSecrets } from "../../shared/secrets.js"; -import { createAuth, type AuthApi } from "../../shared/auth.js"; +import { createAuth } from "../../shared/auth.js"; +import { setCoordinator } from "../../shared/coordinator-registry.js"; // ---- Config ----------------------------------------------------------------- @@ -62,6 +66,30 @@ export const EventSchemas = createEventSchemas({ onBroadcast: {}, }); +// ---- Connected kiosks ------------------------------------------------------- + +interface ConnectedKiosk { + id: number; + name: string; + ws: WebSocket; +} + +const connectedKiosks = new Map(); + +function sendToKiosk(kioskId: number, message: object): boolean { + const k = connectedKiosks.get(kioskId); + if (!k || k.ws.readyState !== WebSocket.OPEN) return false; + k.ws.send(JSON.stringify(message)); + return true; +} + +function broadcastAll(message: object): void { + const payload = JSON.stringify(message); + for (const k of connectedKiosks.values()) { + if (k.ws.readyState === WebSocket.OPEN) k.ws.send(payload); + } +} + // ---- Plugin ----------------------------------------------------------------- export class Plugin extends BSBService, typeof EventSchemas> { @@ -73,7 +101,8 @@ export class Plugin extends BSBService, typeof Event runBeforePlugins?: string[]; runAfterPlugins?: string[]; - private httpServer?: ReturnType; + private httpServer?: HttpServer; + private wss?: WebSocketServer; private pingInterval?: ReturnType; constructor(cfg: BSBServiceConstructor, typeof EventSchemas>) { @@ -81,34 +110,131 @@ export class Plugin extends BSBService, typeof Event } async init(obs: Observable): Promise { - // Placeholder — full WS implementation requires 'ws' package or crossws. - // For now, start a basic HTTP server that responds to health checks. - // WS upgrade will be added when crossws or ws is installed. - const server = createServer((req, res) => { + const repo = getRepo(); + const secrets = initSecrets( + { dataDir: this.config.dataDir }, + { info: (m) => obs.log.info(m as any, {}), warn: (m) => obs.log.warn(m as any, {}) }, + ); + const auth = createAuth(repo, secrets, { + sessionIdleSeconds: this.config.sessionIdleSeconds, + sessionMaxSeconds: this.config.sessionMaxSeconds, + loginLockoutThreshold: this.config.loginLockoutThreshold, + loginLockoutSeconds: this.config.loginLockoutSeconds, + argon2Memory: this.config.argon2Memory, + argon2TimeCost: this.config.argon2TimeCost, + argon2Parallelism: this.config.argon2Parallelism, + totpIssuer: this.config.totpIssuer, + cookieName: this.config.cookieName, + }); + + const httpServer = createServer((req, res) => { if (req.url === "/healthz") { res.writeHead(200, { "content-type": "application/json" }); - res.end(JSON.stringify({ status: "ok" })); + res.end(JSON.stringify({ status: "ok", connected_kiosks: connectedKiosks.size })); return; } res.writeHead(404); res.end(); }); - server.listen(this.config.port, this.config.host, () => { + const wss = new WebSocketServer({ noServer: true }); + + httpServer.on("upgrade", async (req: IncomingMessage, socket, head) => { + const url = new URL(req.url ?? "/", `http://${req.headers.host}`); + if (url.pathname !== "/ws/kiosk") { + socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); + socket.destroy(); + return; + } + const token = url.searchParams.get("token"); + if (!token) { + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); + socket.destroy(); + return; + } + try { + const kiosk = await auth.verifyKioskKey(token); + if (!kiosk) { + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); + socket.destroy(); + return; + } + const kioskData = repo.getKioskById(kiosk.id); + if (!kioskData) { + socket.write("HTTP/1.1 404 Not Found\r\n\r\n"); + socket.destroy(); + return; + } + wss.handleUpgrade(req, socket, head, (ws) => { + connectedKiosks.set(kiosk.id, { id: kiosk.id, name: kioskData.name, ws }); + obs.log.info("kiosk connected: {name}", { name: kioskData.name }); + ws.send(JSON.stringify({ type: "connected", kiosk_id: kiosk.id })); + + ws.on("message", (data) => { + try { + const msg = JSON.parse(data.toString()); + if (msg.type === "pong") return; + if (msg.type === "status") { + obs.log.info("kiosk status: {data}", { data: data.toString() }); + } + } catch { + // ignore malformed + } + }); + + ws.on("close", () => { + connectedKiosks.delete(kiosk.id); + obs.log.info("kiosk disconnected: {name}", { name: kioskData.name }); + }); + }); + } catch (err) { + obs.log.warn("ws upgrade error: {err}", { err: (err as Error).message }); + socket.destroy(); + } + }); + + httpServer.listen(this.config.port, this.config.host, () => { obs.log.info("coordinator-ws listening on {host}:{port}", { host: this.config.host, port: this.config.port, }); }); - this.httpServer = server; + // Register coordinator API for other plugins to use + setCoordinator({ + sendToKiosk, + broadcastAll, + notifyBundleChanged: () => broadcastAll({ type: "reload-bundle" }), + notifyKioskBundleChanged: (kioskId: number) => + sendToKiosk(kioskId, { type: "reload-bundle" }), + }); + + this.httpServer = httpServer; + this.wss = wss; + + // Ping connected kiosks every 30s + this.pingInterval = setInterval(() => { + const payload = JSON.stringify({ type: "ping", t: Date.now() }); + for (const k of connectedKiosks.values()) { + try { + if (k.ws.readyState === WebSocket.OPEN) k.ws.send(payload); + } catch { + // ignore + } + } + }, 30_000); } async run(_obs: Observable): Promise {} async dispose(): Promise { if (this.pingInterval) clearInterval(this.pingInterval); + for (const k of connectedKiosks.values()) { + try { k.ws.close(); } catch { /* ignore */ } + } + connectedKiosks.clear(); return new Promise((resolve) => { + if (this.wss) this.wss.close(); if (this.httpServer) { this.httpServer.close(() => resolve()); } else { diff --git a/server/src/shared/coordinator-registry.ts b/server/src/shared/coordinator-registry.ts new file mode 100644 index 0000000..372effa --- /dev/null +++ b/server/src/shared/coordinator-registry.ts @@ -0,0 +1,27 @@ +/** + * Coordinator registry — admin-http calls these to notify kiosks of changes. + * service-coordinator-ws sets the implementation in its init(). + */ +export interface CoordinatorApi { + sendToKiosk(kioskId: number, message: object): boolean; + broadcastAll(message: object): void; + notifyBundleChanged(): void; + notifyKioskBundleChanged(kioskId: number): void; +} + +const noop: CoordinatorApi = { + sendToKiosk: () => false, + broadcastAll: () => {}, + notifyBundleChanged: () => {}, + notifyKioskBundleChanged: () => {}, +}; + +let _coordinator: CoordinatorApi = noop; + +export function setCoordinator(c: CoordinatorApi): void { + _coordinator = c; +} + +export function getCoordinator(): CoordinatorApi { + return _coordinator; +} diff --git a/server/src/shared/pairing.ts b/server/src/shared/pairing.ts index 3aaef69..79e3cd0 100644 --- a/server/src/shared/pairing.ts +++ b/server/src/shared/pairing.ts @@ -118,7 +118,16 @@ export async function confirmPairing( if (pc.consumed_at) throw new Error("pairing code already used"); if (new Date(pc.expires_at) < new Date()) throw new Error("pairing code expired"); - const kioskName = input.nameOverride || pc.kiosk_proposed_name || `kiosk-${input.code.toLowerCase()}`; + const baseName = input.nameOverride || pc.kiosk_proposed_name || `kiosk-${input.code.toLowerCase()}`; + // Auto-suffix if name collides (kiosks.name is UNIQUE) + let kioskName = baseName; + let suffix = 2; + while (repo.getKioskByName(kioskName)) { + kioskName = `${baseName}-${suffix}`; + suffix++; + if (suffix > 100) throw new Error("could not generate unique kiosk name"); + } + const kioskKeyPlaintext = `bf-${randomBytes(24).toString("base64url")}`; const kioskKeyHash = await auth.hashPassword(kioskKeyPlaintext); const kioskKeyPrefix = kioskKeyPlaintext.slice(0, 8);