diff --git a/kiosk/src/ws_client.rs b/kiosk/src/ws_client.rs index 4fea7ea..d3ca326 100644 --- a/kiosk/src/ws_client.rs +++ b/kiosk/src/ws_client.rs @@ -2,11 +2,21 @@ use std::sync::mpsc::Sender; use std::time::Duration; use futures_util::{SinkExt, StreamExt}; +use serde::Deserialize; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{info, warn}; use crate::ServerMsg; +#[derive(Deserialize)] +struct OnvifSoapRequest { + request_id: String, + url: String, + action: String, + body: String, + timeout_ms: Option, +} + /// Run the WebSocket client in a tokio runtime. Blocks the calling thread. /// Reconnects on disconnect with exponential backoff. pub fn run(server_url: &str, kiosk_key: &str, tx: Sender) { @@ -37,6 +47,17 @@ pub fn run(server_url: &str, kiosk_key: &str, tx: Sender) { Ok(Message::Text(text)) => { if text.contains("\"type\":\"ping\"") { let _ = ws.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await; + } else if text.contains("\"type\":\"onvif-soap-request\"") { + let Ok(msg) = serde_json::from_str::(&text) else { + warn!("ws: onvif request was not valid JSON"); + continue; + }; + let Ok(req) = serde_json::from_value::(msg) else { + warn!("ws: onvif request missing fields"); + continue; + }; + let response = perform_onvif_soap(req).await; + let _ = ws.send(Message::Text(response)).await; } else if text.contains("\"type\":\"reload-bundle\"") { info!("ws: reload-bundle received"); let _ = tx.send(ServerMsg::ReloadBundle); @@ -102,6 +123,71 @@ pub fn run(server_url: &str, kiosk_key: &str, tx: Sender) { }); } +async fn perform_onvif_soap(req: OnvifSoapRequest) -> String { + let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(8000).clamp(1000, 30000)); + let client = match reqwest::Client::builder().timeout(timeout).build() { + Ok(client) => client, + Err(err) => { + return serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "error": format!("kiosk ONVIF client init failed: {err}"), + }).to_string(); + } + }; + + let parsed = match req.url.parse::() { + Ok(url) => url, + Err(err) => { + return serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "error": format!("invalid ONVIF URL: {err}"), + }).to_string(); + } + }; + if parsed.scheme() != "http" && parsed.scheme() != "https" { + return serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "error": "ONVIF URL must use http or https", + }).to_string(); + } + + let result = client + .post(parsed) + .header("Content-Type", format!("application/soap+xml; charset=utf-8; action=\"{}\"", req.action)) + .header("SOAPAction", req.action) + .body(req.body) + .send() + .await; + + match result { + Ok(resp) => { + let status = resp.status().as_u16(); + match resp.text().await { + Ok(body) => serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "status": status, + "body": body, + }).to_string(), + Err(err) => serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "status": status, + "error": format!("kiosk ONVIF response read failed: {err}"), + }).to_string(), + } + } + Err(err) => serde_json::json!({ + "type": "onvif-soap-response", + "request_id": req.request_id, + "error": format!("kiosk ONVIF request failed: {err}"), + }).to_string(), + } +} + fn build_ws_url(http_url: &str, token: &str) -> String { // Replace http:// → ws://, https:// → wss://. Strip any trailing path. let base = if let Some(rest) = http_url.strip_prefix("https://") { diff --git a/server/src/plugins/service-admin-http/routes-admin.ts b/server/src/plugins/service-admin-http/routes-admin.ts index cf3603b..ffa2426 100644 --- a/server/src/plugins/service-admin-http/routes-admin.ts +++ b/server/src/plugins/service-admin-http/routes-admin.ts @@ -121,6 +121,35 @@ function formValues(v: FormValue): string[] { return v ? [v] : []; } +function kioskOnvifSoapTransport(kioskId: number) { + return async (url: string, action: string, body: string, timeoutMs: number): Promise => { + if (!Number.isInteger(kioskId) || kioskId <= 0) { + throw new Error("invalid kiosk selected for discovery"); + } + const response = await getCoordinator().requestKiosk<{ + type?: string; + request_id?: string; + status?: number; + body?: string; + error?: string; + }>(kioskId, { + type: "onvif-soap-request", + url, + action, + body, + timeout_ms: timeoutMs, + }, timeoutMs + 3000); + + if (response.error) throw new Error(response.error); + const status = Number(response.status ?? 0); + const text = response.body ?? ""; + if (status < 200 || status >= 300) { + throw new Error(`ONVIF ${action} via kiosk ${String(kioskId)} HTTP ${String(status)}: ${text.slice(0, 300)}`); + } + return text; + }; +} + function parseDiscoveredStreams(raw: string): DiscoverAddStream[] { try { const parsed = JSON.parse(raw) as DiscoverAddStream[]; @@ -461,7 +490,10 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { app.get("/admin/cameras/discover", (event) => { const user = event.context.user!; - return htmlPage(CameraDiscoverPage({ user: user.username })); + return htmlPage(CameraDiscoverPage({ + user: user.username, + kiosks: deps.repo.listKiosks(), + })); }); app.post("/admin/cameras/discover", async (event) => { @@ -471,17 +503,22 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { const port = parseInt(body?.["port"] ?? "80", 10) || 80; const username = (body?.["username"] ?? "").trim(); const password = body?.["password"] ?? ""; + const runner = (body?.["discovery_runner"] ?? "server").trim(); if (!host) { return htmlPage(CameraDiscoverPage({ user: user.username, + kiosks: deps.repo.listKiosks(), error: "Host required.", values: body, })); } try { - const cameras = await onvifDiscover({ host, port, username, password }); + const soapTransport = runner.startsWith("kiosk:") + ? kioskOnvifSoapTransport(Number(runner.slice("kiosk:".length))) + : undefined; + const cameras = await onvifDiscover({ host, port, username, password, soapTransport }); return htmlPage(CameraDiscoverResultsPage({ user: user.username, host, @@ -492,6 +529,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void { } catch (err) { return htmlPage(CameraDiscoverPage({ user: user.username, + kiosks: deps.repo.listKiosks(), error: `Discovery failed: ${(err as Error).message}`, values: body, })); diff --git a/server/src/plugins/service-coordinator-ws/index.ts b/server/src/plugins/service-coordinator-ws/index.ts index 7e2fde9..9f66414 100644 --- a/server/src/plugins/service-coordinator-ws/index.ts +++ b/server/src/plugins/service-coordinator-ws/index.ts @@ -20,6 +20,7 @@ import { type Observable, } from "@bsb/base"; import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http"; +import { randomUUID } from "node:crypto"; import { WebSocketServer, WebSocket } from "ws"; import { getRepo } from "../../shared/plugin-registry.js"; @@ -77,6 +78,12 @@ interface ConnectedKiosk { } const connectedKiosks = new Map(); +const pendingRequests = new Map void; + reject: (err: Error) => void; + timer: ReturnType; +}>(); function sendToKiosk(kioskId: number, message: object): boolean { const k = connectedKiosks.get(kioskId); @@ -85,6 +92,28 @@ function sendToKiosk(kioskId: number, message: object): boolean { return true; } +function requestKiosk(kioskId: number, message: object, timeoutMs = 10000): Promise { + const requestId = randomUUID(); + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + pendingRequests.delete(requestId); + reject(new Error("kiosk request timed out")); + }, timeoutMs); + pendingRequests.set(requestId, { + kioskId, + resolve: (value) => resolve(value as T), + reject, + timer, + }); + const sent = sendToKiosk(kioskId, { ...message, request_id: requestId }); + if (!sent) { + clearTimeout(timer); + pendingRequests.delete(requestId); + reject(new Error("kiosk is not connected")); + } + }); +} + function broadcastAll(message: object): void { const payload = JSON.stringify(message); for (const k of connectedKiosks.values()) { @@ -193,6 +222,20 @@ export class Plugin extends BSBService, typeof Event try { const msg = JSON.parse(data.toString()) as Record; if (msg["type"] === "pong") return; + if (msg["type"] === "onvif-soap-response") { + const requestId = typeof msg["request_id"] === "string" ? msg["request_id"] : ""; + const pending = pendingRequests.get(requestId); + if (!pending || pending.kioskId !== kiosk.id) return; + pendingRequests.delete(requestId); + clearTimeout(pending.timer); + const error = typeof msg["error"] === "string" ? msg["error"] : ""; + if (error) { + pending.reject(new Error(error)); + } else { + pending.resolve(msg); + } + return; + } if (msg["type"] === "status") { obs.log.info("kiosk status: {data}", { data: data.toString() }); const cpu = typeof msg["cpu_temp_c"] === "number" ? msg["cpu_temp_c"] : null; @@ -221,6 +264,12 @@ export class Plugin extends BSBService, typeof Event ws.on("close", () => { connectedKiosks.delete(kiosk.id); + for (const [requestId, pending] of pendingRequests) { + if (pending.kioskId !== kiosk.id) continue; + pendingRequests.delete(requestId); + clearTimeout(pending.timer); + pending.reject(new Error("kiosk disconnected")); + } obs.log.info("kiosk disconnected: {name}", { name: kioskData.name }); nodered.forward("kiosk.changed", { kiosk_id: kiosk.id, @@ -245,6 +294,7 @@ export class Plugin extends BSBService, typeof Event // Register coordinator API for other plugins to use setCoordinator({ sendToKiosk, + requestKiosk, broadcastAll, notifyBundleChanged: () => broadcastAll({ type: "reload-bundle" }), notifyKioskBundleChanged: (kioskId: number) => diff --git a/server/src/shared/coordinator-registry.ts b/server/src/shared/coordinator-registry.ts index 372effa..312f0b0 100644 --- a/server/src/shared/coordinator-registry.ts +++ b/server/src/shared/coordinator-registry.ts @@ -4,6 +4,7 @@ */ export interface CoordinatorApi { sendToKiosk(kioskId: number, message: object): boolean; + requestKiosk(kioskId: number, message: object, timeoutMs?: number): Promise; broadcastAll(message: object): void; notifyBundleChanged(): void; notifyKioskBundleChanged(kioskId: number): void; @@ -11,6 +12,7 @@ export interface CoordinatorApi { const noop: CoordinatorApi = { sendToKiosk: () => false, + requestKiosk: async () => { throw new Error("kiosk is not connected"); }, broadcastAll: () => {}, notifyBundleChanged: () => {}, notifyKioskBundleChanged: () => {}, diff --git a/server/src/shared/onvif.ts b/server/src/shared/onvif.ts index 5de4262..f3d8436 100644 --- a/server/src/shared/onvif.ts +++ b/server/src/shared/onvif.ts @@ -39,8 +39,16 @@ interface DiscoverInput { mediaPath?: string; /** Optional timeout in ms (default 8s). */ timeoutMs?: number; + soapTransport?: SoapTransport; } +export type SoapTransport = ( + url: string, + action: string, + body: string, + timeoutMs: number, +) => Promise; + interface EndpointParts { origin: string; deviceUrl: string; @@ -82,7 +90,15 @@ function escapeXml(s: string): string { .replace(/'/g, "'"); } -async function soap(url: string, action: string, body: string, timeoutMs: number): Promise { +async function soap( + url: string, + action: string, + body: string, + timeoutMs: number, + transport?: SoapTransport, +): Promise { + if (transport) return transport(url, action, body, timeoutMs); + const controller = new AbortController(); const t = setTimeout(() => controller.abort(), timeoutMs); try { @@ -100,14 +116,25 @@ async function soap(url: string, action: string, body: string, timeoutMs: number throw new Error(`ONVIF ${action} HTTP ${String(res.status)}: ${text.slice(0, 300)}`); } return text; + } catch (err) { + if ((err as Error).name === "AbortError") { + throw new Error(`ONVIF ${action} timed out after ${String(timeoutMs)}ms`); + } + throw err; } finally { clearTimeout(t); } } -async function trySoap(url: string, action: string, body: string, timeoutMs: number): Promise { +async function trySoap( + url: string, + action: string, + body: string, + timeoutMs: number, + transport?: SoapTransport, +): Promise { try { - return await soap(url, action, body, timeoutMs); + return await soap(url, action, body, timeoutMs, transport); } catch { return null; } @@ -176,7 +203,12 @@ function normalizeEndpoint(input: DiscoverInput): EndpointParts { }; } -async function discoverMediaUrl(input: DiscoverInput, endpoint: EndpointParts, timeoutMs: number): Promise { +async function discoverMediaUrl( + input: DiscoverInput, + endpoint: EndpointParts, + timeoutMs: number, + transport?: SoapTransport, +): Promise { if (input.mediaPath) { return `${endpoint.origin}${input.mediaPath.startsWith("/") ? input.mediaPath : `/${input.mediaPath}`}`; } @@ -197,6 +229,7 @@ async function discoverMediaUrl(input: DiscoverInput, endpoint: EndpointParts, t "http://www.onvif.org/ver10/device/wsdl/GetCapabilities", capabilitiesEnv, timeoutMs, + transport, ); if (capabilitiesXml) { const mediaXAddr = pickFirstXAddr(capabilitiesXml, "Media"); @@ -279,7 +312,7 @@ function groupProfiles(host: string, profiles: DiscoveredProfile[]): DiscoveredC export async function discover(input: DiscoverInput): Promise { const timeoutMs = input.timeoutMs ?? 8000; const endpoint = normalizeEndpoint(input); - const mediaUrl = await discoverMediaUrl(input, endpoint, timeoutMs); + const mediaUrl = await discoverMediaUrl(input, endpoint, timeoutMs, input.soapTransport); const header = wsseHeader(input.username, input.password); @@ -290,6 +323,7 @@ export async function discover(input: DiscoverInput): Promise; } @@ -255,6 +256,20 @@ export function CameraDiscoverPage(props: CameraDiscoverProps) { +
+ + +