#![allow(dead_code)] mod bfsk; mod complex; pub mod fft; mod filtering; mod iq; mod math; mod nco; mod squelch; mod ted; mod units; mod windows; use egui_plot::{Legend, Line, Plot}; use hound::WavWriter; use rand::{Rng, rand_core::le, seq::index::sample}; use std::{ cell::{Cell, RefCell}, collections::VecDeque, env::{self, args}, fs::File, io::{BufWriter, Read, Sink, Write, stdout}, ops::DerefMut, sync::{Arc, mpsc::RecvTimeoutError}, time::Duration, }; use tokio::{ io::{self, AsyncReadExt, AsyncWriteExt}, join, net::{TcpSocket, TcpStream, UdpSocket}, select, sync::mpsc::error::TryRecvError, time::timeout, }; use crate::{ bfsk::BFSKMod, complex::Complex32, filtering::{dc_block::DCBlocker, fir::FIRFilter}, iq::IQSampler, nco::Nco, squelch::Squelch, ted::elg::ELGate, units::frequency::hz_to_rad_per_sample, }; use eframe::egui::{self, CentralPanel, Color32}; use tokio::sync::RwLock; use tokio::sync::mpsc::{Receiver, Sender, channel}; const BAUD_RATE: u32 = 1000; const SAMPLE_RATE: u32 = 48000; // Modulation parameters const CENTER_FREQ: f32 = 1700.; const DEVIATION: f32 = 500.; static mut INSTANCE_ID: u32 = 0; pub enum SampleSenderCommand { Open, Close, Sample(f32), } pub trait SampleSender { async fn open_link(&mut self); async fn send_sample(&mut self, sample: f32); async fn close_link(&mut self); } struct WavSampleSender { writer: Option>>, } impl Default for WavSampleSender { fn default() -> Self { Self { writer: None } } } impl SampleSender for WavSampleSender { async fn open_link(&mut self) { let spec = hound::WavSpec { channels: 1, sample_rate: SAMPLE_RATE, bits_per_sample: 16, sample_format: hound::SampleFormat::Int, }; self.writer = Some(hound::WavWriter::create("audio/modulated.wav", spec).unwrap()); } async fn send_sample(&mut self, sample: f32) { let out_sample = (sample * i16::MAX as f32) as i16; self.writer .as_mut() .unwrap() .write_sample(out_sample) .unwrap(); } async fn close_link(&mut self) { self.writer = None; } } struct FSKReceiver { pos_correllator: FIRFilter, neg_correllator: FIRFilter, eye_sender: Sender>, matched_lowpass: FIRFilter, elg: ELGate, dc_block: DCBlocker, last_byte: u8, frame_constructor: FrameConstructor, bit_count: Option, } impl FSKReceiver { fn new(eye_sender: Sender>) -> Self { let samples_per_symbol = (SAMPLE_RATE as f32) / (BAUD_RATE as f32); let correllator_length = samples_per_symbol as usize; let mut pos_nco = Nco::new(hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32)); let mut neg_nco = Nco::new(hz_to_rad_per_sample(-DEVIATION, SAMPLE_RATE as f32)); let pos_ir = (0..correllator_length).map(|i| { pos_nco.step(); pos_nco.cexp() * windows::blackmann(i as f32 / correllator_length as f32) }); let neg_ir = (0..correllator_length).map(|i| { neg_nco.step(); neg_nco.cexp() * windows::blackmann(i as f32 / correllator_length as f32) }); let mut pos_correllator = FIRFilter::new(&pos_ir.collect::>()); let mut neg_correllator = FIRFilter::new(&neg_ir.collect::>()); pos_correllator.normalize_freq(hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32)); neg_correllator.normalize_freq(hz_to_rad_per_sample(-DEVIATION, SAMPLE_RATE as f32)); let mut matched_lowpass = FIRFilter::new(&vec![ Complex32::new(1., 0.); samples_per_symbol as usize / 2 ]); matched_lowpass.normalize_freq(hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32)); //let mut dc_block = DCBlocker::new(0.999); let mut dc_block = DCBlocker::new(1.); let loop_i = 0.0; let loop_p = 0.1; let mut loop_ir = vec![Complex32::new(loop_i, 0.); samples_per_symbol as usize]; loop_ir.push(Complex32::new(loop_p, 0.)); let mut elg = ELGate::new(samples_per_symbol, FIRFilter::new(&loop_ir)); Self { //iq_sampler: IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)), pos_correllator, neg_correllator, matched_lowpass, dc_block, elg, last_byte: 0x00u8, frame_constructor: FrameConstructor::new(), bit_count: None, eye_sender, } } async fn receive(&mut self, iq: Complex32) -> Result, FrameConstructionError> { // Frame reconstruction let matched = self.matched_lowpass.next_real(self.dc_block.next_real( self.pos_correllator.next(iq).mag() - self.neg_correllator.next(iq).mag(), )); if let Some((bit_sample, eye)) = self.elg.next_eye(matched) { let _ = self.eye_sender.send(eye).await; self.last_byte >>= 1; self.last_byte |= ((bit_sample > 0.) as u8) << 7; //last_byte <<= 1; //last_byte |= ((bit_sample < 0.) as u8); self.bit_count = self.bit_count.map(|x| x + 1); if let None = self.bit_count && self.last_byte == 0xD8 { // Potential frame starts self.last_byte = 0; self.frame_constructor = FrameConstructor::new(); self.bit_count = Some(0); } if let Some(8) = self.bit_count { let frame_opt = self.frame_constructor.add_byte(self.last_byte); self.bit_count = Some(0); //print!("{}", last_byte as char); print!(".{:x}.", self.last_byte); let _ = std::io::stdout().flush(); return frame_opt; } } return Ok(None); } } struct Transceiver { tx_stream: Sender>, rx_stream: Receiver>, eye_receiver: Receiver>, } impl Transceiver { pub async fn send(&self, data: Vec) { self.tx_stream.send(data).await; } pub fn get_sender(&self) -> Sender> { self.tx_stream.clone() } pub fn try_recv(&mut self) -> Result, TryRecvError> { self.rx_stream.try_recv() } pub fn try_recv_eye(&mut self) -> Result, TryRecvError> { self.eye_receiver.try_recv() } pub fn start( mut sample_stream: Receiver, mut sample_sender: Sender, ) -> Self { let mut resend: Option> = None; let (mut eyes_tx, eyes_rx) = channel::>(1024); let (rx_stream_sender, mut rx_stream_receiver) = channel::>(1024); let (tx_stream_sender, mut tx_stream_receiver) = channel::>(1024); let receiving = Arc::new(RwLock::new(false)); tokio::spawn(async move { let mut squelch = Squelch::new(200, 0.1); let mut iq_sampler = IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); let mut current_message = None; loop { select! { _ = async { while squelch.next(iq_sampler.sample(sample_stream.recv().await.unwrap())).is_none() {} } => { // Wait for end of tranmission let mut recv = Some(FSKReceiver::new(eyes_tx.clone())); while let Some(iq) = squelch.next(iq_sampler.sample(sample_stream.recv().await.unwrap())) { if recv.as_ref().is_some() { match recv.as_mut().unwrap().receive(iq).await { Ok(Some(Frame::Data(_))) => {println!("GOT DATA"); Self::transmit(Frame::Ack, &mut sample_sender).await; recv = None;}, Ok(Some(Frame::Ack)) => {current_message = None; recv = None;}, Err(()) => {recv = None;}, _ => {} } } } println!("EOT"); }, message = async { if current_message.is_none() { current_message = Some((tx_stream_receiver).recv().await.unwrap()); } tokio::time::sleep(Duration::from_secs(rand::random_range(1..3))).await; current_message.as_ref().unwrap() } => { println!("Sending message"); Self::transmit(Frame::Data(message.clone()), &mut sample_sender).await; //current_message = None; println!("Sent message"); } }; } }); Self { eye_receiver: eyes_rx, tx_stream: tx_stream_sender, rx_stream: rx_stream_receiver, } } pub async fn transmit(frame: Frame, samples_sender: &mut Sender) { let bytes = frame.bytes(); let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x)); let modulator = BFSKMod::new( (SAMPLE_RATE as f32 / BAUD_RATE as f32).round() as u32, hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32), &mut bit_stream, ); let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32)); samples_sender.send(SampleSenderCommand::Open).await; for (m, up) in modulator.zip(up_lo) { let sample = m * up; samples_sender .send(SampleSenderCommand::Sample(sample.re)) .await; } samples_sender.send(SampleSenderCommand::Close).await; } } enum Frame { Data(Vec), Ack, } type FrameConstructionError = (); pub struct FrameConstructor { frame: Vec, frame_countdown: Option, checksum: u8, started: bool, } impl FrameConstructor { pub fn new() -> Self { Self { frame: Vec::new(), frame_countdown: None, checksum: 0u8, started: false, } } pub fn add_byte(&mut self, byte: u8) -> Result, FrameConstructionError> { if self.frame.is_empty() && byte != 0xC4 && byte != 0x4C && !self.started { println!("Wrong type {:x}", byte); self.started = true; return Err(()); } if self.frame.is_empty() && byte == 0xC4 && !self.started { self.started = true; return Ok(Some(Frame::Ack)); } if self.frame.is_empty() && byte == 0x4C && !self.started { self.started = true; return Ok(None); } self.frame.push(byte); // Retrieve length if self.frame.len() == 1 { self.frame_countdown = Some(self.frame[0] as u16); return Ok(None); } if self.frame.len() == 2 { *self.frame_countdown.as_mut().unwrap() |= (self.frame[1] as u16) << 8; return Ok(None); } if self.frame_countdown.unwrap() == 0 { // All data has been received if self.checksum == byte { return Ok(Some(Frame::Data( self.frame.iter().skip(2).copied().collect(), ))); } println!("Checksum failed"); return Err(()); } //self.frame.push(byte); self.checksum ^= byte; *self.frame_countdown.as_mut().unwrap() -= 1; Ok(None) } } impl Frame { pub fn bytes(&self) -> Vec { let mut output_bytes = vec![]; // Initial training sequence output_bytes.append(&mut vec![0b01010101; 64]); // Preamble byte output_bytes.push(0xD8); // Command match self { Frame::Data(x) => { assert!(x.len() < 65536, "Data size over MTU"); output_bytes.push(0x4C); // DATA FRAME let len_u16 = x.len() as u16; output_bytes.push((len_u16 & 0xFF).try_into().unwrap()); output_bytes.push(((len_u16 >> 8) & 0xFF).try_into().unwrap()); let mut checksum = 0u8; x.iter().for_each(|x| checksum ^= x); output_bytes.extend(x.iter()); output_bytes.push(checksum); } Frame::Ack => { output_bytes.push(0xC4); // ACK FRAME } } // SEND EOT output_bytes.extend(std::iter::repeat_n(4, 32)); output_bytes } } #[tokio::main] async fn main() { // Read instance let id = std::env::args().collect::>()[1] .parse::() .expect("NO INPUT ID"); assert!(id == 0 || id == 1); unsafe { INSTANCE_ID = id; }; //Transceiver::transmit(Frame::Data("Skibditoilet".repeat(100).bytes().collect::>()), &mut WavSampleSender{}).await; //Transceiver::transmit(Frame::Ack, &mut WavSampleSender::default()).await; //return; let native_options = eframe::NativeOptions::default(); let _ = eframe::run_native( "Egui", native_options, Box::new(|cc| Ok(Box::new(EguiApp::new(cc)))), ); } //#[derive(Default)] struct DummySampleSender(); impl SampleSender for DummySampleSender { async fn open_link(&mut self) {} async fn send_sample(&mut self, _sample: f32) {} async fn close_link(&mut self) {} } struct EguiApp { transceiver: Transceiver, eyes: VecDeque>, } impl EguiApp { fn new(_cc: &eframe::CreationContext<'_>) -> Self { let (up_sender, mut up_receiver) = channel::(1024); let (down_sender, down_receiver) = channel::(1024); let transceiver = Transceiver::start(down_receiver, up_sender); let instance_id = unsafe { INSTANCE_ID }; tokio::task::spawn(async move { println!("Waiting for connection ..."); // let socket = Arc::new( // UdpSocket::bind(format!("0.0.0.0:{}", 9000 + instance_id)) // .await // .unwrap(), // ); // socket // .connect(format!("127.0.0.1:{}", 9000 + (1 - instance_id))) // .await // .unwrap(); let mut stream; if instance_id == 0 { let socket = TcpSocket::new_v4().unwrap(); socket.bind("0.0.0.0:9000".parse().unwrap()).unwrap(); stream = socket.listen(1).unwrap().accept().await.unwrap().0; } else { stream = TcpStream::connect("127.0.0.1:9000").await.unwrap(); } println!("Connected"); let (mut sock_recv, mut sock_send) = io::split(stream); // Receiving end tokio::spawn(async move { let mut buf = [0u8; 65536]; while let Ok(size) = sock_recv.read(&mut buf).await { assert!(buf.len() % 4 == 0); for x in buf.chunks(4).take(size / 4) { let _ = down_sender.try_send(f32::from_ne_bytes(x.try_into().unwrap())); } } }); // Sending end let mut sending = false; let mut wait_countdown = 0; loop { // Up link //let mut sample = 0.; let noise = rand::random::() * 0.1; if let Ok(x) = up_receiver.try_recv() { match x { SampleSenderCommand::Open => { sending = true; } SampleSenderCommand::Close => { sending = false; } SampleSenderCommand::Sample(x) => { if sending { let _ = sock_send.write(&(x + noise).to_ne_bytes()).await; wait_countdown += 1; } } } } if !sending { let _ = sock_send.write(&noise.to_ne_bytes()).await; wait_countdown += 1; } if wait_countdown >= 480 { tokio::time::sleep(Duration::from_millis(10)).await; wait_countdown = 0; } } }); EguiApp { transceiver, eyes: VecDeque::new(), } } } impl eframe::App for EguiApp { fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { egui::CentralPanel::default().show(ctx, |ui| { let max_eyes = 100; while let Ok(eye) = self.transceiver.try_recv_eye() { self.eyes.push_back(eye); } while self.eyes.len() > max_eyes { self.eyes.pop_front(); } Plot::new("EyeA") .legend(Legend::default()) .show(ui, |plot_ui| { //plot_ui.set_auto_bounds(Vec2b { x: false, y: false }); for eye in self.eyes.iter() { let line = Line::new( "EyeA", eye.iter() .enumerate() .map(|(i, x)| [i as f64, *x as f64]) .collect::>(), ) .color(Color32::LIGHT_GREEN); plot_ui.line(line); } }); if ui.button("Start").clicked() { let snd = self.transceiver.get_sender(); tokio::spawn(async move { let _ = snd .send("Skibditoilet".repeat(10).as_bytes().to_vec()) .await; }); } }); // Central panel std::thread::sleep(Duration::from_millis(16)); ctx.request_repaint(); } } fn byte_to_bits(byte: u8) -> Vec { vec![ byte & 1 == 1, (byte >> 1) & 1 == 1, (byte >> 2) & 1 == 1, (byte >> 3) & 1 == 1, (byte >> 4) & 1 == 1, (byte >> 5) & 1 == 1, (byte >> 6) & 1 == 1, (byte >> 7) & 1 == 1, ] } fn bits_to_byte(bits: &[bool]) -> u8 { bits[0] as u8 | (bits[1] as u8) << 1 | (bits[2] as u8) << 2 | (bits[3] as u8) << 3 | (bits[4] as u8) << 4 | (bits[5] as u8) << 5 | (bits[6] as u8) << 6 | (bits[7] as u8) << 7 }