diff --git a/kiosk/src/pipeline.rs b/kiosk/src/pipeline.rs index 1edf9ce..1e4a197 100644 --- a/kiosk/src/pipeline.rs +++ b/kiosk/src/pipeline.rs @@ -2,8 +2,6 @@ use gstreamer::prelude::*; use gstreamer::{self as gst, Element, Pipeline}; use tracing::{error, info, warn}; -/// Create a GStreamer pipeline for an RTSP camera that outputs to a GTK4 paintable sink. -/// Returns (pipeline, paintable_sink). pub fn create_camera_pipeline(name: &str, rtsp_uri: &str) -> Option<(Pipeline, Element)> { let pipeline_name = format!("cam-{name}"); let pipeline = Pipeline::with_name(&pipeline_name); @@ -13,72 +11,102 @@ pub fn create_camera_pipeline(name: &str, rtsp_uri: &str) -> Option<(Pipeline, E .property("latency", 300u32) .property_from_str("protocols", "tcp") .build() + .map_err(|e| error!("[{pipeline_name}] rtspsrc: {e}")) .ok()?; - let decode = gst::ElementFactory::make("decodebin").build().ok()?; - let convert = gst::ElementFactory::make("videoconvert").build().ok()?; + let decode = gst::ElementFactory::make("decodebin") + .build() + .map_err(|e| error!("[{pipeline_name}] decodebin: {e}")) + .ok()?; + + let convert = gst::ElementFactory::make("videoconvert") + .build() + .map_err(|e| error!("[{pipeline_name}] videoconvert: {e}")) + .ok()?; let sink = gst::ElementFactory::make("gtk4paintablesink") .build() - .map_err(|e| { - error!("gtk4paintablesink not available: {e}. Install gst-plugin-gtk4"); - }) + .map_err(|e| error!("[{pipeline_name}] gtk4paintablesink: {e}")) .ok()?; - let queue = gst::ElementFactory::make("queue") - .property("max-size-buffers", 1u32) - .property_from_str("leaky", "downstream") - .build() + pipeline.add_many([&src, &decode, &convert, &sink]).ok()?; + gst::Element::link_many([&convert, &sink]) + .map_err(|e| error!("[{pipeline_name}] link convert→sink failed: {e}")) .ok()?; - pipeline.add_many([&src, &decode, &queue, &convert, &sink]).ok()?; - // rtspsrc → decodebin (dynamic pads) let decode_weak = decode.downgrade(); + let pn = pipeline_name.clone(); src.connect_pad_added(move |_src, pad| { + info!("[{pn}] rtspsrc pad added: {}", pad.name()); let Some(decode) = decode_weak.upgrade() else { return }; let sink_pad = decode.static_pad("sink").unwrap(); if !sink_pad.is_linked() { - let _ = pad.link(&sink_pad); - } - }); - - // decodebin → queue → convert → sink (dynamic pads, video only) - let queue_weak = queue.downgrade(); - let pipeline_name_clone = pipeline_name.clone(); - decode.connect_pad_added(move |_decode, pad| { - let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None)); - let caps_str = caps.to_string(); - - if !caps_str.starts_with("video/") { - return; - } - - info!("[{pipeline_name_clone}] decodebin video pad: {caps_str}"); - - let Some(queue) = queue_weak.upgrade() else { return }; - let sink_pad = queue.static_pad("sink").unwrap(); - if !sink_pad.is_linked() { - if pad.link(&sink_pad).is_err() { - warn!("[{pipeline_name_clone}] decodebin pad link failed"); + match pad.link(&sink_pad) { + Ok(_) => info!("[{pn}] rtspsrc→decodebin linked"), + Err(e) => error!("[{pn}] rtspsrc→decodebin link failed: {e:?}"), } } }); - gst::Element::link_many([&queue, &convert, &sink]).ok()?; + // decodebin → convert (dynamic pads, video only) + let convert_weak = convert.downgrade(); + let pn2 = pipeline_name.clone(); + decode.connect_pad_added(move |_decode, pad| { + let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None)); + let structure = caps.structure(0); + let media = structure.map(|s| s.name().to_string()).unwrap_or_default(); + + info!("[{pn2}] decodebin pad: {media}"); + + if !media.starts_with("video/") { + return; + } + + let Some(convert) = convert_weak.upgrade() else { return }; + let sink_pad = convert.static_pad("sink").unwrap(); + if !sink_pad.is_linked() { + match pad.link(&sink_pad) { + Ok(_) => info!("[{pn2}] decodebin→convert linked"), + Err(e) => error!("[{pn2}] decodebin→convert link failed: {e:?}"), + } + } + }); + + // Watch bus for errors + let pn3 = pipeline_name.clone(); + let bus = pipeline.bus().unwrap(); + bus.add_watch_local(move |_bus, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Error(err) => { + error!("[{pn3}] pipeline error: {} ({:?})", err.error(), err.debug()); + } + MessageView::Warning(w) => { + warn!("[{pn3}] pipeline warning: {} ({:?})", w.error(), w.debug()); + } + MessageView::StateChanged(sc) => { + if sc.src().map(|s| s.name().as_str() == pn3).unwrap_or(false) { + info!("[{pn3}] state: {:?} → {:?}", sc.old(), sc.current()); + } + } + _ => {} + } + gst::glib::ControlFlow::Continue + }) + .ok()?; info!("[{pipeline_name}] pipeline created for {rtsp_uri}"); Some((pipeline, sink)) } -/// Start a pipeline. pub fn play(pipeline: &Pipeline) { - if let Err(e) = pipeline.set_state(gst::State::Playing) { - error!("Failed to set pipeline to Playing: {e:?}"); + match pipeline.set_state(gst::State::Playing) { + Ok(r) => info!("[{}] set_state(Playing) = {r:?}", pipeline.name()), + Err(e) => error!("[{}] set_state(Playing) failed: {e:?}", pipeline.name()), } } -/// Stop a pipeline. pub fn stop(pipeline: &Pipeline) { let _ = pipeline.set_state(gst::State::Null); }