diff --git a/examples/bfsk-modem/src/receiver.rs b/examples/bfsk-modem/src/receiver.rs index 00783e7..fdd08a3 100644 --- a/examples/bfsk-modem/src/receiver.rs +++ b/examples/bfsk-modem/src/receiver.rs @@ -192,7 +192,7 @@ impl RadioReceiver let mut error: f32 = 0.; let is_symbol_center = x.1.as_ref().is_some_and(|t| { - if let Some(err) = t.retrieve(symbol_tag2.clone()) + if let Some(err) = t.retrieve(&symbol_tag2.clone()) { error = *err; true @@ -220,7 +220,7 @@ impl RadioReceiver if sample .1 .as_ref() - .is_some_and(|t| t.retrieve(symbol_tag.clone()).is_some()) + .is_some_and(|t| t.retrieve(&symbol_tag.clone()).is_some()) && let Some(packet) = builder.next_bit(sample.0 < 0.) { let _ = packet_tx.send(packet); diff --git a/examples/bfsk-modem/src/transmitter.rs b/examples/bfsk-modem/src/transmitter.rs index 9eb607a..3afaf5d 100644 --- a/examples/bfsk-modem/src/transmitter.rs +++ b/examples/bfsk-modem/src/transmitter.rs @@ -29,6 +29,7 @@ use std::sync::mpsc::Receiver; use std::sync::mpsc::SyncSender; use std::sync::mpsc::sync_channel; use std::thread::JoinHandle; +use std::time::Duration; use crate::CARRIER; use crate::DEVIATION; @@ -37,109 +38,6 @@ use crate::SAMPLE_RATE; use crate::gaussian; use crate::to_bits; -#[derive(BlockIO)] -pub struct FlatMap -where - I: 'static, - O: IntoIterator + 'static, - O::IntoIter: FusedIterator, - F: Fn(I) -> O, -{ - #[input] - input: In, - - #[output] - output: Out, - - current_iter: Option, - map: F, -} - -impl FlatMap -where - I: 'static, - O: IntoIterator + 'static, - O::IntoIter: FusedIterator, - F: Fn(I) -> O, -{ - pub fn new(input: In, map: F) -> (Self, In) - { - let (output, port) = oxydsp_flowgraph::io::stream(); - ( - Self { - input, - output, - current_iter: None, - map, - }, - port, - ) - } -} - -impl Block for FlatMap -where - I: 'static, - O: IntoIterator + 'static, - O::IntoIter: FusedIterator, - F: Fn(I) -> O, -{ - fn work(&mut self) -> BlockResult - { - let writer = self.output.write(); - let reader = self.input.read(); - - let max_write = writer.len(); - let mut written = 0; - - while written < max_write - { - if let Some(current_iter) = self.current_iter.as_mut() - { - if let Some(next_elt) = current_iter.next() - { - let _ = writer.push((next_elt, None).into()); - written += 1; - continue; - } - else - { - // Iterator empty - self.current_iter = None; - } - } - - if self.current_iter.is_none() - { - // Get input - if let Some(input) = reader.pop() - { - let mut new_iter = (self.map)(input.0).into_iter(); - if let Some(first_elt) = new_iter.next() - { - self.current_iter = Some(new_iter); - let _ = writer.push((first_elt, input.1).into()); - written += 1; - } - else - { - // Iterator empty - self.current_iter = None; - continue; - } - } - else - { - // Cannot continue - break; - } - } - } - - BlockResult::Ok - } -} - pub struct Transmitter { flowgraph_handle: JoinHandle<()>, @@ -176,13 +74,17 @@ impl Transmitter .map(|x| if x { 1. } else { -1. }) }); - let (repeat, bits) = Repeat::new(bits, SAMPLE_PER_SYMBOL); + let (repeat, bits) = FlatMap::new(bits, |symbol| { + let mut v = vec![0.; SAMPLE_PER_SYMBOL - 1]; + v.push(symbol); + v + }); // gaussian fir let fir = Fir((0..SAMPLE_PER_SYMBOL) - .map(|x| gaussian(0.8, x as f32 / SAMPLE_PER_SYMBOL as f32)) - .collect()) - .normalized(); + .map(|x| gaussian(0.3, x as f32 / SAMPLE_PER_SYMBOL as f32)) + .collect()); + //.normalized(); let (bit_filter, bits) = FirFilter::new(bits, fir); let (to_freq, freq) = Map::new(bits, move |x| { @@ -194,18 +96,19 @@ impl Transmitter let (audio_tx, audio_rx) = mpsc::channel::>(); let reverb_length = 20; - // let (reverb, passband) = FirFilter::new( - // passband, - // Fir((0..reverb_length) - // .map(|x| (-15. * (x as f32) / (reverb_length as f32)).exp()) - // .collect()) - // .normalized(), - // ); + let (reverb, passband) = FirFilter::new( + passband, + Fir((0..reverb_length) + .map(|x| (-15. * (x as f32) / (reverb_length as f32)).exp()) + .collect()) + .normalized(), + ); let (udp_map, passband) = Scan::new( passband, UdpSocket::bind("127.0.0.1:25566").unwrap(), |sckt, sample| { + std::thread::sleep(Duration::from_micros(20)); sckt.send_to( &(sample.re + ((random::() * 2.) - 1.) * 0.0).to_le_bytes(), "127.0.0.1:25565", @@ -219,7 +122,7 @@ impl Transmitter let graph = flowgraph![ packet_rec, linearizer, - //reverb, + reverb, bit_filter, udp_map, to_freq, @@ -250,7 +153,7 @@ impl Transmitter { if let Ok(y) = audio_rx.try_recv() { - *x = y.re; + *x = y.re * 0.01; } else { diff --git a/examples/dpsk-modem/src/main.rs b/examples/dpsk-modem/src/main.rs index 45b8fe2..2d13d2b 100644 --- a/examples/dpsk-modem/src/main.rs +++ b/examples/dpsk-modem/src/main.rs @@ -16,9 +16,11 @@ use egui_plot::PlotPoints; use egui_plot::Points; use num::Complex; use num::Integer; +use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::utilities::adapters::FlatMap; use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Repeat; @@ -73,14 +75,14 @@ fn demodulator() }); let (signal_source, signal) = IterSource::new(signal_rx.into_iter()); - let (zero_if, baseband) = ZeroIf::new( + let (mut zero_if, baseband) = ZeroIf::new( signal, DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), ); - // zero_if.set_fir(Fir::lowpass( - // DigitalFrequency::from_time_frequency(CARRIER + 100., SAMPLE_RATE as f64), - // 100, - // )); + zero_if.set_fir(Fir::lowpass( + DigitalFrequency::from_time_frequency(CARRIER + 100., SAMPLE_RATE as f64), + 100, + )); // Matched corellator let (tee, baseband, baseband_late) = Tee::new(baseband); @@ -186,23 +188,42 @@ fn modulator() }); let mut tags = Tags::new(); - let (mut bit_source, bits) = RxSource::new(data_rx); + let (bit_source, bits) = RxSource::new(data_rx); //let last_tag = tags.allocate_tag("finished"); - let (phase_map, phase) = Scan::new(bits, 1., |state, bit| { + let (phase_map, phase) = Scan::new(bits, Complex::::new(1., 0.), |state, bit| { if bit { - *state *= -1.; + *state *= Complex::new(1., 0.); + } + else + { + *state *= Complex::new(-1., 0.) } *state }); - let (repeater, phase) = Repeat::new(phase, SAMPLE_PER_SYMBOL); + + // Convert to pulse train + let (repeater, phase) = FlatMap::new(phase, |x| { + let mut v = vec![Complex::::new(0., 0.); SAMPLE_PER_SYMBOL - 1]; + v.push(x); + v + }); + + // Pulse shaping + // gaussian fir + let fir = Fir((0..SAMPLE_PER_SYMBOL) + .map(|x| gaussian(0.3, x as f32 / SAMPLE_PER_SYMBOL as f32)) + .collect()); + //.normalized(); + + let (phase_filter, phase) = FirFilter::new(phase, fir); let (oscillator, passband) = OscillatorSource::::new( DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), ); let (multiplier, passband) = Multiplier::new(passband, phase); - let (output_tx, output_rx) = sync_channel::>(48_000); + let (output_tx, output_rx) = channel::>(); let (tx_map, passband) = MapResultTagged::new(passband, move |s| { let _ = output_tx.send(s.0); // if s.retrieve(&last_tag).is_some() @@ -214,16 +235,24 @@ fn modulator() let null_sink = NullSink::new(passband); let graph = flowgraph![ - bit_source, phase_map, repeater, oscillator, multiplier, tx_map, null_sink + bit_source, + phase_map, + repeater, + oscillator, + multiplier, + tx_map, + null_sink, + phase_filter ]; graph.run(); let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); while let Ok(sample) = output_rx.recv() { - std::thread::sleep(Duration::from_micros(20)); - let val = sample.re + random::() * 0.2; + std::thread::sleep(Duration::from_micros(15)); + let val = sample.re + random::() * 0.5; let _ = udp_socket.send_to(&val.to_le_bytes(), "127.0.0.1:25565"); + let _ = udp_socket.send_to(&val.to_le_bytes(), "127.0.0.1:25566"); } } @@ -252,3 +281,9 @@ pub fn from_bits(n: [bool; 8]) -> u8 | ((n[6] as u8) << 6) | ((n[7] as u8) << 7) } + +pub fn gaussian(sigma: f32, t: f32) -> f32 +{ + let sq = (t - 0.5) / sigma; + (-sq * sq).exp() +} diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index de148d4..14277b5 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,3 +1,5 @@ +use std::iter::FusedIterator; + use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; @@ -395,3 +397,106 @@ impl Block for Tee BlockResult::Ok } } + +#[derive(BlockIO)] +pub struct FlatMap +where + I: 'static, + O: IntoIterator + 'static, + O::IntoIter: FusedIterator, + F: Fn(I) -> O, +{ + #[input] + input: In, + + #[output] + output: Out, + + current_iter: Option, + map: F, +} + +impl FlatMap +where + I: 'static, + O: IntoIterator + 'static, + O::IntoIter: FusedIterator, + F: Fn(I) -> O, +{ + pub fn new(input: In, map: F) -> (Self, In) + { + let (output, port) = oxydsp_flowgraph::io::stream(); + ( + Self { + input, + output, + current_iter: None, + map, + }, + port, + ) + } +} + +impl Block for FlatMap +where + I: 'static, + O: IntoIterator + 'static, + O::IntoIter: FusedIterator, + F: Fn(I) -> O, +{ + fn work(&mut self) -> BlockResult + { + let writer = self.output.write(); + let reader = self.input.read(); + + let max_write = writer.len(); + let mut written = 0; + + while written < max_write + { + if let Some(current_iter) = self.current_iter.as_mut() + { + if let Some(next_elt) = current_iter.next() + { + let _ = writer.push((next_elt, None).into()); + written += 1; + continue; + } + else + { + // Iterator empty + self.current_iter = None; + } + } + + if self.current_iter.is_none() + { + // Get input + if let Some(input) = reader.pop() + { + let mut new_iter = (self.map)(input.0).into_iter(); + if let Some(first_elt) = new_iter.next() + { + self.current_iter = Some(new_iter); + let _ = writer.push((first_elt, input.1).into()); + written += 1; + } + else + { + // Iterator empty + self.current_iter = None; + continue; + } + } + else + { + // Cannot continue + break; + } + } + } + + BlockResult::Ok + } +}