diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs index 2b0314401a16968f9056771d7d15ae099d3c02c0..2773fd86ec6897cb5d7a64f61cb8689f5557bb7e 100644 --- a/src/decoder/mod.rs +++ b/src/decoder/mod.rs @@ -1,6 +1,13 @@ use std::io::{self, Read}; -use crate::util::print_packet; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::{ + config::FelinetConfig, + felinet::PacketIn as SemiPacketIn, + util::{append_internet_to_packet_route, print_packet}, +}; use self::{demod::Demod, parse::LeF32Decoder}; @@ -18,12 +25,31 @@ impl Bit for f32 { } } -pub fn decode_forever() { +pub fn decode_forever( + felinet_send: mpsc::Sender<SemiPacketIn>, + felinet_config: &Option<FelinetConfig>, + uuid: &Uuid, +) { let stdin = io::stdin(); let soft_bits = LeF32Decoder::new(stdin.bytes().map(|x| x.expect("Could not read from stdin"))); let demod = Demod::new(soft_bits); - demod.demod(|p| { + demod.demod(|mut p| { print_packet(&p); + + if let Some(fc) = felinet_config { + if append_internet_to_packet_route(&fc.callsign, fc.ssid, &mut p).is_none() { + return; + }; + + if let Ok(semi) = p.semi_encode() { + let semi = SemiPacketIn { + raw: semi.to_vec(), + uuid: (*uuid).into(), + }; + + let _ = felinet_send.blocking_send(semi); + } + } }); } diff --git a/src/gate.rs b/src/gate.rs index 78d7afc72ef88f4b9d5c4ea14f56e6a5ae8b1d79..5d4979eae56ecff0d1031e6af26b601d723aebd5 100644 --- a/src/gate.rs +++ b/src/gate.rs @@ -11,8 +11,11 @@ use ham_cats::{ packet::Packet, whisker::{Gps, Identification, NodeInfoBuilder, Route}, }; -use std::time::{Duration, Instant}; -use tokio::sync::broadcast; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::sync::{mpsc, Mutex}; use tonic::{transport::Channel, Request}; use uuid::Uuid; @@ -21,7 +24,7 @@ const SOFTWARE_ID: u8 = 0x00; pub fn beacon_forever( c: &FelinetConfig, - felinet_send: broadcast::Sender<SemiPacketIn>, + felinet_send: mpsc::Sender<SemiPacketIn>, uuid: Uuid, ) -> anyhow::Result<()> { let start_time = Instant::now(); @@ -118,7 +121,7 @@ pub fn beacon_forever( uuid: uuid.into(), }; - let _ = felinet_send.send(semi.clone()); + let _ = felinet_send.send(semi.clone()).await; } }); @@ -127,13 +130,15 @@ pub fn beacon_forever( pub async fn felinet_send_forever( mut client: HandlerClient<Channel>, - tx: broadcast::Sender<SemiPacketIn>, + rx: mpsc::Receiver<SemiPacketIn>, ) { + let rx = Arc::new(Mutex::new(rx)); + loop { - let mut rx = tx.subscribe(); + let rx = rx.clone(); let felinet_stream = stream! { loop { - yield rx.recv().await.expect("Channel closed"); + yield rx.lock().await.recv().await.expect("Channel closed"); } }; diff --git a/src/main.rs b/src/main.rs index 8cbed7ad90d8b543436ec93d02e85e337931e98d..7f2a568beadf9d00fb2d4a28f825315985d33bb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use config::Config; use felinet::PingRequest; use gate::{beacon_forever, felinet_send_forever}; use std::{str::FromStr, time::Duration}; -use tokio::sync::broadcast; +use tokio::sync::mpsc; use tonic::{transport::Endpoint, Request}; use uuid::Uuid; @@ -23,15 +23,15 @@ pub mod felinet { async fn main() -> anyhow::Result<()> { let config = Config::load()?; - gate_forever(&config).await?; + gate_forever(config).await?; Ok(()) } -async fn gate_forever(config: &Config) -> anyhow::Result<()> { +async fn gate_forever(config: Config) -> anyhow::Result<()> { let uuid = Uuid::new_v4(); - let tx = broadcast::Sender::new(16); + let (tx, rx) = mpsc::channel(16); if let Some(felinet_config) = &config.felinet { let endpoint = Endpoint::from_str(&felinet_config.address)? @@ -44,9 +44,8 @@ async fn gate_forever(config: &Config) -> anyhow::Result<()> { { let client = client.clone(); - let tx = tx.clone(); tokio::task::spawn(async { - felinet_send_forever(client, tx).await; + felinet_send_forever(client, rx).await; }); } @@ -71,7 +70,10 @@ async fn gate_forever(config: &Config) -> anyhow::Result<()> { ); } - decoder::decode_forever(); + tokio::task::spawn_blocking(move || { + decoder::decode_forever(tx, &config.felinet, &uuid); + }) + .await?; Ok(()) } diff --git a/src/util.rs b/src/util.rs index 2b6cd3790e948bb0d57c437eb3e10c17a0b16691..3711401e0b27b4e8c15aaab356d3b29a78a30d3b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -122,7 +122,6 @@ pub fn print_packet(pkt: &Packet<MAX_PACKET_LEN>) { pub fn append_internet_to_packet_route( callsign: &str, ssid: u8, - rssi: f64, packet: &mut Packet<MAX_PACKET_LEN>, ) -> Option<()> { let mut route = packet.route().unwrap_or(Route::new(0)); @@ -137,7 +136,7 @@ pub fn append_internet_to_packet_route( // we can just do the normal thing // since we're replacing the existing future node, this should never fail, unless we run out of byte space route - .append_hop(PastHop::new(Identity::new(callsign, ssid), Some(rssi))) + .append_hop(PastHop::new(Identity::new(callsign, ssid), None)) .ok()?; } else { // rip off all the futures and add ourselves @@ -153,10 +152,10 @@ pub fn append_internet_to_packet_route( } route = new_route; - route.push_past(PastHop::new(Identity::new(callsign, ssid), Some(rssi)))?; + route.push_past(PastHop::new(Identity::new(callsign, ssid), None))?; } } else { - route.push_past(PastHop::new(Identity::new(callsign, ssid), Some(rssi)))?; + route.push_past(PastHop::new(Identity::new(callsign, ssid), None))?; } route.push_internet()?;