// control.rs
use std::{
    sync::mpsc::{self, Receiver, Sender, SyncSender},
    thread,
    time::{Duration, Instant},
};

use anyhow::Context;

use crate::{
    aprs::CommandHandler,
    config::Config,
    img::ImgManager,
    packet::{FecPacket, Packet},
    radio::McuRadio,
    ssdv::ssdv_encode,
};

/// Capacity of the bounded channel that carries encoded image packets
/// from the image-processing loop to the TX thread.
const IMAGE_PACKET_QUEUE_LENGTH: usize = 8192;
/// Minimum time between radio temperature readings in the TX thread.
const TEMP_REFRESH_INTERVAL: Duration = Duration::from_secs(5);
/// Minimum time between `radio.sync()` calls in the TX thread.
const SYNC_REFRESH_INTERVAL: Duration = Duration::from_secs(7);

/// Top-level coordinator: owns the configuration and, once
/// `run_forever` is called, spawns the TX and APRS worker threads and
/// feeds encoded images to the radio.
pub struct Controller {
    // Configuration consulted by the worker threads and the image loop.
    config: Config,
}

impl Controller {
    pub fn new(config: Config) -> Self {
        Self { config }
    }

    /// Spawns the TX and APRS worker threads, then encodes and queues
    /// images on the calling thread. This function never returns.
    ///
    /// Channels set up here:
    /// * image packets: bounded to `IMAGE_PACKET_QUEUE_LENGTH`, consumed
    ///   by the TX thread;
    /// * telemetry packets: unbounded, produced by the APRS thread and
    ///   consumed by the TX thread;
    /// * commands: unbounded, produced by the APRS thread and consumed by
    ///   the `ImgManager`.
    pub fn run_forever(self) {
        let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH);
        let (telem_tx, telem_rx) = mpsc::channel();
        let (cmd_tx, cmd_rx) = mpsc::channel();

        // The TX thread only needs the callsign and the UART path, so it
        // gets its own copies instead of the whole config.
        let tx_callsign = self.config.callsign.clone();
        let tx_uart = self.config.uart.clone();
        thread::spawn(move || Self::tx_thread(tx_callsign, tx_uart, img_rx, telem_rx));

        // The APRS thread takes a full copy of the config.
        let aprs_config = self.config.clone();
        thread::spawn(move || Self::aprs_thread(aprs_config, cmd_tx, telem_tx));

        let mut manager = ImgManager::new(self.config.paths.clone(), cmd_rx);

        loop {
            match manager.next(self.config.max_img_dimension) {
                Some((idx, bytes)) => {
                    if let Err(e) = self.process_image(&bytes, idx, &img_tx) {
                        eprintln!("Error processing image: {e}");
                    }
                }
                // Nothing pending right now: back off for a second while
                // we wait for new images.
                None => thread::sleep(Duration::from_secs(1)),
            }
        }
    }

    /// SSDV-encodes `img` under the configured callsign with image id
    /// `idx` and pushes every resulting packet onto `tx`.
    ///
    /// # Errors
    ///
    /// Returns the encoder's error if `ssdv_encode` fails, or an error
    /// with context "TX thread died" as soon as a send on `tx` fails.
    /// Packets sent before the failure are not recalled.
    fn process_image(&self, img: &[u8], idx: u8, tx: &SyncSender<FecPacket>) -> anyhow::Result<()> {
        let packets = ssdv_encode(&self.config.callsign, img, idx)?;

        // Stop at the first packet that cannot be queued.
        packets
            .into_iter()
            .try_for_each(|packet| tx.send(packet.into()).context("TX thread died"))
    }

    /// Manages incoming APRS packets, which are used to request HD
    /// images as well as to initiate burst/cutdown.
    ///
    /// Returns immediately when `config.control` is `None`; otherwise it
    /// builds a `CommandHandler` from the control settings and hands the
    /// thread over to `process_forever`.
    ///
    /// `tx` is the command channel and `telem_tx` the telemetry channel;
    /// both are passed straight to the handler.
    fn aprs_thread(config: Config, tx: Sender<u8>, telem_tx: Sender<Packet>) {
        // No control section configured: nothing to listen for.
        let ctrl = match config.control {
            Some(ctrl) => ctrl,
            None => return,
        };

        let mut commands = CommandHandler::new(
            config.callsign,
            ctrl.secret,
            ctrl.burst_command,
            ctrl.cutdown_command,
            tx,
            telem_tx,
        );

        commands.process_forever();
    }

    /// Manages our transceiver. Never returns.
    ///
    /// Opens the radio on `uart`, retrying once per second until it
    /// succeeds, then loops forever. Each iteration:
    ///
    /// 1. sends every telemetry packet currently waiting on `telem_rx`;
    /// 2. sends at most one image packet from `image_rx`, or, if none is
    ///    waiting, flushes the radio and sleeps for 50 ms;
    /// 3. if more than `TEMP_REFRESH_INTERVAL` has passed since the last
    ///    attempt, reads the radio temperature and transmits it as a text
    ///    message from `callsign`;
    /// 4. if more than `SYNC_REFRESH_INTERVAL` has passed since the last
    ///    attempt, calls `radio.sync()`.
    ///
    /// All radio errors are logged to stderr and otherwise ignored. A
    /// failed temperature read no longer skips the sync check of the same
    /// iteration.
    fn tx_thread(
        callsign: String,
        uart: String,
        image_rx: Receiver<FecPacket>,
        telem_rx: Receiver<Packet>,
    ) {
        // Keep retrying until the radio can be initialized.
        let mut radio = loop {
            match McuRadio::new(&uart) {
                Ok(radio) => break radio,
                Err(e) => eprintln!("Error initializing radio: {e}"),
            }

            thread::sleep(Duration::from_millis(1000));
        };

        let mut text_msg_id = 0;
        let mut last_got_temp = Instant::now();
        let mut last_synced_at = Instant::now();
        loop {
            // Telemetry goes first: drain everything that is queued
            // before touching image data.
            while let Ok(tm) = telem_rx.try_recv() {
                let tm = tm.into_raw(&mut text_msg_id).into();

                if let Err(e) = radio.send_packet(&tm) {
                    eprintln!("Could not send packet: {}", e);
                }
            }

            // One image packet per iteration so telemetry and the
            // periodic tasks below are serviced between packets.
            if let Ok(img) = image_rx.try_recv() {
                if let Err(e) = radio.send_packet(&img) {
                    eprintln!("Could not send packet: {}", e);
                }
            } else {
                // Nothing to transmit: flush the radio, then idle
                // briefly instead of spinning.
                if let Err(e) = radio.flush() {
                    eprintln!("Could not flush radio: {}", e);
                }
                thread::sleep(Duration::from_millis(50));
            }

            if last_got_temp.elapsed() > TEMP_REFRESH_INTERVAL {
                // Reset the timer even on failure so a broken sensor is
                // retried once per interval, not on every iteration.
                last_got_temp = Instant::now();

                match radio.get_temp() {
                    Ok(temp) => {
                        let packet =
                            Packet::new_text_message(&callsign, &format!("Temp: {}", temp));

                        if let Err(e) =
                            radio.send_packet(&packet.into_raw(&mut text_msg_id).into())
                        {
                            eprintln!("Could not send packet: {}", e);
                        }
                    }
                    // Log and fall through: the sync check below must
                    // still run in this iteration.
                    Err(e) => eprintln!("Could not get radio temp: {}", e),
                }
            }

            if last_synced_at.elapsed() > SYNC_REFRESH_INTERVAL {
                last_synced_at = Instant::now();

                if let Err(e) = radio.sync() {
                    eprintln!("Could not sync: {}", e);
                }
            }
        }
    }
}