diff --git a/src/main.rs b/src/main.rs index 710b387..9332bd5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use std::{ fs::File, io::{BufWriter, Read, Sink, Write, stdout}, ops::DerefMut, - sync::{Arc, mpsc::RecvTimeoutError}, + sync::{Arc, atomic::AtomicU64, mpsc::RecvTimeoutError}, time::Duration, u64, }; @@ -32,7 +32,7 @@ use tokio::{ join, net::{TcpSocket, TcpStream, UdpSocket}, select, - sync::mpsc::error::TryRecvError, + sync::mpsc::{UnboundedSender, error::TryRecvError, unbounded_channel}, time::timeout, }; @@ -50,7 +50,7 @@ use eframe::egui::{self, CentralPanel, Color32}; use tokio::sync::RwLock; use tokio::sync::mpsc::{Receiver, Sender, channel}; -const BAUD_RATE: u32 = 1000; +const BAUD_RATE: u32 = 700; const SAMPLE_RATE: u32 = 48000; // Modulation parameters @@ -225,10 +225,7 @@ impl Transceiver { self.eye_receiver.try_recv() } - pub fn start( - mut sample_stream: Receiver, - mut sample_sender: Sender, - ) -> Self { + pub fn start(mut sample_stream: Receiver, mut sample_sender: Sender>) -> Self { let mut resend: Option> = None; let (mut eyes_tx, eyes_rx) = channel::>(1024); @@ -293,7 +290,7 @@ impl Transceiver { } } - pub async fn transmit(frame: Frame, samples_sender: &mut Sender) { + pub async fn transmit(frame: Frame, samples_sender: &mut Sender>) { let bytes = frame.bytes(); let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x)); let modulator = BFSKMod::new( @@ -303,14 +300,14 @@ impl Transceiver { ); let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); - samples_sender.send(SampleSenderCommand::Open).await; + let mut samples = vec![]; for (m, up) in modulator.zip(up_lo) { let sample = m * up; - samples_sender - .send(SampleSenderCommand::Sample(sample.re)) - .await; + samples.push(sample.re); } - samples_sender.send(SampleSenderCommand::Close).await; + let len = samples.len(); + samples_sender.send(samples).await.unwrap(); + tokio::time::sleep(Duration::from_secs_f32(len as f32 / SAMPLE_RATE as f32)).await; } } @@ -465,7 +462,7 @@ struct EguiApp { } impl EguiApp { fn new(_cc: &eframe::CreationContext<'_>) -> Self { - let (up_sender, mut up_receiver) = channel::(1024); + let (up_sender, mut up_receiver) = channel::>(16); let (down_sender, down_receiver) = channel::(1024); let transceiver = Transceiver::start(down_receiver, up_sender); @@ -522,18 +519,26 @@ impl EguiApp { .with_sample_rate(cpal::SampleRate(48_000)) .config(); + let output_samples = Arc::new(std::sync::RwLock::new(Vec::new())); + let progression = Arc::new(AtomicU64::new(0)); + + let output_samples2 = output_samples.clone(); + let progression2 = progression.clone(); + + let (finished_tx, mut finished_rx) = channel::<()>(16); let send_stream = device .build_output_stream( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + let output_samples = output_samples.read().unwrap(); + let len = output_samples.len(); for d in data.iter_mut() { - loop { - if let Ok(SampleSenderCommand::Sample(smpl)) = - up_receiver.try_recv() - { - *d = smpl; - break; - } + let i = progression.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + as usize; + if i < len { + *d = output_samples[i]; + } else { + let _ = finished_tx.blocking_send(()); } } }, @@ -544,9 +549,16 @@ impl EguiApp { ) .unwrap(); - send_stream.play().unwrap(); + while let Some(data) = up_receiver.recv().await { + progression2.store(0, std::sync::atomic::Ordering::Relaxed); + *output_samples2.write().unwrap() = data; - tokio::time::sleep(Duration::from_secs(u64::MAX)).await; + send_stream.play().unwrap(); + + let _ = finished_rx.recv().await; + + let _ = send_stream.pause(); + } }); EguiApp {