use std::{ sync::mpsc::{self, Receiver, Sender, SyncSender}, thread, time::{Duration, Instant}, }; use anyhow::Context; use crate::{ aprs::CommandHandler, config::Config, img::ImgManager, packet::{FecPacket, Packet}, radio::McuRadio, ssdv::ssdv_encode, }; const IMAGE_PACKET_QUEUE_LENGTH: usize = 8192; const TEMP_REFRESH_INTERVAL: Duration = Duration::from_secs(5); const SYNC_REFRESH_INTERVAL: Duration = Duration::from_secs(7); pub struct Controller { config: Config, } impl Controller { pub fn new(config: Config) -> Self { Self { config } } pub fn run_forever(self) { let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH); let (telem_tx, telem_rx) = mpsc::channel(); let (cmd_tx, cmd_rx) = mpsc::channel(); { let callsign = self.config.callsign.clone(); let uart = self.config.uart.clone(); thread::spawn(|| Self::tx_thread(callsign, uart, img_rx, telem_rx)); } { let config = self.config.clone(); thread::spawn(|| Self::aprs_thread(config, cmd_tx, telem_tx)); } let mut manager = ImgManager::new(self.config.paths.clone(), cmd_rx); loop { while let Some((idx, bytes)) = manager.next(self.config.max_img_dimension) { if let Err(e) = self.process_image(&bytes, idx, &img_tx) { eprintln!("Error processing image: {e}"); } } // after we don't find anything, sleep for a bit while // we wait for new images thread::sleep(Duration::from_secs(1)); } } fn process_image(&self, img: &[u8], idx: u8, tx: &SyncSender<FecPacket>) -> anyhow::Result<()> { for p in ssdv_encode(&self.config.callsign, img, idx)? { tx.send(p.into()).context("TX thread died")?; } Ok(()) } // manages incoming APRS packets // used to request HD images, as well as // to initiate burst/cutdown fn aprs_thread(config: Config, tx: Sender<u8>, telem_tx: Sender<Packet>) { if let Some(ctrl) = config.control { let mut handler = CommandHandler::new( config.callsign, ctrl.secret, ctrl.burst_command, ctrl.cutdown_command, tx, telem_tx, ); handler.process_forever(); } } // manages our transceiver fn tx_thread( callsign: String, uart: String, image_rx: Receiver<FecPacket>, telem_rx: Receiver<Packet>, ) { let mut radio = loop { let r = McuRadio::new(&uart); match r { Ok(r) => break r, Err(e) => { eprintln!("Error initializing radio: {e}"); } } thread::sleep(Duration::from_millis(1000)); }; let mut text_msg_id = 0; let mut last_got_temp = Instant::now(); let mut last_synced_at = Instant::now(); loop { while let Ok(tm) = telem_rx.try_recv() { let tm = tm.into_raw(&mut text_msg_id).into(); if let Err(e) = radio.send_packet(&tm) { eprintln!("Could not send packet: {}", e); } } if let Ok(img) = image_rx.try_recv() { if let Err(e) = radio.send_packet(&img) { eprintln!("Could not send packet: {}", e); } } else { if let Err(e) = radio.flush() { eprintln!("Could not flush radio: {}", e); } thread::sleep(Duration::from_millis(50)); } if Instant::now() - last_got_temp > TEMP_REFRESH_INTERVAL { last_got_temp = Instant::now(); let temp = match radio.get_temp() { Ok(x) => x, Err(e) => { eprintln!("Could not get radio temp: {}", e); continue; } }; let packet = Packet::new_text_message(&callsign, &format!("Temp: {}", temp)); if let Err(e) = radio.send_packet(&packet.into_raw(&mut text_msg_id).into()) { eprintln!("Could not send packet: {}", e); } } if Instant::now() - last_synced_at > SYNC_REFRESH_INTERVAL { last_synced_at = Instant::now(); if let Err(e) = radio.sync() { eprintln!("Could not sync: {}", e); } } } } }