use std::{thread, time::Duration}; use crate::packet::{Packet, RawPacket, VIDEO_LEN}; use anyhow::Context; use crossbeam_channel::{Sender, TrySendError}; use gstreamer::{ element_error, prelude::{Cast, ElementExtManual, GstBinExtManual}, traits::{ElementExt, GstObjectExt}, Bus, Caps, ClockTime, FlowError, FlowSuccess, Pipeline, ResourceError, State, }; use gstreamer_app::{AppSink, AppSinkCallbacks}; use gstreamer_video::{VideoCapsBuilder, VideoFormat}; const DATA_WHITENER: [u8; 255] = [ 102, 54, 91, 67, 89, 223, 198, 83, 22, 16, 48, 184, 117, 97, 153, 246, 169, 167, 89, 85, 49, 67, 128, 123, 83, 210, 58, 104, 248, 102, 219, 195, 121, 57, 101, 172, 57, 223, 190, 106, 90, 36, 39, 156, 99, 92, 87, 10, 56, 29, 137, 71, 144, 89, 82, 182, 127, 72, 93, 249, 214, 6, 155, 164, 177, 22, 84, 111, 52, 60, 68, 235, 30, 13, 174, 101, 49, 43, 95, 61, 214, 89, 110, 24, 77, 208, 103, 209, 87, 12, 218, 147, 224, 85, 178, 49, 28, 233, 65, 132, 61, 238, 70, 164, 177, 90, 158, 99, 180, 77, 251, 17, 227, 43, 109, 33, 120, 15, 89, 172, 69, 213, 25, 166, 59, 254, 220, 31, 21, 247, 246, 12, 204, 223, 134, 136, 100, 92, 20, 182, 204, 79, 239, 120, 8, 40, 138, 222, 239, 85, 15, 196, 169, 36, 38, 193, 207, 165, 7, 4, 33, 4, 120, 250, 114, 240, 128, 3, 22, 62, 254, 139, 13, 56, 153, 15, 63, 96, 62, 44, 128, 241, 25, 22, 125, 127, 0, 137, 165, 145, 156, 39, 90, 94, 145, 86, 156, 17, 187, 217, 249, 193, 112, 160, 238, 216, 183, 46, 27, 74, 38, 127, 233, 188, 184, 35, 194, 249, 90, 195, 33, 21, 67, 56, 75, 243, 140, 6, 187, 93, 49, 224, 20, 34, 204, 204, 141, 132, 252, 101, 3, 149, 107, 173, 139, 125, 41, 133, 251, 42, 171, 130, 254, 145, 34, ]; pub struct VideoPacker { buf: [u8; VIDEO_LEN], buf_i: usize, } impl VideoPacker { pub fn new() -> Self { Self { buf: [0; VIDEO_LEN], buf_i: 0, } } // theoretically suboptimal. If one packet fails to send the loop terminates and we throw away all the passed in data pub fn pack<E, F>(&mut self, data: &[u8], mut f: F) -> Result<(), E> where F: FnMut(RawPacket) -> Result<(), E>, { for d in data { self.buf[self.buf_i] = *d; self.buf_i += 1; if self.buf_i >= VIDEO_LEN { // data whitening // makes our SDR happy for (b, w) in self.buf.iter_mut().zip(DATA_WHITENER.iter()) { *b ^= w; } let pkt = Packet::new_video(self.buf); self.buf_i = 0; // very garbage. we only pass in a text id here // because the function requires it. it's not used f(pkt.into_raw(&mut 0))?; } } Ok(()) } } pub fn start_video(sender: Sender<Vec<u8>>) { thread::spawn(move || loop { match init(sender.clone()) { Ok((pipeline, bus)) => handle_pipeline(pipeline, bus), Err(e) => { eprintln!("Could not restart video pipeline: {e:?}") } } thread::sleep(Duration::from_secs(1)); }); } fn init(sender: Sender<Vec<u8>>) -> anyhow::Result<(Pipeline, Bus)> { gstreamer::init()?; let src = gstreamer::ElementFactory::make("libcamerasrc") .name("source") .build() .context("Could not create source element")?; let capsfilter = gstreamer::ElementFactory::make("capsfilter") .property( "caps", VideoCapsBuilder::new() .width(640) .height(480) .framerate((12, 1).into()) .format(VideoFormat::Nv21) .build(), ) .build() .context("Could not build capsfilter")?; let conv = gstreamer::ElementFactory::make("videoconvert") .name("conv") .build() .context("Could not build converter")?; let enc = gstreamer::ElementFactory::make("x265enc") .name("enc") .property("bitrate", 200u32) .property("key-int-max", 48) .property_from_str("speed-preset", "ultrafast") .build() .context("Could not build encoder")?; let parse = gstreamer::ElementFactory::make("h265parse") .name("parse") .property("config-interval", -1) .build() .context("Could not build parser")?; let mpegts = gstreamer::ElementFactory::make("mpegtsmux") .name("mpegtsmux") .build() .context("Could not create mpegts element")?; let appsink = AppSink::builder().caps(&Caps::new_any()).build(); let pipeline = gstreamer::Pipeline::with_name("pipeline"); pipeline.add_many([ &src, &capsfilter, &conv, &enc, &parse, &mpegts, appsink.upcast_ref(), ])?; src.link(&capsfilter)?; capsfilter.link(&conv)?; conv.link(&enc)?; enc.link(&parse)?; parse.link(&mpegts)?; mpegts.link(&appsink)?; appsink.set_callbacks( AppSinkCallbacks::builder() .new_sample(move |appsink| { let sample = appsink.pull_sample().map_err(|_| FlowError::Eos)?; let buffer = sample.buffer().ok_or_else(|| { element_error!( appsink, ResourceError::Failed, ("Failed to get buffer from appsink") ); FlowError::Error })?; let map = buffer .map_readable() .map_err(|_| -> FlowError { FlowError::Error })?; match sender.try_send(map.as_ref().to_vec()) { Ok(_) => {} Err(TrySendError::Full(_)) => { eprintln!("Video buffer overrun. Skipping frames"); } Err(TrySendError::Disconnected(_)) => { element_error!(appsink, ResourceError::Failed, ("Channel disconnected")); return Err(FlowError::Error); } } Ok(FlowSuccess::Ok) }) .build(), ); pipeline .set_state(State::Playing) .context("Unable to set the pipeline to the `Playing` state")?; let bus = pipeline.bus().context("No pipeline bus")?; Ok((pipeline, bus)) } fn handle_pipeline(pipeline: Pipeline, bus: Bus) { // Wait until error or EOS for msg in bus.iter_timed(ClockTime::NONE) { use gstreamer::MessageView; match msg.view() { MessageView::Eos(..) => break, MessageView::Error(err) => { eprintln!( "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), err.error(), err.debug() ); break; } _ => (), } } // Shutdown pipeline if let Err(e) = pipeline.set_state(State::Null) { println!("Unable to set the pipeline to the `Null` state: {e}"); } }