control.rs 4.66 KiB
use std::{
sync::mpsc::{self, Receiver, Sender, SyncSender},
thread,
time::{Duration, Instant},
};
use anyhow::Context;
use crate::{
aprs::CommandHandler,
config::Config,
img::ImgManager,
packet::{FecPacket, Packet},
radio::McuRadio,
ssdv::ssdv_encode,
};
/// Capacity of the bounded channel created in `run_forever` that carries
/// encoded image packets to the TX thread. Because the channel is bounded,
/// the producer blocks in `send` once this many packets are queued.
const IMAGE_PACKET_QUEUE_LENGTH: usize = 8192;
/// Minimum time between radio temperature readings taken (and reported as a
/// text message) by the TX thread.
const TEMP_REFRESH_INTERVAL: Duration = Duration::from_secs(5);
/// Minimum time between calls to `radio.sync()` in the TX thread.
const SYNC_REFRESH_INTERVAL: Duration = Duration::from_secs(7);
/// Top-level coordinator for the payload.
///
/// Holds the configuration and, through `run_forever`, spawns the radio TX
/// thread and the APRS command thread, then feeds encoded images to the radio.
pub struct Controller {
// Configuration; cloned (in whole or in part) for each spawned thread.
config: Config,
}
impl Controller {
pub fn new(config: Config) -> Self {
Self { config }
}
/// Spawns the worker threads and then processes images until the process exits.
///
/// Three channels are created:
/// * a bounded channel (`IMAGE_PACKET_QUEUE_LENGTH` entries) for image packets
///   going to the TX thread,
/// * an unbounded channel for telemetry packets going to the TX thread,
/// * an unbounded channel whose sender is handed to the APRS thread and whose
///   receiver is handed to the `ImgManager`.
///
/// This function never returns.
pub fn run_forever(self) {
    let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH);
    let (telem_tx, telem_rx) = mpsc::channel();
    let (cmd_tx, cmd_rx) = mpsc::channel();

    // Radio thread: owns both receiving ends of the packet channels.
    let tx_callsign = self.config.callsign.clone();
    let tx_uart = self.config.uart.clone();
    thread::spawn(move || Self::tx_thread(tx_callsign, tx_uart, img_rx, telem_rx));

    // APRS thread: gets its own copy of the full configuration.
    let aprs_config = self.config.clone();
    thread::spawn(move || Self::aprs_thread(aprs_config, cmd_tx, telem_tx));

    let mut manager = ImgManager::new(self.config.paths.clone(), cmd_rx);
    loop {
        match manager.next(self.config.max_img_dimension) {
            Some((idx, bytes)) => {
                // A failed image is reported and skipped; the loop keeps going.
                if let Err(e) = self.process_image(&bytes, idx, &img_tx) {
                    eprintln!("Error processing image: {e}");
                }
            }
            // Nothing available right now: back off for a second before
            // asking the manager again.
            None => thread::sleep(Duration::from_secs(1)),
        }
    }
}
// Encodes one image with `ssdv_encode` (using the configured callsign and the
// given image index) and queues every resulting packet on `tx`.
//
// Returns an error if encoding fails or if the receiving end of `tx` has been
// dropped ("TX thread died"). Packets already queued before a failure are not
// withdrawn. `tx` is a bounded channel, so `send` blocks while it is full.
fn process_image(&self, img: &[u8], idx: u8, tx: &SyncSender<FecPacket>) -> anyhow::Result<()> {
    let packets = ssdv_encode(&self.config.callsign, img, idx)?;
    for packet in packets {
        let fec: FecPacket = packet.into();
        tx.send(fec).context("TX thread died")?;
    }
    Ok(())
}
// Handles incoming APRS packets: requests for HD images as well as
// burst/cutdown commands.
//
// If the configuration has no `control` section this returns immediately,
// dropping `tx` and `telem_tx`. Otherwise it builds a `CommandHandler` from the
// callsign and control settings and hands control to `process_forever`.
fn aprs_thread(config: Config, tx: Sender<u8>, telem_tx: Sender<Packet>) {
    let ctrl = match config.control {
        Some(ctrl) => ctrl,
        None => return,
    };
    CommandHandler::new(
        config.callsign,
        ctrl.secret,
        ctrl.burst_command,
        ctrl.cutdown_command,
        tx,
        telem_tx,
    )
    .process_forever();
}
// Manages our transceiver.
//
// Opens the radio on `uart`, retrying once per second until it succeeds, then
// loops forever. Each iteration:
//   1. drains and sends every pending telemetry packet from `telem_rx`,
//   2. sends at most one image packet from `image_rx`; if none is pending the
//      radio is flushed and the thread sleeps for 50 ms,
//   3. every `TEMP_REFRESH_INTERVAL`, reads the radio temperature and sends it
//      as a text message from `callsign`,
//   4. every `SYNC_REFRESH_INTERVAL`, calls `radio.sync()`.
//
// All radio errors are logged to stderr and otherwise ignored. This function
// never returns.
fn tx_thread(
    callsign: String,
    uart: String,
    image_rx: Receiver<FecPacket>,
    telem_rx: Receiver<Packet>,
) {
    let mut radio = loop {
        match McuRadio::new(&uart) {
            Ok(radio) => break radio,
            Err(e) => eprintln!("Error initializing radio: {e}"),
        }
        thread::sleep(Duration::from_millis(1000));
    };
    // Running counter handed to `into_raw` for every text/telemetry packet.
    let mut text_msg_id = 0;
    let mut last_got_temp = Instant::now();
    let mut last_synced_at = Instant::now();
    loop {
        // Telemetry takes priority: drain the whole queue before any image data.
        while let Ok(tm) = telem_rx.try_recv() {
            let tm = tm.into_raw(&mut text_msg_id).into();
            if let Err(e) = radio.send_packet(&tm) {
                eprintln!("Could not send packet: {}", e);
            }
        }
        // Only one image packet per iteration so telemetry is re-checked
        // between image packets.
        if let Ok(img) = image_rx.try_recv() {
            if let Err(e) = radio.send_packet(&img) {
                eprintln!("Could not send packet: {}", e);
            }
        } else {
            if let Err(e) = radio.flush() {
                eprintln!("Could not flush radio: {}", e);
            }
            thread::sleep(Duration::from_millis(50));
        }
        if last_got_temp.elapsed() > TEMP_REFRESH_INTERVAL {
            // Reset the timer before reading so a failing read is retried only
            // after the next full interval rather than on every iteration.
            last_got_temp = Instant::now();
            match radio.get_temp() {
                Ok(temp) => {
                    let packet =
                        Packet::new_text_message(&callsign, &format!("Temp: {}", temp));
                    if let Err(e) = radio.send_packet(&packet.into_raw(&mut text_msg_id).into()) {
                        eprintln!("Could not send packet: {}", e);
                    }
                }
                // Fall through instead of restarting the loop, so a failed
                // temperature read does not skip the sync check below.
                Err(e) => eprintln!("Could not get radio temp: {}", e),
            }
        }
        if last_synced_at.elapsed() > SYNC_REFRESH_INTERVAL {
            last_synced_at = Instant::now();
            if let Err(e) = radio.sync() {
                eprintln!("Could not sync: {}", e);
            }
        }
    }
}
}