feat(onvif): run discovery from selected kiosk

This commit is contained in:
Mitchell R 2026-05-20 06:16:27 +02:00
parent 6995990aca
commit 9942957bcf
No known key found for this signature in database
6 changed files with 234 additions and 7 deletions

View file

@ -2,11 +2,21 @@ use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::ServerMsg; use crate::ServerMsg;
#[derive(Deserialize)]
struct OnvifSoapRequest {
request_id: String,
url: String,
action: String,
body: String,
timeout_ms: Option<u64>,
}
/// Run the WebSocket client in a tokio runtime. Blocks the calling thread. /// Run the WebSocket client in a tokio runtime. Blocks the calling thread.
/// Reconnects on disconnect with exponential backoff. /// Reconnects on disconnect with exponential backoff.
pub fn run(server_url: &str, kiosk_key: &str, tx: Sender<ServerMsg>) { pub fn run(server_url: &str, kiosk_key: &str, tx: Sender<ServerMsg>) {
@ -37,6 +47,17 @@ pub fn run(server_url: &str, kiosk_key: &str, tx: Sender<ServerMsg>) {
Ok(Message::Text(text)) => { Ok(Message::Text(text)) => {
if text.contains("\"type\":\"ping\"") { if text.contains("\"type\":\"ping\"") {
let _ = ws.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await; let _ = ws.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await;
} else if text.contains("\"type\":\"onvif-soap-request\"") {
let Ok(msg) = serde_json::from_str::<serde_json::Value>(&text) else {
warn!("ws: onvif request was not valid JSON");
continue;
};
let Ok(req) = serde_json::from_value::<OnvifSoapRequest>(msg) else {
warn!("ws: onvif request missing fields");
continue;
};
let response = perform_onvif_soap(req).await;
let _ = ws.send(Message::Text(response)).await;
} else if text.contains("\"type\":\"reload-bundle\"") { } else if text.contains("\"type\":\"reload-bundle\"") {
info!("ws: reload-bundle received"); info!("ws: reload-bundle received");
let _ = tx.send(ServerMsg::ReloadBundle); let _ = tx.send(ServerMsg::ReloadBundle);
@ -102,6 +123,71 @@ pub fn run(server_url: &str, kiosk_key: &str, tx: Sender<ServerMsg>) {
}); });
} }
async fn perform_onvif_soap(req: OnvifSoapRequest) -> String {
let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(8000).clamp(1000, 30000));
let client = match reqwest::Client::builder().timeout(timeout).build() {
Ok(client) => client,
Err(err) => {
return serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"error": format!("kiosk ONVIF client init failed: {err}"),
}).to_string();
}
};
let parsed = match req.url.parse::<url::Url>() {
Ok(url) => url,
Err(err) => {
return serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"error": format!("invalid ONVIF URL: {err}"),
}).to_string();
}
};
if parsed.scheme() != "http" && parsed.scheme() != "https" {
return serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"error": "ONVIF URL must use http or https",
}).to_string();
}
let result = client
.post(parsed)
.header("Content-Type", format!("application/soap+xml; charset=utf-8; action=\"{}\"", req.action))
.header("SOAPAction", req.action)
.body(req.body)
.send()
.await;
match result {
Ok(resp) => {
let status = resp.status().as_u16();
match resp.text().await {
Ok(body) => serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"status": status,
"body": body,
}).to_string(),
Err(err) => serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"status": status,
"error": format!("kiosk ONVIF response read failed: {err}"),
}).to_string(),
}
}
Err(err) => serde_json::json!({
"type": "onvif-soap-response",
"request_id": req.request_id,
"error": format!("kiosk ONVIF request failed: {err}"),
}).to_string(),
}
}
fn build_ws_url(http_url: &str, token: &str) -> String { fn build_ws_url(http_url: &str, token: &str) -> String {
// Replace http:// → ws://, https:// → wss://. Strip any trailing path. // Replace http:// → ws://, https:// → wss://. Strip any trailing path.
let base = if let Some(rest) = http_url.strip_prefix("https://") { let base = if let Some(rest) = http_url.strip_prefix("https://") {

View file

@ -121,6 +121,35 @@ function formValues(v: FormValue): string[] {
return v ? [v] : []; return v ? [v] : [];
} }
function kioskOnvifSoapTransport(kioskId: number) {
return async (url: string, action: string, body: string, timeoutMs: number): Promise<string> => {
if (!Number.isInteger(kioskId) || kioskId <= 0) {
throw new Error("invalid kiosk selected for discovery");
}
const response = await getCoordinator().requestKiosk<{
type?: string;
request_id?: string;
status?: number;
body?: string;
error?: string;
}>(kioskId, {
type: "onvif-soap-request",
url,
action,
body,
timeout_ms: timeoutMs,
}, timeoutMs + 3000);
if (response.error) throw new Error(response.error);
const status = Number(response.status ?? 0);
const text = response.body ?? "";
if (status < 200 || status >= 300) {
throw new Error(`ONVIF ${action} via kiosk ${String(kioskId)} HTTP ${String(status)}: ${text.slice(0, 300)}`);
}
return text;
};
}
function parseDiscoveredStreams(raw: string): DiscoverAddStream[] { function parseDiscoveredStreams(raw: string): DiscoverAddStream[] {
try { try {
const parsed = JSON.parse(raw) as DiscoverAddStream[]; const parsed = JSON.parse(raw) as DiscoverAddStream[];
@ -461,7 +490,10 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
app.get("/admin/cameras/discover", (event) => { app.get("/admin/cameras/discover", (event) => {
const user = event.context.user!; const user = event.context.user!;
return htmlPage(CameraDiscoverPage({ user: user.username })); return htmlPage(CameraDiscoverPage({
user: user.username,
kiosks: deps.repo.listKiosks(),
}));
}); });
app.post("/admin/cameras/discover", async (event) => { app.post("/admin/cameras/discover", async (event) => {
@ -471,17 +503,22 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
const port = parseInt(body?.["port"] ?? "80", 10) || 80; const port = parseInt(body?.["port"] ?? "80", 10) || 80;
const username = (body?.["username"] ?? "").trim(); const username = (body?.["username"] ?? "").trim();
const password = body?.["password"] ?? ""; const password = body?.["password"] ?? "";
const runner = (body?.["discovery_runner"] ?? "server").trim();
if (!host) { if (!host) {
return htmlPage(CameraDiscoverPage({ return htmlPage(CameraDiscoverPage({
user: user.username, user: user.username,
kiosks: deps.repo.listKiosks(),
error: "Host required.", error: "Host required.",
values: body, values: body,
})); }));
} }
try { try {
const cameras = await onvifDiscover({ host, port, username, password }); const soapTransport = runner.startsWith("kiosk:")
? kioskOnvifSoapTransport(Number(runner.slice("kiosk:".length)))
: undefined;
const cameras = await onvifDiscover({ host, port, username, password, soapTransport });
return htmlPage(CameraDiscoverResultsPage({ return htmlPage(CameraDiscoverResultsPage({
user: user.username, user: user.username,
host, host,
@ -492,6 +529,7 @@ export function registerAdminRoutes(app: H3, deps: AdminDeps): void {
} catch (err) { } catch (err) {
return htmlPage(CameraDiscoverPage({ return htmlPage(CameraDiscoverPage({
user: user.username, user: user.username,
kiosks: deps.repo.listKiosks(),
error: `Discovery failed: ${(err as Error).message}`, error: `Discovery failed: ${(err as Error).message}`,
values: body, values: body,
})); }));

View file

@ -20,6 +20,7 @@ import {
type Observable, type Observable,
} from "@bsb/base"; } from "@bsb/base";
import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http"; import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http";
import { randomUUID } from "node:crypto";
import { WebSocketServer, WebSocket } from "ws"; import { WebSocketServer, WebSocket } from "ws";
import { getRepo } from "../../shared/plugin-registry.js"; import { getRepo } from "../../shared/plugin-registry.js";
@ -77,6 +78,12 @@ interface ConnectedKiosk {
} }
const connectedKiosks = new Map<number, ConnectedKiosk>(); const connectedKiosks = new Map<number, ConnectedKiosk>();
const pendingRequests = new Map<string, {
kioskId: number;
resolve: (value: unknown) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
}>();
function sendToKiosk(kioskId: number, message: object): boolean { function sendToKiosk(kioskId: number, message: object): boolean {
const k = connectedKiosks.get(kioskId); const k = connectedKiosks.get(kioskId);
@ -85,6 +92,28 @@ function sendToKiosk(kioskId: number, message: object): boolean {
return true; return true;
} }
function requestKiosk<T = unknown>(kioskId: number, message: object, timeoutMs = 10000): Promise<T> {
const requestId = randomUUID();
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => {
pendingRequests.delete(requestId);
reject(new Error("kiosk request timed out"));
}, timeoutMs);
pendingRequests.set(requestId, {
kioskId,
resolve: (value) => resolve(value as T),
reject,
timer,
});
const sent = sendToKiosk(kioskId, { ...message, request_id: requestId });
if (!sent) {
clearTimeout(timer);
pendingRequests.delete(requestId);
reject(new Error("kiosk is not connected"));
}
});
}
function broadcastAll(message: object): void { function broadcastAll(message: object): void {
const payload = JSON.stringify(message); const payload = JSON.stringify(message);
for (const k of connectedKiosks.values()) { for (const k of connectedKiosks.values()) {
@ -193,6 +222,20 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
try { try {
const msg = JSON.parse(data.toString()) as Record<string, unknown>; const msg = JSON.parse(data.toString()) as Record<string, unknown>;
if (msg["type"] === "pong") return; if (msg["type"] === "pong") return;
if (msg["type"] === "onvif-soap-response") {
const requestId = typeof msg["request_id"] === "string" ? msg["request_id"] : "";
const pending = pendingRequests.get(requestId);
if (!pending || pending.kioskId !== kiosk.id) return;
pendingRequests.delete(requestId);
clearTimeout(pending.timer);
const error = typeof msg["error"] === "string" ? msg["error"] : "";
if (error) {
pending.reject(new Error(error));
} else {
pending.resolve(msg);
}
return;
}
if (msg["type"] === "status") { if (msg["type"] === "status") {
obs.log.info("kiosk status: {data}", { data: data.toString() }); obs.log.info("kiosk status: {data}", { data: data.toString() });
const cpu = typeof msg["cpu_temp_c"] === "number" ? msg["cpu_temp_c"] : null; const cpu = typeof msg["cpu_temp_c"] === "number" ? msg["cpu_temp_c"] : null;
@ -221,6 +264,12 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
ws.on("close", () => { ws.on("close", () => {
connectedKiosks.delete(kiosk.id); connectedKiosks.delete(kiosk.id);
for (const [requestId, pending] of pendingRequests) {
if (pending.kioskId !== kiosk.id) continue;
pendingRequests.delete(requestId);
clearTimeout(pending.timer);
pending.reject(new Error("kiosk disconnected"));
}
obs.log.info("kiosk disconnected: {name}", { name: kioskData.name }); obs.log.info("kiosk disconnected: {name}", { name: kioskData.name });
nodered.forward("kiosk.changed", { nodered.forward("kiosk.changed", {
kiosk_id: kiosk.id, kiosk_id: kiosk.id,
@ -245,6 +294,7 @@ export class Plugin extends BSBService<InstanceType<typeof Config>, typeof Event
// Register coordinator API for other plugins to use // Register coordinator API for other plugins to use
setCoordinator({ setCoordinator({
sendToKiosk, sendToKiosk,
requestKiosk,
broadcastAll, broadcastAll,
notifyBundleChanged: () => broadcastAll({ type: "reload-bundle" }), notifyBundleChanged: () => broadcastAll({ type: "reload-bundle" }),
notifyKioskBundleChanged: (kioskId: number) => notifyKioskBundleChanged: (kioskId: number) =>

View file

@ -4,6 +4,7 @@
*/ */
export interface CoordinatorApi { export interface CoordinatorApi {
sendToKiosk(kioskId: number, message: object): boolean; sendToKiosk(kioskId: number, message: object): boolean;
requestKiosk<T = unknown>(kioskId: number, message: object, timeoutMs?: number): Promise<T>;
broadcastAll(message: object): void; broadcastAll(message: object): void;
notifyBundleChanged(): void; notifyBundleChanged(): void;
notifyKioskBundleChanged(kioskId: number): void; notifyKioskBundleChanged(kioskId: number): void;
@ -11,6 +12,7 @@ export interface CoordinatorApi {
const noop: CoordinatorApi = { const noop: CoordinatorApi = {
sendToKiosk: () => false, sendToKiosk: () => false,
requestKiosk: async () => { throw new Error("kiosk is not connected"); },
broadcastAll: () => {}, broadcastAll: () => {},
notifyBundleChanged: () => {}, notifyBundleChanged: () => {},
notifyKioskBundleChanged: () => {}, notifyKioskBundleChanged: () => {},

View file

@ -39,8 +39,16 @@ interface DiscoverInput {
mediaPath?: string; mediaPath?: string;
/** Optional timeout in ms (default 8s). */ /** Optional timeout in ms (default 8s). */
timeoutMs?: number; timeoutMs?: number;
soapTransport?: SoapTransport;
} }
export type SoapTransport = (
url: string,
action: string,
body: string,
timeoutMs: number,
) => Promise<string>;
interface EndpointParts { interface EndpointParts {
origin: string; origin: string;
deviceUrl: string; deviceUrl: string;
@ -82,7 +90,15 @@ function escapeXml(s: string): string {
.replace(/'/g, "&apos;"); .replace(/'/g, "&apos;");
} }
async function soap(url: string, action: string, body: string, timeoutMs: number): Promise<string> { async function soap(
url: string,
action: string,
body: string,
timeoutMs: number,
transport?: SoapTransport,
): Promise<string> {
if (transport) return transport(url, action, body, timeoutMs);
const controller = new AbortController(); const controller = new AbortController();
const t = setTimeout(() => controller.abort(), timeoutMs); const t = setTimeout(() => controller.abort(), timeoutMs);
try { try {
@ -100,14 +116,25 @@ async function soap(url: string, action: string, body: string, timeoutMs: number
throw new Error(`ONVIF ${action} HTTP ${String(res.status)}: ${text.slice(0, 300)}`); throw new Error(`ONVIF ${action} HTTP ${String(res.status)}: ${text.slice(0, 300)}`);
} }
return text; return text;
} catch (err) {
if ((err as Error).name === "AbortError") {
throw new Error(`ONVIF ${action} timed out after ${String(timeoutMs)}ms`);
}
throw err;
} finally { } finally {
clearTimeout(t); clearTimeout(t);
} }
} }
async function trySoap(url: string, action: string, body: string, timeoutMs: number): Promise<string | null> { async function trySoap(
url: string,
action: string,
body: string,
timeoutMs: number,
transport?: SoapTransport,
): Promise<string | null> {
try { try {
return await soap(url, action, body, timeoutMs); return await soap(url, action, body, timeoutMs, transport);
} catch { } catch {
return null; return null;
} }
@ -176,7 +203,12 @@ function normalizeEndpoint(input: DiscoverInput): EndpointParts {
}; };
} }
async function discoverMediaUrl(input: DiscoverInput, endpoint: EndpointParts, timeoutMs: number): Promise<string> { async function discoverMediaUrl(
input: DiscoverInput,
endpoint: EndpointParts,
timeoutMs: number,
transport?: SoapTransport,
): Promise<string> {
if (input.mediaPath) { if (input.mediaPath) {
return `${endpoint.origin}${input.mediaPath.startsWith("/") ? input.mediaPath : `/${input.mediaPath}`}`; return `${endpoint.origin}${input.mediaPath.startsWith("/") ? input.mediaPath : `/${input.mediaPath}`}`;
} }
@ -197,6 +229,7 @@ async function discoverMediaUrl(input: DiscoverInput, endpoint: EndpointParts, t
"http://www.onvif.org/ver10/device/wsdl/GetCapabilities", "http://www.onvif.org/ver10/device/wsdl/GetCapabilities",
capabilitiesEnv, capabilitiesEnv,
timeoutMs, timeoutMs,
transport,
); );
if (capabilitiesXml) { if (capabilitiesXml) {
const mediaXAddr = pickFirstXAddr(capabilitiesXml, "Media"); const mediaXAddr = pickFirstXAddr(capabilitiesXml, "Media");
@ -279,7 +312,7 @@ function groupProfiles(host: string, profiles: DiscoveredProfile[]): DiscoveredC
export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]> { export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]> {
const timeoutMs = input.timeoutMs ?? 8000; const timeoutMs = input.timeoutMs ?? 8000;
const endpoint = normalizeEndpoint(input); const endpoint = normalizeEndpoint(input);
const mediaUrl = await discoverMediaUrl(input, endpoint, timeoutMs); const mediaUrl = await discoverMediaUrl(input, endpoint, timeoutMs, input.soapTransport);
const header = wsseHeader(input.username, input.password); const header = wsseHeader(input.username, input.password);
@ -290,6 +323,7 @@ export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]
"http://www.onvif.org/ver10/media/wsdl/GetProfiles", "http://www.onvif.org/ver10/media/wsdl/GetProfiles",
profilesEnv, profilesEnv,
timeoutMs, timeoutMs,
input.soapTransport,
); );
const profileBlocks = splitProfiles(profilesXml); const profileBlocks = splitProfiles(profilesXml);
@ -330,6 +364,7 @@ export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]
"http://www.onvif.org/ver10/media/wsdl/GetStreamUri", "http://www.onvif.org/ver10/media/wsdl/GetStreamUri",
streamEnv, streamEnv,
timeoutMs, timeoutMs,
input.soapTransport,
); );
} catch { } catch {
continue; // skip profiles we can't get a stream uri for continue; // skip profiles we can't get a stream uri for
@ -347,6 +382,7 @@ export async function discover(input: DiscoverInput): Promise<DiscoveredCamera[]
"http://www.onvif.org/ver10/media/wsdl/GetSnapshotUri", "http://www.onvif.org/ver10/media/wsdl/GetSnapshotUri",
snapshotEnv, snapshotEnv,
timeoutMs, timeoutMs,
input.soapTransport,
); );
snapshotUri = pickAll(snapshotXml, "Uri")[0] ?? null; snapshotUri = pickAll(snapshotXml, "Uri")[0] ?? null;
} catch { } catch {

View file

@ -228,6 +228,7 @@ export function CameraNewPage(props: CameraNewProps) {
interface CameraDiscoverProps { interface CameraDiscoverProps {
user: string; user: string;
kiosks: Kiosk[];
error?: string; error?: string;
values?: Record<string, string>; values?: Record<string, string>;
} }
@ -255,6 +256,20 @@ export function CameraDiscoverPage(props: CameraDiscoverProps) {
<label for="port">Port</label> <label for="port">Port</label>
<input id="port" name="port" type="number" class="form-input" value={v["port"] ?? "80"} /> <input id="port" name="port" type="number" class="form-input" value={v["port"] ?? "80"} />
</div> </div>
<div class="form-group">
<label for="discovery_runner">Run discovery from</label>
<select id="discovery_runner" name="discovery_runner" class="form-input">
<option value="server" selected={(v["discovery_runner"] ?? "server") === "server"}>Server</option>
{props.kiosks.map((k) => (
<option
value={`kiosk:${String(k.id)}`}
selected={v["discovery_runner"] === `kiosk:${String(k.id)}`}
>
{k.name}{k.local_last_ip ? ` (${k.local_last_ip})` : ""}
</option>
)).join("")}
</select>
</div>
<div style="display:grid; grid-template-columns:1fr 1fr; gap:0.75rem"> <div style="display:grid; grid-template-columns:1fr 1fr; gap:0.75rem">
<div class="form-group"> <div class="form-group">
<label for="username">Username</label> <label for="username">Username</label>