diff --git a/src/main.rs b/src/main.rs index 751dffc..c59f1b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod filtering; mod iq; mod math; mod nco; +mod squelch; mod ted; mod units; mod windows; @@ -19,12 +20,19 @@ use std::{ collections::VecDeque, env::{self, args}, fs::File, - io::{BufWriter, Sink, Write, stdout}, + io::{BufWriter, Read, Sink, Write, stdout}, ops::DerefMut, - sync::Arc, + sync::{Arc, mpsc::RecvTimeoutError}, time::Duration, }; -use tokio::{join, net::UdpSocket, select, sync::mpsc::error::TryRecvError, time::timeout}; +use tokio::{ + io::{self, AsyncReadExt, AsyncWriteExt}, + join, + net::{TcpSocket, TcpStream, UdpSocket}, + select, + sync::mpsc::error::TryRecvError, + time::timeout, +}; use crate::{ bfsk::BFSKMod, @@ -32,6 +40,7 @@ use crate::{ filtering::{dc_block::DCBlocker, fir::FIRFilter}, iq::IQSampler, nco::Nco, + squelch::Squelch, ted::elg::ELGate, units::frequency::hz_to_rad_per_sample, }; @@ -46,6 +55,8 @@ const SAMPLE_RATE: u32 = 48000; const CENTER_FREQ: f32 = 1700.; const DEVIATION: f32 = 500.; +static mut INSTANCE_ID: u32 = 0; + pub enum SampleSenderCommand { Open, Close, @@ -93,152 +104,20 @@ impl SampleSender for WavSampleSender { } } -struct Transceiver { - tx_stream: Sender>, - rx_stream: Receiver>, - - eye_receiver: Receiver>, +struct FSKReceiver { + pos_correllator: FIRFilter, + neg_correllator: FIRFilter, + eye_sender: Sender>, + matched_lowpass: FIRFilter, + elg: ELGate, + dc_block: DCBlocker, + last_byte: u8, + frame_constructor: FrameConstructor, + bit_count: Option, } -impl Transceiver { - pub async fn send(&self, data: Vec) { - self.tx_stream.send(data).await; - } - - pub fn get_sender(&self) -> Sender> { - self.tx_stream.clone() - } - - pub fn try_recv(&mut self) -> Result, TryRecvError> { - self.rx_stream.try_recv() - } - - pub fn try_recv_eye(&mut self) -> Result, TryRecvError> { - self.eye_receiver.try_recv() - } - - pub fn start( - mut sample_stream: Receiver, - mut sample_sender: Sender, - ) -> Self { - let mut resend: Option> = None; - let (mut eyes_tx, eyes_rx) = channel::>(1024); - - let (rx_stream_sender, rx_stream_receiver) = channel::>(1024); - let (tx_stream_sender, mut tx_stream_receiver) = channel::>(1024); - - tokio::spawn(async move { - loop { - select! { - _ = Self::squelch_detector(&mut sample_stream) => - { - println!("Squelch up"); - select! - { - x = Self::receive(&mut sample_stream, &mut eyes_tx) => - { - match x - { - Err(()) => {continue;}, - Ok(Frame::Ack) => - { - resend = None; - } - Ok(Frame::Data(data)) => - { - println!("Got data frame, send data"); - let _ = rx_stream_sender.send(data).await; - tokio::time::sleep(Duration::from_secs(1)).await; - println!("Got data frame, sending ack"); - Self::transmit(Frame::Ack, &mut sample_sender).await; - println!("Sent ack"); - } - } - }, - _ = tokio::time::sleep(Duration::from_secs(100)) => {continue;}, //TODO: 65 - //sec - //timeout - } - }, // End squelch - data_opt = async - { - tokio::time::sleep(Duration::from_secs(2)).await; - if let Some(resend_data) = resend.clone() - { - Some(resend_data) - } - else - { - tx_stream_receiver.recv().await - } - } - => - { - if let Some(data) = data_opt - { - Self::transmit(Frame::Data(data.clone()), &mut sample_sender).await; - resend = Some(data); - } - } - } - } - }); - - Self { - eye_receiver: eyes_rx, - - tx_stream: tx_stream_sender, - rx_stream: rx_stream_receiver, - } - } - - async fn squelch_detector(sample_stream: &mut Receiver) { - let length = 200; - let level = 0.4; - let mut iq_sampler = IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); - let mut squelch_sum = 0.; - let mut i = 0; - while let Some(smpl) = sample_stream.recv().await { - let iq = iq_sampler.sample(smpl); - squelch_sum += iq.mag() / length as f32; - i += 1; - - if i >= length { - if squelch_sum >= level { - return; - } - i = 0; - squelch_sum = 0.; - } - } - } - - pub async fn transmit(frame: Frame, samples_sender: &mut Sender) { - let bytes = frame.bytes(); - let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x)); - let modulator = BFSKMod::new( - (SAMPLE_RATE as f32 / BAUD_RATE as f32).round() as u32, - hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32), - &mut bit_stream, - ); - - let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); - samples_sender.send(SampleSenderCommand::Open).await; - for (m, up) in modulator.zip(up_lo) { - let sample = m * up; - samples_sender - .send(SampleSenderCommand::Sample(sample.re)) - .await; - } - samples_sender.send(SampleSenderCommand::Close).await; - } - - async fn receive( - sample_stream: &mut Receiver, - eye_sender: &mut Sender>, - ) -> Result { - let mut iq_sampler = IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); - +impl FSKReceiver { + fn new(eye_sender: Sender>) -> Self { let samples_per_symbol = (SAMPLE_RATE as f32) / (BAUD_RATE as f32); let correllator_length = samples_per_symbol as usize; @@ -270,61 +149,166 @@ impl Transceiver { let mut loop_ir = vec![Complex32::new(loop_i, 0.); samples_per_symbol as usize]; loop_ir.push(Complex32::new(loop_p, 0.)); let mut elg = ELGate::new(samples_per_symbol, FIRFilter::new(&loop_ir)); + Self { + //iq_sampler: IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)), + pos_correllator, + neg_correllator, + matched_lowpass, + dc_block, + elg, + last_byte: 0x00u8, + frame_constructor: FrameConstructor::new(), + bit_count: None, + eye_sender, + } + } + async fn receive(&mut self, iq: Complex32) -> Result, FrameConstructionError> { // Frame reconstruction - let mut last_byte = 0x00u8; - let mut frame_constructor = FrameConstructor::new(); - let mut bit_count: Option = None; - while let Some(sample) = sample_stream.recv().await { - let iq = iq_sampler.sample(sample); - let matched = - matched_lowpass - .next_real(dc_block.next_real( - pos_correllator.next(iq).mag() - neg_correllator.next(iq).mag(), - )); - if let Some((bit_sample, eye)) = elg.next_eye(matched) { - let _ = eye_sender.send(eye).await; - last_byte >>= 1; - last_byte |= ((bit_sample > 0.) as u8) << 7; - //last_byte <<= 1; - //last_byte |= ((bit_sample < 0.) as u8); - bit_count = bit_count.map(|x| x + 1); + let matched = + self.matched_lowpass.next_real(self.dc_block.next_real( + self.pos_correllator.next(iq).mag() - self.neg_correllator.next(iq).mag(), + )); + if let Some((bit_sample, eye)) = self.elg.next_eye(matched) { + let _ = self.eye_sender.send(eye).await; + self.last_byte >>= 1; + self.last_byte |= ((bit_sample > 0.) as u8) << 7; + //last_byte <<= 1; + //last_byte |= ((bit_sample < 0.) as u8); + self.bit_count = self.bit_count.map(|x| x + 1); - if let None = bit_count - && last_byte == 0xD8 - { - // Potential frame starts - last_byte = 0; - frame_constructor = FrameConstructor::new(); - bit_count = Some(0); - } + if let None = self.bit_count + && self.last_byte == 0xD8 + { + // Potential frame starts + self.last_byte = 0; + self.frame_constructor = FrameConstructor::new(); + self.bit_count = Some(0); + } - if let Some(8) = bit_count { - let frame_opt = frame_constructor.add_byte(last_byte); - bit_count = Some(0); - //print!("{}", last_byte as char); - print!(".{:x}.", last_byte); - let _ = std::io::stdout().flush(); - - if let Ok(Some(Frame::Ack)) = frame_opt { - println!("Got ack"); - return Ok(Frame::Ack); - } - - if let Ok(Some(Frame::Data(ref frame_data))) = frame_opt { - println!("Got data"); - return Ok(Frame::Data(frame_data.to_vec())); - } - - if let Err(()) = frame_opt { - // Erroneous frame - println!("Error"); - return Err(()); - } - } + if let Some(8) = self.bit_count { + let frame_opt = self.frame_constructor.add_byte(self.last_byte); + self.bit_count = Some(0); + //print!("{}", last_byte as char); + print!(".{:x}.", self.last_byte); + let _ = std::io::stdout().flush(); + return frame_opt; } } - return Err(()); + return Ok(None); + } +} + +struct Transceiver { + tx_stream: Sender>, + rx_stream: Receiver>, + + eye_receiver: Receiver>, +} + +impl Transceiver { + pub async fn send(&self, data: Vec) { + self.tx_stream.send(data).await; + } + + pub fn get_sender(&self) -> Sender> { + self.tx_stream.clone() + } + + pub fn try_recv(&mut self) -> Result, TryRecvError> { + self.rx_stream.try_recv() + } + + pub fn try_recv_eye(&mut self) -> Result, TryRecvError> { + self.eye_receiver.try_recv() + } + + pub fn start( + mut sample_stream: Receiver, + mut sample_sender: Sender, + ) -> Self { + let mut resend: Option> = None; + let (mut eyes_tx, eyes_rx) = channel::>(1024); + + let (rx_stream_sender, mut rx_stream_receiver) = channel::>(1024); + let (tx_stream_sender, mut tx_stream_receiver) = channel::>(1024); + + let receiving = Arc::new(RwLock::new(false)); + tokio::spawn(async move { + let mut squelch = Squelch::new(200, 0.1); + let mut iq_sampler = + IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); + + let mut current_message = None; + loop { + select! { + _ = async { + while squelch.next(iq_sampler.sample(sample_stream.recv().await.unwrap())).is_none() {} + } + => + { + // Wait for end of tranmission + let mut recv = Some(FSKReceiver::new(eyes_tx.clone())); + while let Some(iq) = squelch.next(iq_sampler.sample(sample_stream.recv().await.unwrap())) + { + if recv.as_ref().is_some() + { + match recv.as_mut().unwrap().receive(iq).await + { + Ok(Some(Frame::Data(_))) => {println!("GOT DATA"); Self::transmit(Frame::Ack, &mut sample_sender).await; recv = None;}, + Ok(Some(Frame::Ack)) => {current_message = None; recv = None;}, + Err(()) => {recv = None;}, + _ => {} + } + } + } + println!("EOT"); + }, + message = async + { + if current_message.is_none() + { + current_message = Some((tx_stream_receiver).recv().await.unwrap()); + } + tokio::time::sleep(Duration::from_secs(rand::random_range(1..3))).await; + current_message.as_ref().unwrap() + } => + { + println!("Sending message"); + Self::transmit(Frame::Data(message.clone()), &mut sample_sender).await; + //current_message = None; + println!("Sent message"); + } + }; + } + }); + + Self { + eye_receiver: eyes_rx, + + tx_stream: tx_stream_sender, + rx_stream: rx_stream_receiver, + } + } + + pub async fn transmit(frame: Frame, samples_sender: &mut Sender) { + let bytes = frame.bytes(); + let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x)); + let modulator = BFSKMod::new( + (SAMPLE_RATE as f32 / BAUD_RATE as f32).round() as u32, + hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32), + &mut bit_stream, + ); + + let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); + samples_sender.send(SampleSenderCommand::Open).await; + for (m, up) in modulator.zip(up_lo) { + let sample = m * up; + samples_sender + .send(SampleSenderCommand::Sample(sample.re)) + .await; + } + samples_sender.send(SampleSenderCommand::Close).await; } } @@ -440,6 +424,16 @@ impl Frame { #[tokio::main] async fn main() { + // Read instance + let id = std::env::args().collect::>()[1] + .parse::() + .expect("NO INPUT ID"); + assert!(id == 0 || id == 1); + + unsafe { + INSTANCE_ID = id; + }; + //Transceiver::transmit(Frame::Data("Skibditoilet".repeat(100).bytes().collect::>()), &mut WavSampleSender{}).await; //Transceiver::transmit(Frame::Ack, &mut WavSampleSender::default()).await; //return; @@ -463,97 +457,95 @@ impl SampleSender for DummySampleSender { } struct EguiApp { - a_transceiver: Transceiver, - b_transceiver: Transceiver, + transceiver: Transceiver, - eyes_a: VecDeque>, - eyes_b: VecDeque>, + eyes: VecDeque>, } impl EguiApp { fn new(_cc: &eframe::CreationContext<'_>) -> Self { - let (up_a_sender, mut up_a_receiver) = channel::(1024); - let (down_a_sender, down_a_receiver) = channel::(1024); + let (up_sender, mut up_receiver) = channel::(1024); + let (down_sender, down_receiver) = channel::(1024); - let (up_b_sender, mut up_b_receiver) = channel::(1024); - let (down_b_sender, down_b_receiver) = channel::(1024); + let transceiver = Transceiver::start(down_receiver, up_sender); - let (a2b_tx, mut a2b_rx) = channel::(1024); - let (b2a_tx, mut b2a_rx) = channel::(1024); + let instance_id = unsafe { INSTANCE_ID }; + tokio::task::spawn(async move { + println!("Waiting for connection ..."); - let a_txrx = Transceiver::start(down_a_receiver, up_a_sender); - let b_txrx = Transceiver::start(down_b_receiver, up_b_sender); + // let socket = Arc::new( + // UdpSocket::bind(format!("0.0.0.0:{}", 9000 + instance_id)) + // .await + // .unwrap(), + // ); + // socket + // .connect(format!("127.0.0.1:{}", 9000 + (1 - instance_id))) + // .await + // .unwrap(); - // A dummy channel - tokio::spawn(async move { - //let rng = rand::thread_rng(); - let mut sending = false; - loop { - let noise = rand::random::() * 0.1; - let mut sample = 0.; + let mut stream; - match up_a_receiver.try_recv() { - Ok(SampleSenderCommand::Open) => { - sending = true; - } - Ok(SampleSenderCommand::Close) => { - sending = false; - } - Ok(SampleSenderCommand::Sample(x)) => { - sample = x; - } - _ => {} - } - - if sending { - // Flush receiver buffer but ignore - while let Ok(_) = b2a_rx.try_recv() {} - - // Send to other - a2b_tx.send(sample + noise).await.unwrap(); - } else if let Ok(down_sample) = b2a_rx.try_recv() { - down_a_sender.send(down_sample).await.unwrap(); - } + if instance_id == 0 { + let socket = TcpSocket::new_v4().unwrap(); + socket.bind("0.0.0.0:9000".parse().unwrap()).unwrap(); + stream = socket.listen(1).unwrap().accept().await.unwrap().0; + } else { + stream = TcpStream::connect("127.0.0.1:9000").await.unwrap(); } - }); - // B dummy channel - tokio::spawn(async move { + println!("Connected"); + + let (mut sock_recv, mut sock_send) = io::split(stream); + // Receiving end + tokio::spawn(async move { + let mut buf = [0u8; 65536]; + while let Ok(size) = sock_recv.read(&mut buf).await { + assert!(buf.len() % 4 == 0); + for x in buf.chunks(4).take(size / 4) { + let _ = down_sender.try_send(f32::from_ne_bytes(x.try_into().unwrap())); + } + } + }); + + // Sending end let mut sending = false; + let mut wait_countdown = 0; loop { + // Up link + //let mut sample = 0.; let noise = rand::random::() * 0.1; - let mut sample = 0.; - - match up_b_receiver.try_recv() { - Ok(SampleSenderCommand::Open) => { - sending = true; + if let Ok(x) = up_receiver.try_recv() { + match x { + SampleSenderCommand::Open => { + sending = true; + } + SampleSenderCommand::Close => { + sending = false; + } + SampleSenderCommand::Sample(x) => { + if sending { + let _ = sock_send.write(&(x + noise).to_ne_bytes()).await; + wait_countdown += 1; + } + } } - Ok(SampleSenderCommand::Close) => { - sending = false; - } - Ok(SampleSenderCommand::Sample(x)) => { - sample = x; - } - _ => {} } - if sending { - // Flush receiver buffer but ignore - while let Ok(_) = a2b_rx.try_recv() {} + if !sending { + let _ = sock_send.write(&noise.to_ne_bytes()).await; + wait_countdown += 1; + } - // Send to other - b2a_tx.send(sample + noise).await.unwrap(); - } else if let Ok(down_sample) = a2b_rx.try_recv() { - down_b_sender.send(down_sample).await.unwrap(); + if wait_countdown >= 480 { + tokio::time::sleep(Duration::from_millis(10)).await; + wait_countdown = 0; } } }); EguiApp { - a_transceiver: a_txrx, - b_transceiver: b_txrx, + transceiver, - eyes_a: VecDeque::new(), - eyes_b: VecDeque::new(), + eyes: VecDeque::new(), } } } @@ -563,64 +555,38 @@ impl eframe::App for EguiApp { egui::CentralPanel::default().show(ctx, |ui| { let max_eyes = 100; - while let Ok(eye) = self.a_transceiver.try_recv_eye() { - self.eyes_a.push_back(eye); + while let Ok(eye) = self.transceiver.try_recv_eye() { + self.eyes.push_back(eye); } - while self.eyes_a.len() > max_eyes { - self.eyes_a.pop_front(); + while self.eyes.len() > max_eyes { + self.eyes.pop_front(); } - while let Ok(eye) = self.b_transceiver.try_recv_eye() { - self.eyes_b.push_back(eye); + Plot::new("EyeA") + .legend(Legend::default()) + .show(ui, |plot_ui| { + //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); + for eye in self.eyes.iter() { + let line = Line::new( + "EyeA", + eye.iter() + .enumerate() + .map(|(i, x)| [i as f64, *x as f64]) + .collect::>(), + ) + .color(Color32::LIGHT_GREEN); + plot_ui.line(line); + } + }); + + if ui.button("Start").clicked() { + let snd = self.transceiver.get_sender(); + tokio::spawn(async move { + let _ = snd + .send("Skibditoilet".repeat(10).as_bytes().to_vec()) + .await; + }); } - while self.eyes_b.len() > max_eyes { - self.eyes_b.pop_front(); - } - - ui.columns(2, |uis| { - Plot::new("EyeA") - .legend(Legend::default()) - .show(&mut uis[0], |plot_ui| { - //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); - for eye in self.eyes_a.iter() { - let line = Line::new( - "EyeA", - eye.iter() - .enumerate() - .map(|(i, x)| [i as f64, *x as f64]) - .collect::>(), - ) - .color(Color32::LIGHT_GREEN); - plot_ui.line(line); - } - }); - - if uis[0].button("Start").clicked() { - let snd = self.a_transceiver.get_sender(); - tokio::spawn(async move { - let _ = snd - .send("Skibditoilet".repeat(100).as_bytes().to_vec()) - .await; - }); - } - - Plot::new("EyeB") - .legend(Legend::default()) - .show(&mut uis[1], |plot_ui| { - //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); - for eye in self.eyes_b.iter() { - let line = Line::new( - "EyeB", - eye.iter() - .enumerate() - .map(|(i, x)| [i as f64, *x as f64]) - .collect::>(), - ) - .color(Color32::LIGHT_GREEN); - plot_ui.line(line); - } - }); - }); }); // Central panel std::thread::sleep(Duration::from_millis(16)); diff --git a/src/squelch.rs b/src/squelch.rs new file mode 100644 index 0000000..720692b --- /dev/null +++ b/src/squelch.rs @@ -0,0 +1,34 @@ +use std::collections::VecDeque; + +use rand::seq::index::sample; + +use crate::complex::Complex32; + +pub struct Squelch { + window: VecDeque, + sum: f32, + level: f32, +} + +impl Squelch { + pub fn new(length: usize, level: f32) -> Self { + Squelch { + window: VecDeque::from(vec![0.; length]), + sum: 0., + level, + } + } + + pub fn next(&mut self, sample: Complex32) -> Option { + let oldest = self.window.pop_back().unwrap(); + self.window.push_front(sample.mag()); + self.sum -= oldest; + self.sum += sample.mag(); + + if self.sum / (self.window.len() as f32) > self.level { + Some(sample) + } else { + None + } + } +}