use std::{ sync::mpsc::{self, Receiver, Sender, SyncSender}, thread, time::Duration, }; use anyhow::Context; use thread_priority::ThreadPriority; use crate::{ aprs::CommandHandler, config::Config, img::ImgManager, packet::{FecPacket, Packet}, radio::{SpiRadio, UartRadio}, ssdv::ssdv_encode, }; const IMAGE_PACKET_QUEUE_LENGTH: usize = 1024; pub struct Controller { config: Config, } impl Controller { pub fn new(config: Config) -> Self { Self { config } } pub fn run_forever(self) { let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH); let (telem_tx, telem_rx) = mpsc::channel(); let (cmd_tx, cmd_rx) = mpsc::channel(); thread::spawn(|| Self::tx_thread(img_rx, telem_rx)); { let config = self.config.clone(); thread::spawn(|| Self::aprs_thread(config, cmd_tx, telem_tx)); } let mut manager = ImgManager::new(self.config.paths.clone(), cmd_rx); loop { while let Some((idx, bytes)) = manager.next() { if let Err(e) = self.process_image(&bytes, idx, &img_tx) { eprintln!("Error processing image: {e}"); } } // after we don't find anything, sleep for a bit while // we wait for new images thread::sleep(Duration::from_secs(1)); } } fn process_image(&self, img: &[u8], idx: u8, tx: &SyncSender<FecPacket>) -> anyhow::Result<()> { for p in ssdv_encode(&self.config.callsign, img, idx)? { tx.send(p.into()).context("TX thread died")?; } Ok(()) } // manages incoming APRS packets // used to request HD images, as well as // to initiate burst/cutdown fn aprs_thread(config: Config, tx: Sender<u8>, telem_tx: Sender<Packet>) { if let Some(ctrl) = config.control { let mut handler = CommandHandler::new( config.callsign, ctrl.secret, ctrl.burst_command, ctrl.cutdown_command, tx, telem_tx, ); handler.process_forever(); } } // manages our transceiver fn tx_thread(image_rx: Receiver<FecPacket>, telem_rx: Receiver<Packet>) { ThreadPriority::Max.set_for_current().unwrap(); //let mut radio = UartRadio::new("/dev/ttyAMA0").expect("Could not initialize radio"); let mut radio = loop { let r = SpiRadio::new(); match r { Ok(r) => break r, Err(e) => { eprintln!("Error initializing radio: {e}"); } } thread::sleep(Duration::from_millis(1000)); }; let mut text_msg_id = 0; loop { while let Ok(tm) = telem_rx.try_recv() { let tm = tm.into_raw(&mut text_msg_id).into(); if let Err(e) = radio.send_packet(&tm) { eprintln!("Could not send packet: {}", e); } } if let Ok(img) = image_rx.try_recv() { if let Err(e) = radio.send_packet(&img) { eprintln!("Could not send packet: {}", e); } } else { thread::sleep(Duration::from_millis(5)); } } } }