diff --git a/src/main.rs b/src/main.rs index 6e5347f..751dffc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::{join, net::UdpSocket, select, time::timeout}; +use tokio::{join, net::UdpSocket, select, sync::mpsc::error::TryRecvError, time::timeout}; use crate::{ bfsk::BFSKMod, @@ -46,6 +46,12 @@ const SAMPLE_RATE: u32 = 48000; const CENTER_FREQ: f32 = 1700.; const DEVIATION: f32 = 500.; +pub enum SampleSenderCommand { + Open, + Close, + Sample(f32), +} + pub trait SampleSender { async fn open_link(&mut self); async fn send_sample(&mut self, sample: f32); @@ -87,69 +93,102 @@ impl SampleSender for WavSampleSender { } } -struct Transceiver {} +struct Transceiver { + tx_stream: Sender>, + rx_stream: Receiver>, + + eye_receiver: Receiver>, +} impl Transceiver { - pub async fn start( + pub async fn send(&self, data: Vec) { + self.tx_stream.send(data).await; + } + + pub fn get_sender(&self) -> Sender> { + self.tx_stream.clone() + } + + pub fn try_recv(&mut self) -> Result, TryRecvError> { + self.rx_stream.try_recv() + } + + pub fn try_recv_eye(&mut self) -> Result, TryRecvError> { + self.eye_receiver.try_recv() + } + + pub fn start( mut sample_stream: Receiver, - mut tx_stream: Receiver>, - mut rx_stream: Sender>, - mut sample_sender: T, - mut eye_sender: Sender>, - ) { + mut sample_sender: Sender, + ) -> Self { let mut resend: Option> = None; - loop { - select! { - _ = Self::squelch_detector(&mut sample_stream) => - { - println!("Squelch up"); - select! + let (mut eyes_tx, eyes_rx) = channel::>(1024); + + let (rx_stream_sender, rx_stream_receiver) = channel::>(1024); + let (tx_stream_sender, mut tx_stream_receiver) = channel::>(1024); + + tokio::spawn(async move { + loop { + select! { + _ = Self::squelch_detector(&mut sample_stream) => { - x = Self::receive(&mut sample_stream, &mut eye_sender) => + println!("Squelch up"); + select! { - // Flush channel - while sample_stream.recv().await.is_some() {}; - match x + x = Self::receive(&mut sample_stream, &mut eyes_tx) => { - Err(()) => {continue;}, - Ok(Frame::Ack) => + match x { - resend = None; + Err(()) => {continue;}, + Ok(Frame::Ack) => + { + resend = None; + } + Ok(Frame::Data(data)) => + { + println!("Got data frame, send data"); + let _ = rx_stream_sender.send(data).await; + tokio::time::sleep(Duration::from_secs(1)).await; + println!("Got data frame, sending ack"); + Self::transmit(Frame::Ack, &mut sample_sender).await; + println!("Sent ack"); + } } - Ok(Frame::Data(data)) => - { - let _ = rx_stream.send(data).await; - tokio::time::sleep(Duration::from_secs(1)).await; - Self::transmit(Frame::Ack, &mut sample_sender).await; - } - } - }, - _ = tokio::time::sleep(Duration::from_secs(100)) => {continue;}, //TODO: 65 - //sec - //timeout - } - }, // End squelch - data_opt = async - { - tokio::time::sleep(Duration::from_secs(2)).await; - if let Some(resend_data) = resend.clone() + }, + _ = tokio::time::sleep(Duration::from_secs(100)) => {continue;}, //TODO: 65 + //sec + //timeout + } + }, // End squelch + data_opt = async { - Some(resend_data) + tokio::time::sleep(Duration::from_secs(2)).await; + if let Some(resend_data) = resend.clone() + { + Some(resend_data) + } + else + { + tx_stream_receiver.recv().await + } } - else + => { - tx_stream.recv().await - } - } - => - { - if let Some(data) = data_opt - { - Self::transmit(Frame::Data(data.clone()), &mut sample_sender).await; - resend = Some(data); + if let Some(data) = data_opt + { + Self::transmit(Frame::Data(data.clone()), &mut sample_sender).await; + resend = Some(data); + } } } } + }); + + Self { + eye_receiver: eyes_rx, + + tx_stream: tx_stream_sender, + rx_stream: rx_stream_receiver, } } @@ -174,7 +213,7 @@ impl Transceiver { } } - pub async fn transmit(frame: Frame, samples_sender: &mut T) { + pub async fn transmit(frame: Frame, samples_sender: &mut Sender) { let bytes = frame.bytes(); let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x)); let modulator = BFSKMod::new( @@ -184,12 +223,14 @@ impl Transceiver { ); let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); - samples_sender.open_link().await; + samples_sender.send(SampleSenderCommand::Open).await; for (m, up) in modulator.zip(up_lo) { let sample = m * up; - samples_sender.send_sample(sample.re).await; + samples_sender + .send(SampleSenderCommand::Sample(sample.re)) + .await; } - samples_sender.close_link().await; + samples_sender.send(SampleSenderCommand::Close).await; } async fn receive( @@ -422,12 +463,98 @@ impl SampleSender for DummySampleSender { } struct EguiApp { - eye_receiver_a: Receiver>, + a_transceiver: Transceiver, + b_transceiver: Transceiver, + eyes_a: VecDeque>, + eyes_b: VecDeque>, } impl EguiApp { fn new(_cc: &eframe::CreationContext<'_>) -> Self { - EguiApp {} + let (up_a_sender, mut up_a_receiver) = channel::(1024); + let (down_a_sender, down_a_receiver) = channel::(1024); + + let (up_b_sender, mut up_b_receiver) = channel::(1024); + let (down_b_sender, down_b_receiver) = channel::(1024); + + let (a2b_tx, mut a2b_rx) = channel::(1024); + let (b2a_tx, mut b2a_rx) = channel::(1024); + + let a_txrx = Transceiver::start(down_a_receiver, up_a_sender); + let b_txrx = Transceiver::start(down_b_receiver, up_b_sender); + + // A dummy channel + tokio::spawn(async move { + //let rng = rand::thread_rng(); + let mut sending = false; + loop { + let noise = rand::random::() * 0.1; + let mut sample = 0.; + + match up_a_receiver.try_recv() { + Ok(SampleSenderCommand::Open) => { + sending = true; + } + Ok(SampleSenderCommand::Close) => { + sending = false; + } + Ok(SampleSenderCommand::Sample(x)) => { + sample = x; + } + _ => {} + } + + if sending { + // Flush receiver buffer but ignore + while let Ok(_) = b2a_rx.try_recv() {} + + // Send to other + a2b_tx.send(sample + noise).await.unwrap(); + } else if let Ok(down_sample) = b2a_rx.try_recv() { + down_a_sender.send(down_sample).await.unwrap(); + } + } + }); + + // B dummy channel + tokio::spawn(async move { + let mut sending = false; + loop { + let noise = rand::random::() * 0.1; + let mut sample = 0.; + + match up_b_receiver.try_recv() { + Ok(SampleSenderCommand::Open) => { + sending = true; + } + Ok(SampleSenderCommand::Close) => { + sending = false; + } + Ok(SampleSenderCommand::Sample(x)) => { + sample = x; + } + _ => {} + } + + if sending { + // Flush receiver buffer but ignore + while let Ok(_) = a2b_rx.try_recv() {} + + // Send to other + b2a_tx.send(sample + noise).await.unwrap(); + } else if let Ok(down_sample) = a2b_rx.try_recv() { + down_b_sender.send(down_sample).await.unwrap(); + } + } + }); + + EguiApp { + a_transceiver: a_txrx, + b_transceiver: b_txrx, + + eyes_a: VecDeque::new(), + eyes_b: VecDeque::new(), + } } } @@ -436,13 +563,68 @@ impl eframe::App for EguiApp { egui::CentralPanel::default().show(ctx, |ui| { let max_eyes = 100; - while let Ok(eye) = self.eye_receiver_a.try_recv() { + while let Ok(eye) = self.a_transceiver.try_recv_eye() { self.eyes_a.push_back(eye); } while self.eyes_a.len() > max_eyes { self.eyes_a.pop_front(); } - }); + + while let Ok(eye) = self.b_transceiver.try_recv_eye() { + self.eyes_b.push_back(eye); + } + while self.eyes_b.len() > max_eyes { + self.eyes_b.pop_front(); + } + + ui.columns(2, |uis| { + Plot::new("EyeA") + .legend(Legend::default()) + .show(&mut uis[0], |plot_ui| { + //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); + for eye in self.eyes_a.iter() { + let line = Line::new( + "EyeA", + eye.iter() + .enumerate() + .map(|(i, x)| [i as f64, *x as f64]) + .collect::>(), + ) + .color(Color32::LIGHT_GREEN); + plot_ui.line(line); + } + }); + + if uis[0].button("Start").clicked() { + let snd = self.a_transceiver.get_sender(); + tokio::spawn(async move { + let _ = snd + .send("Skibditoilet".repeat(100).as_bytes().to_vec()) + .await; + }); + } + + Plot::new("EyeB") + .legend(Legend::default()) + .show(&mut uis[1], |plot_ui| { + //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); + for eye in self.eyes_b.iter() { + let line = Line::new( + "EyeB", + eye.iter() + .enumerate() + .map(|(i, x)| [i as f64, *x as f64]) + .collect::>(), + ) + .color(Color32::LIGHT_GREEN); + plot_ui.line(line); + } + }); + }); + }); // Central panel + + std::thread::sleep(Duration::from_millis(16)); + ctx.request_repaint(); } }