diff --git a/Cargo.lock b/Cargo.lock index 26d725b1a2301a875537a06e3e2b3069b58fb039..a184da661e65406047cd7dad7d3b0d0467b6057f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,13 +37,14 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "balloon_tx_monolith" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "aprs-parser", "base64", "bitvec", "crc", + "crossbeam-channel", "gstreamer", "gstreamer-app", "gstreamer-video", diff --git a/Cargo.toml b/Cargo.toml index 62d4a1cce328e6d577c253a17b066c57f39be739..bdf714e70e24fe733d5593777b6144852165dfa5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "balloon_tx_monolith" -version = "0.3.0" +version = "0.3.1" edition = "2021" license = "MIT" @@ -32,3 +32,4 @@ toml = "0.7.3" gstreamer = "0.21.0" gstreamer-app = "0.21.0" gstreamer-video = "0.21.0" +crossbeam-channel = "0.5.8" diff --git a/src/control.rs b/src/control.rs index 994ada1acb14709f9c8f7b22f1c1b4f5295309b2..99572eb76cba5065d7a37b8e6e0673046f86de17 100644 --- a/src/control.rs +++ b/src/control.rs @@ -31,7 +31,7 @@ impl Controller { pub fn run_forever(self) { let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH); - let (vid_tx, vid_rx) = mpsc::sync_channel(VIDEO_PACKET_QUEUE_LENGTH); + let (vid_tx, vid_rx) = crossbeam_channel::bounded(VIDEO_PACKET_QUEUE_LENGTH); let (telem_tx, telem_rx) = mpsc::channel(); let (cmd_tx, cmd_rx) = mpsc::channel(); @@ -94,7 +94,7 @@ impl Controller { fn tx_thread( callsign: String, image_rx: Receiver<FecPacket>, - vid_rx: Receiver<Vec<u8>>, + vid_rx: crossbeam_channel::Receiver<Vec<u8>>, telem_rx: Receiver<Packet>, ) { let mut radio = loop { @@ -135,7 +135,7 @@ impl Controller { fn tx_thread_single_iter( callsign: &str, image_rx: &Receiver<FecPacket>, - vid_rx: &Receiver<Vec<u8>>, + vid_rx: &crossbeam_channel::Receiver<Vec<u8>>, telem_rx: &Receiver<Packet>, text_msg_id: &mut u16, last_got_temp: &mut Instant, @@ -181,7 +181,15 @@ impl Controller { let temp = radio.get_temp()?; - let packet = Packet::new_text_message(callsign, &format!("Temp: {}", temp)); + let packet = Packet::new_text_message( + callsign, + &format!( + "Temp: {}, Vbuf: {} / {}", + temp, + vid_rx.len(), + VIDEO_PACKET_QUEUE_LENGTH + ), + ); radio.send_packet(&packet.into_raw(text_msg_id).into())?; } diff --git a/src/video.rs b/src/video.rs index 4af4e30bc352ce105ce2daa4965deb67535f9437..8403ab7a5068e9a0d608fb8fa9b4cb99b340f668 100644 --- a/src/video.rs +++ b/src/video.rs @@ -1,11 +1,8 @@ -use std::{ - sync::mpsc::{SyncSender, TrySendError}, - thread, - time::Duration, -}; +use std::{thread, time::Duration}; use crate::packet::{Packet, RawPacket, VIDEO_LEN}; use anyhow::Context; +use crossbeam_channel::{Sender, TrySendError}; use gstreamer::{ element_error, prelude::{Cast, ElementExtManual, GstBinExtManual}, @@ -73,7 +70,7 @@ impl VideoPacker { } } -pub fn start_video(sender: SyncSender<Vec<u8>>) { +pub fn start_video(sender: Sender<Vec<u8>>) { thread::spawn(move || loop { match init(sender.clone()) { Ok((pipeline, bus)) => handle_pipeline(pipeline, bus), @@ -86,7 +83,7 @@ pub fn start_video(sender: SyncSender<Vec<u8>>) { }); } -fn init(sender: SyncSender<Vec<u8>>) -> anyhow::Result<(Pipeline, Bus)> { +fn init(sender: Sender<Vec<u8>>) -> anyhow::Result<(Pipeline, Bus)> { gstreamer::init()?; let src = gstreamer::ElementFactory::make("libcamerasrc")