From b57b85f95973019316778dc9faeb4075e21ad919 Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Wed, 25 Mar 2026 16:33:10 +0100 Subject: [PATCH] Tee block, bpsk eye --- examples/dpsk-modem/demodulator.dot | 22 ++ examples/dpsk-modem/src/main.rs | 215 ++++++++++++++------ oxydsp-dsp/src/blocks/utilities/adapters.rs | 51 ++++- 3 files changed, 229 insertions(+), 59 deletions(-) create mode 100644 examples/dpsk-modem/demodulator.dot diff --git a/examples/dpsk-modem/demodulator.dot b/examples/dpsk-modem/demodulator.dot new file mode 100644 index 0000000..ca7f5a3 --- /dev/null +++ b/examples/dpsk-modem/demodulator.dot @@ -0,0 +1,22 @@ + + digraph G { + node [shape=record]; + rankdir=TB; + IterSource_0 [label="{ IterSource |{ output} }"]; +ZeroIf_1 [label="{ { input}| ZeroIf |{ output} }"]; +NullSink_2 [label="{ { input}| NullSink }"]; +Scan_3 [label="{ { input}| Scan |{ output} }"]; +Multiplier_4 [label="{ { input_a| input_b}| Multiplier |{ output} }"]; +Tee_5 [label="{ { input}| Tee |{ output_a| output_b} }"]; +Scan_6 [label="{ { input}| Scan |{ output} }"]; + + IterSource_0:o0 -> ZeroIf_1:i0 [label="f32"]; +ZeroIf_1:o0 -> Tee_5:i0 [label="num_complex::Complex"]; +Scan_3:o0 -> Multiplier_4:i1 [label="num_complex::Complex"]; +Multiplier_4:o0 -> Scan_6:i0 [label="num_complex::Complex"]; +Tee_5:o0 -> Multiplier_4:i0 [label="num_complex::Complex"]; +Tee_5:o1 -> Scan_3:i0 [label="num_complex::Complex"]; +Scan_6:o0 -> NullSink_2:i0 [label="()"]; + + } + \ No newline at end of file diff --git a/examples/dpsk-modem/src/main.rs b/examples/dpsk-modem/src/main.rs index f66f7c1..45b8fe2 100644 --- a/examples/dpsk-modem/src/main.rs +++ b/examples/dpsk-modem/src/main.rs @@ -1,9 +1,17 @@ use std::collections::VecDeque; +use std::env::args; +use std::fs::File; +use std::io::Write; +use std::net::Ipv4Addr; +use std::net::UdpSocket; +use std::os::unix::thread; use std::sync::mpsc::channel; +use std::sync::mpsc::sync_channel; use std::time::Duration; use eframe::NativeOptions; use egui::Color32; +use egui_plot::Line; use egui_plot::PlotPoints; use egui_plot::Points; use num::Complex; @@ -15,6 +23,8 @@ use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Repeat; use oxydsp_dsp::blocks::utilities::adapters::Scan; +use oxydsp_dsp::blocks::utilities::adapters::Tee; +use oxydsp_dsp::blocks::utilities::channels::RxSource; use oxydsp_dsp::blocks::utilities::channels::TxSink; use oxydsp_dsp::blocks::utilities::iter::IterSource; use oxydsp_dsp::filtering::fir::Fir; @@ -23,7 +33,6 @@ use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::tag::Tags; use rand::random; -use rand::random_bool; const SAMPLE_RATE: usize = 48_000; const CARRIER: f64 = 1000.; @@ -31,65 +40,127 @@ const SAMPLE_PER_SYMBOL: usize = 96; fn main() { - modulator(); - demodulator(); + let args = std::env::args(); + if args.len() == 1 + { + demodulator(); + } + else + { + modulator(); + } println!("Hello, world!"); } fn demodulator() { - let mut reader = hound::WavReader::open("mod.wav").unwrap(); - let samples = reader - .samples::() - .map(|x| (x.unwrap() as f32) / (i16::MAX as f32)) - .collect::>(); + let (signal_tx, signal_rx) = channel(); + std::thread::spawn(move || { + let udp_socket = UdpSocket::bind("0.0.0.0:25565").unwrap(); + let mut buffer = [0u8; 4096]; + while let Ok(size) = udp_socket.recv(&mut buffer) + { + let read = &mut buffer[..size]; + for bytes in read.chunks(4) + { + if bytes.len() == 4 + { + let _ = signal_tx + .send(f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])); + } + } + } + }); - let (signal_source, signal) = IterSource::new(samples.into_iter()); - let (mut zero_if, baseband) = ZeroIf::new( + let (signal_source, signal) = IterSource::new(signal_rx.into_iter()); + let (zero_if, baseband) = ZeroIf::new( signal, DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), ); - zero_if.set_fir(Fir::lowpass( - DigitalFrequency::from_time_frequency(CARRIER + 100., SAMPLE_RATE as f64), - 100, - )); + // zero_if.set_fir(Fir::lowpass( + // DigitalFrequency::from_time_frequency(CARRIER + 100., SAMPLE_RATE as f64), + // 100, + // )); - let (constellation_tx, conbtellation_rx) = channel::>(); - let tx_sink = TxSink::new(baseband, constellation_tx); + // Matched corellator + let (tee, baseband, baseband_late) = Tee::new(baseband); - let graph = flowgraph![signal_source, zero_if, tx_sink]; + // Delay + let (delay, baseband_late) = Scan::new( + baseband_late, + VecDeque::from([Complex::::ZERO; SAMPLE_PER_SYMBOL]), + |history, x| { + history.push_front(x); + history.pop_back().unwrap().conj() + }, + ); + + let (multiplier, matched) = Multiplier::new(baseband, baseband_late); + + let (constellation_tx, conbtellation_rx) = channel(); + let (sender_scan, matched) = Scan::new(matched, VecDeque::new(), move |history, x| { + history.push_back(x.re); + if history.len() >= SAMPLE_PER_SYMBOL + { + let _ = constellation_tx.send(Vec::from(history.clone())); + history.clear(); + } + }); + let tx_sink = NullSink::new(matched); + + let graph = flowgraph![ + signal_source, + zero_if, + tx_sink, + delay, + multiplier, + tee, + sender_scan + ]; + File::create("demodulator.dot") + .unwrap() + .write_all(graph.get_dot().as_bytes()) + .unwrap(); graph.run(); let mut constellation = VecDeque::new(); - let mut n = 0; eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| { while let Ok(sample) = conbtellation_rx.try_recv() { - if constellation.len() >= 100_000 + if constellation.len() >= 100 { let _ = constellation.pop_back(); } - if n.is_multiple_of(&SAMPLE_PER_SYMBOL) - { - constellation.push_front(sample); - } - n += 1; + constellation.push_front(sample); } egui::CentralPanel::default().show(ctx, |ui| { egui_plot::Plot::new("hello").show(ui, |plot_ui| { - plot_ui.points( - Points::new( - "constellation", - constellation - .iter() - .map(|s| [s.re as f64, s.im as f64]) - .collect::(), - ) - .id("constellation") - .color(Color32::GREEN), - ); + for eye in constellation.iter() + { + plot_ui.line( + Line::new( + "eye", + eye.iter() + .enumerate() + .map(|(i, e)| [i as f64, *e as f64]) + .collect::(), + ) + .color(Color32::GREEN), + ); + } + // plot_ui.points( + // Points::new( + // "constellation", + // constellation + // .iter() + // .map(|s| [s.re as f64, s.im as f64]) + // .collect::(), + // ) + // .id("constellation") + // .color(Color32::GREEN), + // ); }); ctx.request_repaint(); }); @@ -99,12 +170,24 @@ fn demodulator() fn modulator() { - let random_source = (0..1000).map(|_| random_bool(0.5)); + let (data_tx, data_rx) = channel(); + + std::thread::spawn(move || { + loop + { + let mut str = String::new(); + let input = std::io::stdin().read_line(&mut str).unwrap(); + + for bit in str.as_bytes().iter().copied().flat_map(to_bits) + { + let _ = data_tx.send(bit); + } + } + }); let mut tags = Tags::new(); - let (mut bit_source, bits) = IterSource::new(random_source); - let last_tag = tags.allocate_tag("finished"); - bit_source.tag_last_with(last_tag.clone()); + let (mut bit_source, bits) = RxSource::new(data_rx); + //let last_tag = tags.allocate_tag("finished"); let (phase_map, phase) = Scan::new(bits, 1., |state, bit| { if bit @@ -119,13 +202,13 @@ fn modulator() DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), ); let (multiplier, passband) = Multiplier::new(passband, phase); - let (output_tx, output_rx) = channel::>(); + let (output_tx, output_rx) = sync_channel::>(48_000); let (tx_map, passband) = MapResultTagged::new(passband, move |s| { let _ = output_tx.send(s.0); - if s.retrieve(&last_tag).is_some() - { - return (s, BlockResult::Exit); - } + // if s.retrieve(&last_tag).is_some() + // { + // return (s, BlockResult::Exit); + // } (s, BlockResult::Ok) }); let null_sink = NullSink::new(passband); @@ -135,19 +218,37 @@ fn modulator() ]; graph.run(); - let spec = hound::WavSpec { - channels: 1, - sample_rate: SAMPLE_RATE as u32, - bits_per_sample: 16, - sample_format: hound::SampleFormat::Int, - }; - let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap(); - for sample in output_rx.iter() + let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + while let Ok(sample) = output_rx.recv() { - let amplitude = i16::MAX as f32; - writer - .write_sample(((sample.re + random::() * 0.2) * amplitude) as i16) - .unwrap(); + std::thread::sleep(Duration::from_micros(20)); + let val = sample.re + random::() * 0.2; + let _ = udp_socket.send_to(&val.to_le_bytes(), "127.0.0.1:25565"); } - writer.finalize().unwrap(); +} + +pub fn to_bits(n: u8) -> [bool; 8] +{ + [ + (n & 1) == 1, + (n >> 1) & 1 == 1, + (n >> 2) & 1 == 1, + (n >> 3) & 1 == 1, + (n >> 4) & 1 == 1, + (n >> 5) & 1 == 1, + (n >> 6) & 1 == 1, + (n >> 7) & 1 == 1, + ] +} + +pub fn from_bits(n: [bool; 8]) -> u8 +{ + (n[0] as u8) + | ((n[1] as u8) << 1) + | ((n[2] as u8) << 2) + | ((n[3] as u8) << 3) + | ((n[4] as u8) << 4) + | ((n[5] as u8) << 5) + | ((n[6] as u8) << 6) + | ((n[7] as u8) << 7) } diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 5518a9a..de148d4 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,5 +1,3 @@ -use core::sync; - use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; @@ -348,3 +346,52 @@ impl<'view, I: 'static> SyncBlock<'view> for NullSink Some(()) } } + +#[derive(BlockIO)] +pub struct Tee +{ + #[input] + input: In, + + #[output] + output_a: Out, + + #[output] + output_b: Out, +} + +impl Tee +{ + pub fn new(input: In) -> (Self, In, In) + { + let (output_a, port_a) = stream(); + let (output_b, port_b) = stream(); + ( + Self { + input, + output_a, + output_b, + }, + port_a, + port_b, + ) + } +} + +impl Block for Tee +{ + fn work(&mut self) -> BlockResult + { + let writer_a = self.output_a.write(); + let writer_b = self.output_b.write(); + for x in self + .input + .pop_iter() + .take(writer_a.len().min(writer_b.len())) + { + let _ = writer_a.push(x.clone()); + let _ = writer_b.push(x); + } + BlockResult::Ok + } +}