From 964aab47bb58f4f3ad5b2942043bac58268986d5 Mon Sep 17 00:00:00 2001 From: Stephen <webmaster@scd31.com> Date: Mon, 17 Apr 2023 21:15:15 -0300 Subject: [PATCH] wire up requesting high quality images --- src/aprs.rs | 8 ++--- src/control.rs | 62 ++++++++++++++++++++++++------------ src/img.rs | 86 +++++++++++++++++++++++++++++++++++++++++--------- src/main.rs | 2 +- src/ssdv.rs | 11 +++++-- 5 files changed, 127 insertions(+), 42 deletions(-) diff --git a/src/aprs.rs b/src/aprs.rs index 3a55746..13f048a 100644 --- a/src/aprs.rs +++ b/src/aprs.rs @@ -39,7 +39,7 @@ pub struct CommandHandler { burst_command: String, cutdown_command: String, - img_request_queue: Sender<u8>, + img_request: Sender<u8>, } impl CommandHandler { @@ -48,7 +48,7 @@ impl CommandHandler { secret: String, burst_command: String, cutdown_command: String, - img_request_queue: Sender<u8>, + img_request: Sender<u8>, ) -> Self { Self { callsign, @@ -57,11 +57,11 @@ impl CommandHandler { burst_command, cutdown_command, - img_request_queue, + img_request, } } - fn process_forever(&self) { + pub fn process_forever(&self) { tokio::runtime::Builder::new_current_thread() .enable_all() .build() diff --git a/src/control.rs b/src/control.rs index b9e0877..3b7af03 100644 --- a/src/control.rs +++ b/src/control.rs @@ -1,7 +1,6 @@ use std::{ io::{Read, Write}, - path::PathBuf, - sync::mpsc::{self, Receiver, SyncSender}, + sync::mpsc::{self, Receiver, Sender, SyncSender}, thread, time::Duration, }; @@ -9,36 +8,38 @@ use std::{ use anyhow::Context; use serial::{BaudRate::BaudOther, SerialPort}; -use crate::{img::ImgManager, packet::FecPacket, ssdv::ssdv_encode}; +use crate::{ + aprs::CommandHandler, config::Config, img::ImgManager, packet::FecPacket, ssdv::ssdv_encode, +}; const IMAGE_PACKET_QUEUE_LENGTH: usize = 1024; pub struct Controller { - callsign: String, - manager: ImgManager, + config: Config, } impl Controller { - pub fn new(callsign: String, paths: Vec<PathBuf>) -> Self { - Self { - callsign, - manager: ImgManager::new(paths), - } + pub fn new(config: Config) -> Self { + Self { config } } - pub fn run_forever(&mut self) { - let (tx, rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH); + pub fn run_forever(self) { + let (img_tx, img_rx) = mpsc::sync_channel(IMAGE_PACKET_QUEUE_LENGTH); + let (cmd_tx, cmd_rx) = mpsc::channel(); + + thread::spawn(|| Self::tx_thread(img_rx)); + { + let config = self.config.clone(); + thread::spawn(|| Self::aprs_thread(config, cmd_tx)); + } - thread::spawn(|| Self::tx_thread(rx)); + let mut manager = ImgManager::new(self.config.paths.clone(), cmd_rx); - let mut idx = 0; loop { - while let Some(bytes) = self.manager.next() { - if let Err(e) = self.process_image(&bytes, idx, &tx) { + while let Some((idx, bytes, lossless)) = manager.next() { + if let Err(e) = self.process_image(&bytes, idx, &img_tx, lossless) { eprintln!("Error processing image: {e}"); } - - idx = idx.wrapping_add(1); } // after we don't find anything, sleep for a bit while @@ -47,14 +48,35 @@ impl Controller { } } - fn process_image(&self, img: &[u8], idx: u8, tx: &SyncSender<FecPacket>) -> anyhow::Result<()> { - for p in ssdv_encode(&self.callsign, img, idx)? { + fn process_image( + &self, + img: &[u8], + idx: u8, + tx: &SyncSender<FecPacket>, + lossless: bool, + ) -> anyhow::Result<()> { + for p in ssdv_encode(&self.config.callsign, img, idx, lossless)? { tx.send(p.into()).context("TX thread died")?; } Ok(()) } + // manages incoming APRS packets + // used to request HD images, as well as + // to initiate burst/cutdown + fn aprs_thread(config: Config, tx: Sender<u8>) { + let handler = CommandHandler::new( + config.callsign, + config.secret, + config.burst_command, + config.cutdown_command, + tx, + ); + + handler.process_forever(); + } + // manages our transceiver // TODO currently very hacky, because we're going to rip // most of this out when we stop using a lobotomized NPR-70 diff --git a/src/img.rs b/src/img.rs index 4696c71..c94d71b 100644 --- a/src/img.rs +++ b/src/img.rs @@ -1,5 +1,5 @@ use image::{imageops::FilterType, io::Reader as ImageReader, ImageOutputFormat}; -use std::{fs, io::Cursor, path::PathBuf}; +use std::{fs, io::Cursor, path::PathBuf, sync::mpsc::Receiver}; const IMG_DIM: u32 = 1024; const JPEG_QUALITY: u8 = 100; // let ssdv do the compression @@ -43,35 +43,78 @@ impl ImgInfo { Ok(bytes) } + + pub fn lossless_jpeg_bytes(&self) -> anyhow::Result<Vec<u8>> { + let img = ImageReader::open(&self.path)?.decode()?; + + // turn image into a square, making it bigger in the process + // must be divisible by 16 + let mut dim = img.width().max(img.height()); + let offset = dim % 16; + dim += (16 - offset) % 16; + + img.crop_imm(0, 0, dim, dim); + + let mut bytes = vec![]; + + img.write_to( + &mut Cursor::new(&mut bytes), + ImageOutputFormat::Jpeg(JPEG_QUALITY), + )?; + + Ok(bytes) + } } pub struct ImgManager { paths: Vec<PathBuf>, imgs: Vec<ImgInfo>, + // used to receive image requests, which will be sent at full resolution + rx: Receiver<u8>, // state for iterator iter: u64, path_idx: usize, found: bool, + tx_idx: u8, + // map between sent image ids, the the images + // in the array above + // used for decoding rx requests + img_map: [usize; 256], } impl ImgManager { - pub fn new(paths: Vec<PathBuf>) -> Self { + pub fn new(paths: Vec<PathBuf>, rx: Receiver<u8>) -> Self { Self { paths, imgs: vec![], + rx, iter: 0, path_idx: 0, found: false, + tx_idx: 0, + img_map: [0; 256], } } // not making this an iterator since // it's entirely valid to have a Some(...) // after a None - pub fn next(&mut self) -> Option<Vec<u8>> { + pub fn next(&mut self) -> Option<(u8, Vec<u8>, bool)> { loop { + if let Ok(req) = self.rx.try_recv() { + let img = self + .img_map + .get(usize::from(req)) + .and_then(|id| self.imgs.get(*id)) + .and_then(|img| img.lossless_jpeg_bytes().ok()); + + if let Some(img) = img { + return Some((self.inc_tx(None), img, true)); + } + } + if let Err(e) = self.scan_path() { eprintln!( "Error scanning {}: {}", @@ -84,21 +127,22 @@ impl ImgManager { let new = self .imgs .iter_mut() - .filter(|img| !img.txed) // non-sent images only - .filter(|img| img.path_idx == self.path_idx) - .filter(|img| img.found_on_iter == self.iter) - .max_by_key(|img| img.size_bytes); + .enumerate() + .filter(|(_, img)| !img.txed) // non-sent images only + .filter(|(_, img)| img.path_idx == self.path_idx) + .filter(|(_, img)| img.found_on_iter == self.iter) + .max_by_key(|(_, img)| img.size_bytes); - if let Some(new) = new { + if let Some((new_id, new)) = new { new.txed = true; let bytes = new.compressed_jpeg_bytes(); self.inc_iter(true); - // not perfect. ideally we would keep looking throug the new images + // not perfect. ideally we would keep looking through the new images // if this Err'd if let Ok(x) = bytes { - return Some(x); + return Some((self.inc_tx(Some(new_id)), x, false)); } } @@ -106,18 +150,19 @@ impl ImgManager { let old = self .imgs .iter_mut() - .filter(|img| !img.txed) // non-sent images only - .filter(|img| img.path_idx == self.path_idx) - .max_by_key(|img| img.size_bytes); + .enumerate() + .filter(|(_, img)| !img.txed) // non-sent images only + .filter(|(_, img)| img.path_idx == self.path_idx) + .max_by_key(|(_, img)| img.size_bytes); - if let Some(old) = old { + if let Some((old_idx, old)) = old { old.txed = true; let bytes = old.compressed_jpeg_bytes(); self.inc_iter(true); if let Ok(x) = bytes { - return Some(x); + return Some((self.inc_tx(Some(old_idx)), x, false)); } } @@ -146,6 +191,17 @@ impl ImgManager { true } + fn inc_tx(&mut self, id: Option<usize>) -> u8 { + let idx = self.tx_idx; + self.tx_idx = self.tx_idx.wrapping_add(1); + + if let Some(id) = id { + self.img_map[usize::from(idx)] = id; + } + + idx + } + fn scan_path(&mut self) -> anyhow::Result<()> { // imgs we don't already have let new_imgs: Vec<_> = fs::read_dir(&self.paths[self.path_idx])? diff --git a/src/main.rs b/src/main.rs index da3a131..95e7a48 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ mod ssdv; fn main() -> anyhow::Result<()> { let config = Config::load()?; - let mut controller = Controller::new(config.callsign, config.paths); + let controller = Controller::new(config); controller.run_forever(); diff --git a/src/ssdv.rs b/src/ssdv.rs index 63784d3..b9b3bae 100644 --- a/src/ssdv.rs +++ b/src/ssdv.rs @@ -4,7 +4,14 @@ use crate::packet::RawPacket; // TODO eventually rewrite Ssdv in Rust? // Don't want to use FFI because then segfaults can hurt us -pub fn ssdv_encode(callsign: &str, img: &[u8], img_idx: u8) -> anyhow::Result<Vec<RawPacket>> { +pub fn ssdv_encode( + callsign: &str, + img: &[u8], + img_idx: u8, + lossless: bool, +) -> anyhow::Result<Vec<RawPacket>> { + let quality = if lossless { "7" } else { "6" }; + let (stdout, stderr) = subprocess::Exec::cmd("ssdv") .args(&[ "-e", @@ -12,7 +19,7 @@ pub fn ssdv_encode(callsign: &str, img: &[u8], img_idx: u8) -> anyhow::Result<Ve callsign, "-n", "-q", - "6", + quality, "-i", &format!("{}", img_idx), ]) -- GitLab