diff --git a/example/output.wav b/example/output.wav new file mode 100644 index 0000000..cd189b0 Binary files /dev/null and b/example/output.wav differ diff --git a/example/src/main.rs b/example/src/main.rs index f85953d..954db26 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -35,6 +35,7 @@ use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::graph::FlowGraph; use rand::random; +use crate::receiver::RadioReceiver; use crate::transmitter::Transmitter; pub mod receiver; @@ -42,19 +43,28 @@ pub mod transmitter; fn main() { - let tx = Transmitter::start_new(); - - loop + if std::env::args().len() == 2 { - let mut user_input = String::new(); - io::stdin().read_line(&mut user_input).unwrap(); - println!("Transmitting ..."); - tx.transmit(user_input.as_bytes().to_vec()); + println!("Transmitter"); + let tx = Transmitter::start_new(); + + loop + { + let mut user_input = String::new(); + io::stdin().read_line(&mut user_input).unwrap(); + println!("Transmitting ..."); + tx.transmit(user_input.as_bytes().to_vec()); + } + } + else + { + println!("Receiver"); + let _tx = RadioReceiver::start_new(); } } pub const SAMPLE_RATE: usize = 48_000; -pub const SAMPLE_PER_SYMBOL: usize = 96; +pub const SAMPLE_PER_SYMBOL: usize = 48; pub const DEVIATION: f64 = 500.; pub const CARRIER: f64 = 1700.; diff --git a/example/src/receiver.rs b/example/src/receiver.rs index a50ea4e..830102d 100644 --- a/example/src/receiver.rs +++ b/example/src/receiver.rs @@ -1,6 +1,10 @@ use std::collections::VecDeque; +use std::net::UdpSocket; use std::sync::mpsc; +use std::sync::mpsc::Receiver; +use std::thread::JoinHandle; +use cpal::Stream; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; use eframe::NativeOptions; @@ -12,6 +16,7 @@ use num::Zero; use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate; +use oxydsp_dsp::blocks::utilities::adapters::Map; use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Scan; use oxydsp_dsp::blocks::utilities::adapters::ScanTagged; @@ -21,153 +26,306 @@ use oxydsp_dsp::filtering::fir::Fir; use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::graph::FlowGraph; +use oxydsp_flowgraph::tag::Tag; +use oxydsp_flowgraph::tag::Tagged; use crate::CARRIER; use crate::DEVIATION; use crate::SAMPLE_PER_SYMBOL; use crate::SAMPLE_RATE; -fn main_demod() +pub enum PacketBuilderBitState { - let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64); + WaitingForPreamble, + InPacket, +} - let mut reader = hound::WavReader::open("mod.wav").unwrap(); - let sqr_sum = reader - .samples::() - .map(|sample| (sample.unwrap() as f32) / (i16::MAX as f32)) - .collect::>(); - let (audio_tx, audio_rx) = mpsc::channel(); +pub enum PacketBuilderByteState +{ + Length1, + Length2, + Data, +} - //let (source, signal) = IterSource::new(sqr_sum.into_iter().cycle()); - let (source, signal) = RxSource::new(audio_rx); +pub struct PacketBuilder +{ + current_byte: u8, + bit_index: u8, - let (mut zero_if, iq) = ZeroIf::new(signal, carrier.into()); + bit_state: PacketBuilderBitState, + packet_state: PacketBuilderByteState, - zero_if.set_fir(Fir::lowpass( - DigitalFrequency::from_time_frequency(DEVIATION * 2. + 100., SAMPLE_RATE as f64), - 200, - )); + // Packet building + length: u16, + data: Vec, +} - let (squelch, iq) = Squelch::new(iq, 5., 100); - - let (arg_extract, arg) = Scan::new(iq, Complex::zero(), |state, sample| { - let angle: Complex = sample / *state; - *state = sample; - angle.arg() * 14. - }); - let (sig_lowpass, arg) = - FirFilter::::new(arg, Fir(vec![1.; SAMPLE_PER_SYMBOL]).normalized()); - - let mut elg_loop = Fir(vec![1. / 30.; 30]); - //let mut elg_loop = elg_loop.normalized(); - *elg_loop.0.last_mut().unwrap() = 0.4; - //*elg_loop.0.first_mut().unwrap() = 0.001; - let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL); - - // Eye diagram - let (tx, rx) = mpsc::channel(); - let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::new(), move |history, x| { - if history.len() == SAMPLE_PER_SYMBOL - { - history.pop_back(); +impl PacketBuilder +{ + pub fn new() -> Self + { + Self { + current_byte: 0, + bit_index: 0, + bit_state: PacketBuilderBitState::WaitingForPreamble, + packet_state: PacketBuilderByteState::Length1, + length: 0, + data: vec![], } + } - let mut error: f32 = 0.; - let is_symbol_center = x.1.as_ref().is_some_and(|t| { - if let Some(err) = t.retrieve("elg_symbol") - { - error = *err.downcast().unwrap(); - true - } - else - { - false - } - }); - history.push_front(((is_symbol_center, error), x.0)); - - if history.len() > SAMPLE_PER_SYMBOL / 2 && history[SAMPLE_PER_SYMBOL / 2].0.0 + fn next_byte(&mut self) -> Option> + { + match self.packet_state { - let _ = tx.send(( - history.iter().map(|(_, x)| *x).collect::>(), - history[SAMPLE_PER_SYMBOL / 2].0.1, - )); - } - - x.0.into() - }); - let null_sink = NullSink::new(arg); - - let graph = flowgraph![ - source, - squelch, - zero_if, - arg_extract, - sig_lowpass, - elg, - eye_sender, - null_sink - ]; - - // Setup input - let host = cpal::default_host(); - let device = host.default_input_device().expect("No input device"); - let mut supported_configs_range = device - .supported_input_configs() - .expect("error while querying configs"); - let supported_config = supported_configs_range - .next() - .expect("no supported config?!") - .with_sample_rate(SAMPLE_RATE as u32); - let stream = device.build_input_stream( - &supported_config.into(), - move |data: &[f32], _: &cpal::InputCallbackInfo| { - for x in data.iter() + PacketBuilderByteState::Length1 => { - let _ = audio_tx.send(*x); + self.length = 0; + self.length |= self.current_byte as u16; + println!("starting packet, length 1 {}", self.current_byte); + self.packet_state = PacketBuilderByteState::Length2; } - }, - move |err| { - panic!() // react to errors here. - }, - None, // None=blocking, Some(Duration)=timeout - ); - - let j = graph.run(); - - let mut eyes = VecDeque::new(); - eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| { - while let Ok(eye) = rx.try_recv() - { - if eyes.len() >= 100 + PacketBuilderByteState::Length2 => { - let _ = eyes.pop_back(); + println!("starting packet, length 2 {}", self.current_byte); + self.length |= (self.current_byte as u16) << 8; + self.data = vec![]; + self.packet_state = PacketBuilderByteState::Data; + println!("length : {}", self.length); } - eyes.push_front(eye); - } - - egui::CentralPanel::default().show(ctx, |ui| { - egui_plot::Plot::new("hello").show(ui, |plot_ui| { - for eye in eyes.iter() + PacketBuilderByteState::Data => + { + self.data.push(self.current_byte); + self.length -= 1; + if self.length == 0 { - plot_ui.line( - Line::new( - "eyes", - eye.0 - .iter() - .enumerate() - .map(|(i, s)| [i as f64, *s as f64]) - .collect::(), - ) - .id("eyes") - .color(color_from_err(eye.1, 0.3)), - ); + println!("finished"); + let current = std::mem::replace(self, Self::new()); + return Some(current.data); + } + } + } + None + } + + pub fn next_bit(&mut self, bit: bool) -> Option> + { + self.current_byte >>= 1; + self.current_byte |= (bit as u8) << 7; + match self.bit_state + { + PacketBuilderBitState::WaitingForPreamble => + { + if self.current_byte == 0b01100111 + { + println!("preamble heard !"); + self.bit_state = PacketBuilderBitState::InPacket; + self.bit_index = 0; + } + return None; + } + PacketBuilderBitState::InPacket => + { + self.bit_index += 1; + if self.bit_index == 8 + { + let out = self.next_byte(); + self.bit_index = 0; + return out; + } + None + } + } + } +} + +pub struct RadioReceiver { + //stream: Stream, + //pub packet_receiver: Receiver>, +} + +impl RadioReceiver +{ + pub fn start_new() -> Self + { + let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64); + + let (audio_tx, audio_rx) = mpsc::channel(); + let (packet_tx, packet_rx) = mpsc::channel::>(); + let (source, signal) = RxSource::new(audio_rx); + let (inspect, signal) = Map::new(signal, |x| { + //println!("{x}"); + x + }); + let (mut zero_if, iq) = ZeroIf::new(signal, carrier.into()); + + zero_if.set_fir(Fir::lowpass( + DigitalFrequency::from_time_frequency(2. * DEVIATION + 100., SAMPLE_RATE as f64), + SAMPLE_PER_SYMBOL * 4, + )); + + let (squelch, iq) = Squelch::new(iq, 5., 100); + + let (arg_extract, arg) = Scan::new(iq, Complex::zero(), |state, sample| { + let angle: Complex = sample / *state; + *state = sample; + angle.arg() * 14. + }); + + let mut elg_loop = Fir(vec![1. / 20.; 30]); + *elg_loop.0.last_mut().unwrap() = 1.; + let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL); + + // // Eye diagram + let (tx, rx) = mpsc::channel::<(Vec, f32)>(); + //let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::<()>::new(), move |history, x| { + let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::new(), move |history, x| { + let cloned_tag = x.1.clone(); + if history.len() == 2 * SAMPLE_PER_SYMBOL + { + history.pop_back(); + } + + let mut error: f32 = 0.; + let is_symbol_center = x.1.as_ref().is_some_and(|t| { + if let Some(err) = t.retrieve("elg_symbol") + { + error = *err.downcast().unwrap(); + true + } + else + { + false } }); - ctx.request_repaint(); + history.push_front(((is_symbol_center, error), x.0)); + + if history.len() > SAMPLE_PER_SYMBOL && history[SAMPLE_PER_SYMBOL].0.0 + { + let _ = tx.send(( + history.iter().map(|(_, x)| *x).collect::>(), + history[SAMPLE_PER_SYMBOL].0.1, + )); + } + + Tagged::new(x.0, None) }); - }) - .unwrap(); + + let (packet_map, arg) = + ScanTagged::new(arg, PacketBuilder::new(), move |builder, sample| { + if sample + .1 + .as_ref() + .is_some_and(|t| t.retrieve("elg_symbol").is_some()) + && let Some(packet) = builder.next_bit(sample.0 < 0.) + { + let _ = packet_tx.send(packet); + } + + Tagged::new(sample.0, None) + }); + let null_sink = NullSink::new(arg); + + let graph = flowgraph![ + source, + inspect, + squelch, + zero_if, + packet_map, + arg_extract, + //sig_lowpass, + elg, + eye_sender, + null_sink + ]; + let t = graph.run(); + + // Setup input + // let host = cpal::default_host(); + // let device = host.default_input_device().expect("No input device"); + // let mut supported_configs_range = device + // .supported_input_configs() + // .expect("error while querying configs"); + // let supported_config = supported_configs_range + // .next() + // .expect("no supported config?!") + // .with_sample_rate(SAMPLE_RATE as u32); + // let stream = device + // .build_input_stream( + // &supported_config.into(), + // move |data: &[f32], _: &cpal::InputCallbackInfo| { + // for x in data.iter() + // { + // let _ = audio_tx.send(*x); + // } + // }, + // move |err| { + // panic!() // react to errors here. + // }, + // None, // None=blocking, Some(Duration)=timeout + // ) + // .unwrap(); + std::thread::spawn(move || { + let socket = UdpSocket::bind("0.0.0.0:25565").unwrap(); + + let mut buffer = [0u8; 4096]; + while let Ok(read) = socket.recv(&mut buffer) + { + let read_buffer = &mut buffer[0..read]; + for x in read_buffer.chunks(4) + { + let val = f32::from_le_bytes([x[0], x[1], x[2], x[3]]); + let _ = audio_tx.send(val); + } + } + }); + + let mut eyes = VecDeque::new(); + eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| { + while let Ok(x) = packet_rx.try_recv() + { + println!("Got data: {} bytes.", x.len()); + let str: String = x.iter().map(|x| *x as char).collect(); + println!("-----\n\n{}\n\n-----", str); + } + + while let Ok(eye) = rx.try_recv() + { + if eyes.len() >= 100 + { + let _ = eyes.pop_back(); + } + eyes.push_front(eye); + } + + egui::CentralPanel::default().show(ctx, |ui| { + egui_plot::Plot::new("hello").show(ui, |plot_ui| { + for eye in eyes.iter() + { + plot_ui.line( + Line::new( + "eyes", + eye.0 + .iter() + .enumerate() + .map(|(i, s)| [i as f64 / 2., *s as f64]) + .collect::(), + ) + .id("eyes") + .color(Color32::GREEN), + ); + } + }); + ctx.request_repaint(); + }); + }) + .unwrap(); + + Self { + //stream, + //packet_receiver: packet_rx, + } + } } pub fn color_from_err(error: f32, max: f32) -> Color32 diff --git a/example/src/transmitter.rs b/example/src/transmitter.rs index 60d6542..ecd831e 100644 --- a/example/src/transmitter.rs +++ b/example/src/transmitter.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::fs::File; use std::io::Write; use std::iter::FusedIterator; +use std::net::UdpSocket; use std::ops::BitXor; use std::sync::mpsc; use std::sync::mpsc::Receiver; @@ -9,6 +10,7 @@ use std::sync::mpsc::Sender; use std::sync::mpsc::SyncSender; use std::sync::mpsc::sync_channel; use std::thread::JoinHandle; +use std::time::Duration; use cpal::Stream; use cpal::traits::DeviceTrait; @@ -176,30 +178,33 @@ impl Transmitter let (packet_rec, packets): (_, In>) = RxSource::new(packet_rx); let (linearizer, bits) = FlatMap::new(packets, |packet| { // +1 for chksum - let packet_length = (packet.len() + 1) as u16; + let packet_length = packet.len() as u16; let checksum = packet.iter().copied().reduce(BitXor::bitxor).unwrap(); // Learning sequence let mut frame = vec![0b10101010; 8]; + // Preamble + frame.push(0b01100111); frame.push(packet_length.to_le_bytes()[0]); frame.push(packet_length.to_le_bytes()[1]); frame.extend(packet.iter()); frame.push(checksum); + frame.extend((0..16).map(|_| 0)); frame .into_iter() .flat_map(to_bits) - .map(|x| if x { 1. } else { -1.0f32 }) + .map(|x| if x { 1. } else { -1. }) }); let (repeat, bits) = Repeat::new(bits, SAMPLE_PER_SYMBOL); // gaussian fir let fir = Fir((0..SAMPLE_PER_SYMBOL) - .map(|x| gaussian(0.3, x as f32 / SAMPLE_PER_SYMBOL as f32)) + .map(|x| gaussian(0.1, x as f32 / SAMPLE_PER_SYMBOL as f32)) .collect()) .normalized(); - let (bit_filter, bits) = FirFilter::new(bits, fir); + //let (bit_filter, bits) = FirFilter::new(bits, fir); let (to_freq, freq) = Map::new(bits, move |x| { DigitalFrequency::from_time_frequency(DEVIATION * x as f64, SAMPLE_RATE as f64) }); @@ -207,12 +212,36 @@ impl Transmitter let (local_oscillator, lo) = OscillatorSource::::new(carrier.into()); let (frontend, passband) = Multiplier::new(baseband, lo); let (audio_tx, audio_rx) = mpsc::channel::>(); + + let reverb_length = 20; + // let (reverb, passband) = FirFilter::new( + // passband, + // Fir((0..reverb_length) + // .map(|x| (-15. * (x as f32) / (reverb_length as f32)).exp()) + // .collect()) + // .normalized(), + // ); + + let (udp_map, passband) = Scan::new( + passband, + UdpSocket::bind("127.0.0.1:25566").unwrap(), + |sckt, sample| { + sckt.send_to( + &(sample.re + ((random::() * 2.) - 1.) * 0.0).to_le_bytes(), + "127.0.0.1:25565", + ) + .unwrap(); + sample + }, + ); let tx_sink = TxSink::new(passband, audio_tx); let graph = flowgraph![ packet_rec, linearizer, - bit_filter, + //reverb, + //bit_filter, + udp_map, to_freq, repeat, base_oscillator, diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index f40429f..79726e3 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -53,6 +53,7 @@ impl FlowGraph } crate::block::BlockResult::Exit => { + println!("KILLING GRAPH"); break 'outer; } }