diff --git a/Cargo.lock b/Cargo.lock index 8c2b636..82e3a60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3076,6 +3076,14 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "simple" +version = "0.1.0" +dependencies = [ + "oxydsp-dsp", + "oxydsp-flowgraph", +] + [[package]] name = "slab" version = "0.4.12" diff --git a/examples/bfsk-modem-tx/src/transmitter.rs b/examples/bfsk-modem-tx/src/transmitter.rs index 666ff8e..977fd78 100644 --- a/examples/bfsk-modem-tx/src/transmitter.rs +++ b/examples/bfsk-modem-tx/src/transmitter.rs @@ -8,7 +8,6 @@ use oxydsp_dsp::blocks::synthesis::Nco; use oxydsp_dsp::blocks::synthesis::OscillatorSource; use oxydsp_dsp::blocks::utilities::adapters::FlatMap; use oxydsp_dsp::blocks::utilities::adapters::Map; -use oxydsp_dsp::blocks::utilities::adapters::Scan; use oxydsp_dsp::blocks::utilities::channels::RxSource; use oxydsp_dsp::blocks::utilities::channels::TxSink; use oxydsp_dsp::filtering::fir::Fir; @@ -17,20 +16,17 @@ use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::io::In; use rand::random; use std::f32::consts::PI; -use std::net::UdpSocket; use std::ops::BitXor; use std::sync::mpsc; use std::sync::mpsc::Receiver; use std::sync::mpsc::SyncSender; use std::sync::mpsc::sync_channel; use std::thread::JoinHandle; -use std::time::Duration; use crate::CARRIER; use crate::DEVIATION; use crate::SAMPLE_PER_SYMBOL; use crate::SAMPLE_RATE; -use crate::gaussian; use crate::to_bits; pub struct Transmitter diff --git a/examples/qpsk-modem/mod.wav b/examples/qpsk-modem/mod.wav index e3e8055..a11e6ae 100644 Binary files a/examples/qpsk-modem/mod.wav and b/examples/qpsk-modem/mod.wav differ diff --git a/examples/qpsk-modem/mod_rrc.wav b/examples/qpsk-modem/mod_rrc.wav deleted file mode 100644 index aeb34e7..0000000 Binary files a/examples/qpsk-modem/mod_rrc.wav and /dev/null differ diff --git a/examples/qpsk-modem/src/main.rs b/examples/qpsk-modem/src/main.rs index 6ef7831..f429c68 100644 --- a/examples/qpsk-modem/src/main.rs +++ b/examples/qpsk-modem/src/main.rs @@ -1,38 +1,37 @@ -use std::cell::RefCell; -use std::os::unix::thread; -use std::time::Duration; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; -use cpal::traits::StreamTrait; use egui::Color32; use egui_plot::Line; use egui_plot::PlotPoints; use egui_plot::Points; use num::Complex; use num::complex::ComplexFloat; -use num::traits::sign; use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper; -use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate; use oxydsp_dsp::blocks::utilities::adapters::Map; +use oxydsp_dsp::blocks::utilities::adapters::Merger; use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Scan; +use oxydsp_dsp::blocks::utilities::adapters::ScanTagged; +use oxydsp_dsp::blocks::utilities::adapters::Splitter; use oxydsp_dsp::blocks::utilities::channels::RxSource; use oxydsp_dsp::blocks::utilities::channels::TxSink; use oxydsp_dsp::blocks::utilities::graph_control::GraphKiller; use oxydsp_dsp::blocks::utilities::iter::IterSource; use oxydsp_dsp::filtering::fir::Fir; use oxydsp_dsp::filtering::history_buf::HistoryBuf; +use oxydsp_dsp::map; use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::stream; +use oxydsp_flowgraph::tag::Tagged; use oxydsp_flowgraph::tag::Tags; use rand::random; @@ -128,9 +127,14 @@ pub fn modulator() [false, false] => Complex::new(-1., -1.), }); + // let (pulse_shaper, iq) = PulseShaper::new( + // iq, + // Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL), + // SAMPLE_PER_SYMBOL, + // ); let (pulse_shaper, iq) = PulseShaper::new( iq, - Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL), + Fir::gaussian(SAMPLE_PER_SYMBOL, 3.), SAMPLE_PER_SYMBOL, ); let (carrier_oscillator, carrier) = OscillatorSource::new( @@ -176,51 +180,82 @@ pub fn demodulator() let (downconverter, iq) = CostasLoop::new( signal, DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64), - Fir::proportional_integral(100, 0.000, 0.0000), + Fir::proportional_integral(100, 0.01, 0.00005), ); - //let agc_filter = Fir::proportional_integral(100, 0.1, 0.001); - // let (agc, iq) = Scan::new(iq, 1., |gain, iq| - // { - // let mu = 0.1; - // let mag = iq.abs(); - // *gain += mu * (1. - mag * *gain); - // - // iq * *gain - // }); + //let mut agc_filter = oxydsp_dsp::filtering::fir::FirFilter::new(Fir::proportional_integral(100, 0.1, 0.001)); + let (agc_error_tx, agc_error_rx) = std::sync::mpsc::channel(); + let (agc, iq) = Scan::new(iq, (1., 0.), move |(gain, error_low), iq| { + let out = *gain * iq; - let (matched_filter, iq) = FirFilter::new( + let error = 1. - out.abs(); + let alpha = 0.01; + *error_low = (1. - alpha) * *error_low + alpha * error; + let _ = agc_error_tx.send(*error_low); + *gain += 0.01 * error; // Feedback + + out + }); + + const DECIMATION: usize = 1; + // let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating( + // iq, + // Fir::>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100) + // .normalized_len().convoluted_with(&Fir::>::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL) + // .normalized_sqr()) + // ); + // let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating( + // iq, + // Fir::>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100) + // .normalized_len().convoluted_with(&Fir::>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr()) + // ); + let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating( iq, - Fir::::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL) - .normalized_sqr(), + Fir::>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr() ); + let (splitter, [iq_i, iq_q]) = Splitter::new(iq); + let (proj_i, i) = Map::new(iq_i, |x| x.re); + let (proj_q, q) = Map::new(iq_q, |x| x.im); + + let mut tags = Tags::default(); + + let elg_filter = Fir::proportional_integral(100, 0.2, 0.002); + let i_key = tags.allocate_tag("i tag"); + let q_key = tags.allocate_tag("q tag"); + let (elg_i, i) = EarlyLateGate::new(i, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, i_key.clone()); + let (elg_q, q) = EarlyLateGate::new(q, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, q_key.clone()); + let (eye_i_tx, eye_i_rx) = std::sync::mpsc::channel::>(); let (eye_q_tx, eye_q_rx) = std::sync::mpsc::channel::>(); + + let (merger, iq) = Merger::new([i, q]); + let (constellation_tx, constellation_rx) = std::sync::mpsc::channel(); - // let (debug, iq) = Scan::new( - // iq, - // ( - // HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2), - // HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2), - // 0usize, - // ), - // move |(buf_i, buf_q, counter), x| { - // buf_i.push(x.re); - // buf_q.push(x.im); - // let _ = constellation_tx.send(x); - // *counter += 1; - // if *counter >= SAMPLE_PER_SYMBOL * 2 - // { - // let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::>()); - // let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::>()); - // *counter = 0; - // } - // x - // }, - // ); - let tx_sink = TxSink::new(iq, constellation_tx); + let (debug, iq) = ScanTagged::new( + iq, + ( + HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION), + HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION), + ), + move |(buf_i, buf_q), input| { + let ([re, im], tag) = input.into(); + buf_i.push(re); + buf_q.push(im); + + if tag.is_some_and(|t| t.retrieve(&i_key).is_some()) + { + let _ = constellation_tx.send(Complex::new(re, im)); + let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::>()); + let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::>()); + } + //(Complex::new(re, im), None).into() + let k: Tagged<()> = ((), None).into(); + k + }, + ); + let tx_sink = NullSink::new(iq); let host = cpal::default_host(); let device = host @@ -239,9 +274,7 @@ pub fn demodulator() move |data: &[f32], _: &cpal::InputCallbackInfo| { for x in data { - //let _ = audio_tx.send(*x * 10.); - let _ = audio_tx.send(random::()); - //let _ = audio_tx.send(Complex::new(random::(), random::())); + let _ = audio_tx.send(*x * 100.); } }, move |_err| {}, @@ -252,17 +285,23 @@ pub fn demodulator() let graph = flowgraph![ rx_source, downconverter, - // agc, + agc, matched_filter, - //debug, - //null_sink + splitter, + proj_i, + proj_q, + elg_i, + elg_q, + merger, + debug, tx_sink ]; graph.run(6); let mut eye_i_history = HistoryBuf::new(vec![], 200); let mut eye_q_history = HistoryBuf::new(vec![], 200); - let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 5000); + let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 500); + let mut agc_error = HistoryBuf::new(0., 50_000); eframe::run_simple_native("Window", Default::default(), move |ctx, _frame| { for eye in eye_i_rx.try_iter().take(200) { @@ -272,10 +311,14 @@ pub fn demodulator() { eye_q_history.push(eye); } - for point in constellation_rx.try_iter().take(5000) + for point in constellation_rx.try_iter().take(500) { constellation.push(point); } + for x in agc_error_rx.try_iter().take(5000) + { + agc_error.push(x); + } egui::CentralPanel::default().show(ctx, |ui| { egui_plot::Plot::new("plot") @@ -293,6 +336,19 @@ pub fn demodulator() .color(Color32::YELLOW.gamma_multiply_u8(70)), ); + plot_ui.line( + Line::new( + "AGC Error", + agc_error + .as_slice() + .iter() + .enumerate() + .map(|(i, point)| [map(i as f64, 0., 50_000., -2., 2.), *point as f64 + 2.]) + .collect::(), + ) + .color(Color32::YELLOW.gamma_multiply_u8(70)), + ); + for (eye_i, eye_q) in eye_i_history .as_slice() .iter() @@ -305,7 +361,7 @@ pub fn demodulator() .iter() .enumerate() .map(|(i, x)| { - [i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) + 1., *x as f64] + [i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) + 1., *x as f64] }) .collect::>(), ) @@ -319,27 +375,27 @@ pub fn demodulator() .iter() .enumerate() .map(|(i, x)| { - [*x as f64, i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) - 2.] + [*x as f64, i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) - 2.] }) .collect::>(), ) .color(Color32::GREEN), ); - plot_ui.points( - Points::new( - "Constellation", - eye_i - .iter() - .zip(eye_q.iter()) - .skip(SAMPLE_PER_SYMBOL / 2) - .step_by(SAMPLE_PER_SYMBOL) - .map(|(i, q)| [*i as f64, *q as f64]) - .collect::(), - ) - .color(Color32::GREEN) - .radius(1.5), - ); + // plot_ui.points( + // Points::new( + // "Constellation", + // eye_i + // .iter() + // .zip(eye_q.iter()) + // .skip(SAMPLE_PER_SYMBOL / DECIMATION) + // .step_by(SAMPLE_PER_SYMBOL / DECIMATION) + // .map(|(i, q)| [*i as f64, *q as f64]) + // .collect::(), + // ) + // .color(Color32::GREEN) + // .radius(1.5), + // ); } }); }); diff --git a/examples/simple/Cargo.toml b/examples/simple/Cargo.toml new file mode 100644 index 0000000..7770956 --- /dev/null +++ b/examples/simple/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "simple" +version = "0.1.0" +edition = "2024" + +[dependencies] +oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"} +oxydsp-dsp = {path = "../../oxydsp-dsp/"} diff --git a/examples/simple/src/main.rs b/examples/simple/src/main.rs new file mode 100644 index 0000000..fab65e2 --- /dev/null +++ b/examples/simple/src/main.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use oxydsp_dsp::blocks::utilities::adapters::NullSink; +use oxydsp_dsp::blocks::utilities::adapters::Scan; +use oxydsp_dsp::blocks::utilities::channels::RxSource; +use oxydsp_flowgraph::flowgraph; + +fn main() +{ + let (tx, rx) = std::sync::mpsc::channel(); + let (rx_source, numbers) = RxSource::new(rx); + let (inspect, numbers) = Scan::new(numbers, 0, |state, x: usize| { + if x.is_multiple_of(100) + { + println!("{}", x); + } + x + }); + let null_sink = NullSink::new(numbers); + let graph = flowgraph![rx_source, inspect, null_sink]; + + std::thread::spawn(move || { + let mut x = 0usize; + loop + { + let _ = tx.send(x); + x += 1; + } + }); + graph.run(1).join(); +} diff --git a/oxydsp-dsp/src/blocks/filtering/fir.rs b/oxydsp-dsp/src/blocks/filtering/fir.rs index 48d7452..d7aafb3 100644 --- a/oxydsp-dsp/src/blocks/filtering/fir.rs +++ b/oxydsp-dsp/src/blocks/filtering/fir.rs @@ -4,6 +4,7 @@ use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::tag::Tag; use std::iter::Sum; use std::ops::Add; use std::ops::Mul; @@ -11,7 +12,7 @@ use std::ops::Mul; use crate::filtering::fir::Fir; #[derive(BlockIO)] -pub struct FirFilter +pub struct FirFilter where T: Clone + Zero + 'static, F: Mul + Clone + 'static, @@ -26,7 +27,7 @@ where filter: crate::filtering::fir::FirFilter, } -impl FirFilter +impl FirFilter where T: Clone + Zero + 'static, F: Mul + Clone + 'static, @@ -34,6 +35,19 @@ where { pub fn new(input: In, impulse_response: Fir) -> (Self, In) { + Self::new_decimating(input, impulse_response) + } +} + +impl FirFilter +where + T: Clone + Zero + 'static, + F: Mul + Clone + 'static, + O: Add + Sum + Clone + Zero, +{ + pub fn new_decimating(input: In, impulse_response: Fir) -> (Self, In) + { + const { assert!(D != 0); }; let (output, filtered) = oxydsp_flowgraph::io::stream(); ( Self { @@ -46,15 +60,35 @@ where } } -impl Block for FirFilter +impl Block for FirFilter where T: Clone + Zero, F: Mul + Clone + 'static, O: Add + Sum + Clone + Zero, { - fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into())); + if D > 1 + { + let mut input_iter = self.input.iter(); + let mut output_pusher = self.output.write_push(); + while input_iter.len() >= D && output_pusher.len() > 0 + { + // TODO: Maybe find a better way to do this. + // I hope this is at least well optimized by the compilator + let batch: [_; D] = std::array::from_fn(|_| input_iter.next().unwrap()); + let tag_batch: [_; D] = std::array::from_fn(|i| &batch[i].1); + let data_batch: [_; D] = std::array::from_fn(|i| batch[i].0.clone()); + self.filter.insert_batch_ref(&data_batch); + let _ = output_pusher + .push((self.filter.filtered(), Tag::from_tag_opts(tag_batch.into_iter())).into()); + } + } + else if D == 1 + { + self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into())); + } + BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs index f37c62e..8696952 100644 --- a/oxydsp-dsp/src/blocks/math/basic.rs +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -58,7 +58,7 @@ where .iter() .zip(self.input_b.iter()) .map(|(x, y)| { - (x.0 + y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into() + (x.0 + y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into() }) ); BlockResult::Ok @@ -115,7 +115,7 @@ where .iter() .zip(self.input_b.iter()) .map(|(x, y)| { - (x.0 * y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into() + (x.0 * y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into() }) ); BlockResult::Ok diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 6ec408d..e7f7b7a 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,4 +1,5 @@ use std::iter::FusedIterator; +use std::mem::MaybeUninit; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; @@ -24,7 +25,7 @@ pub struct Map impl Map where - F: Fn(I) -> O, + F: FnMut(I) -> O, { pub fn new(input: In, map: F) -> (Self, In) { @@ -35,15 +36,12 @@ where impl Block for Map where - F: Fn(I) -> O, + F: FnMut(I) -> O, { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter( - self.input - .iter() - .map(|x| ((&self.map)(x.0), x.1).into()), - ); + self.output + .push_iter(self.input.iter().map(|x| ((&mut self.map)(x.0), x.1).into())); BlockResult::Ok } } @@ -155,7 +153,7 @@ where #[derive(BlockIO)] pub struct Scan where - F: Fn(&mut S, I) -> O, + F: FnMut(&mut S, I) -> O, { #[input] input: In, @@ -170,7 +168,7 @@ where impl Scan where - F: Fn(&mut S, I) -> O, + F: FnMut(&mut S, I) -> O, { pub fn new(input: In, initial_state: S, map: F) -> (Self, In) { @@ -191,7 +189,7 @@ impl Block for Scan where I: 'static, O: 'static, - F: Fn(&mut S, I) -> O, + F: FnMut(&mut S, I) -> O, { fn work(&mut self) -> BlockResult { @@ -250,7 +248,7 @@ where self.output.push_iter(self.input.iter().map(|tagged| { let cloned_tag = tagged.1.clone(); let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into(); - (output, Tag::from_tag_opts(&[&tagged.1, &tag])).into() + (output, Tag::from_tag_opts([&tagged.1, &tag].into_iter())).into() })); BlockResult::Ok } @@ -499,3 +497,122 @@ where BlockResult::Ok } } + +#[derive(BlockIO)] +pub struct Splitter +{ + #[input] + input: In, + + #[output] + outputs: [Out; N], +} + +impl Splitter +{ + pub fn new(input: In) -> (Self, [In; N]) + { + const { assert!(N > 0) } + let (outs, ins) = oxydsp_flowgraph::io::streams(); + ( + Self { + input, + outputs: outs, + }, + ins, + ) + } +} + +impl Block for Splitter +{ + fn work(&mut self) -> BlockResult + { + let mut input_iter = self.input.iter(); + let mut outputs = self + .outputs + .iter_mut() + .map(|x| x.write_push()) + .collect::>(); + + let length = input_iter + .len() + .min(outputs.iter().map(|x| x.len()).min().unwrap()); + + for _ in 0..length + { + let pulled = input_iter.next().unwrap(); + outputs.iter_mut().for_each(|x| { + let _ = x.push(pulled.clone()); + }); + } + + BlockResult::Ok + } +} + +#[derive(BlockIO)] +pub struct Merger +{ + #[input] + inputs: [In; N], + + #[output] + output: Out<[T; N]>, +} + +impl Merger +{ + pub fn new(inputs: [In; N]) -> (Self, In<[T; N]>) + { + const { assert!(N > 0) } + let (out, inp) = oxydsp_flowgraph::io::stream(); + + ( + Self { + inputs, + output: out, + }, + inp, + ) + } +} + +impl Block for Merger +{ + fn work(&mut self) -> BlockResult + { + let mut inputs = self.inputs.iter_mut().map(|x| x.iter()).collect::>(); + let mut output = self.output.write_push(); + + let len = inputs + .len() + .min(inputs.iter().map(|x| x.len()).min().unwrap()); + + let mut datas: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit()); + let mut tags: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit()); + for _ in 0..len + { + for (i, (data, tag)) in inputs + .iter_mut() + .map(|x| x.next().unwrap().into()) + .enumerate() + { + datas[i] = MaybeUninit::new(data); + tags[i] = MaybeUninit::new(tag); + } + + let ok_datas: [_; N] = unsafe { + std::array::from_fn(|i| std::mem::replace(&mut datas[i], MaybeUninit::uninit()).assume_init()) + }; + let ok_tags = unsafe { + std::mem::transmute::<&[MaybeUninit>; N], &[Option; N]>(&tags) + }; + + let tag = Tag::from_tag_opts(ok_tags.into_iter()); + output.push((ok_datas, tag).into()); + } + + BlockResult::Ok + } +} diff --git a/oxydsp-dsp/src/blocks/utilities/channels.rs b/oxydsp-dsp/src/blocks/utilities/channels.rs index ac41b0a..7741853 100644 --- a/oxydsp-dsp/src/blocks/utilities/channels.rs +++ b/oxydsp-dsp/src/blocks/utilities/channels.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; use std::sync::mpsc::SyncSender; @@ -44,12 +45,11 @@ impl TxSink } } -impl Block for RxSource, I> +impl Block for RxSource, I> { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self - .output + self.output .push_iter(self.input.try_iter().map(|x| (x, None).into())); BlockResult::Ok } diff --git a/oxydsp-dsp/src/filtering/fir.rs b/oxydsp-dsp/src/filtering/fir.rs index bef6a4e..079b9af 100644 --- a/oxydsp-dsp/src/filtering/fir.rs +++ b/oxydsp-dsp/src/filtering/fir.rs @@ -1,6 +1,7 @@ use num::Complex; use num::Float; use num::FromPrimitive; +use num::Num; use num::One; use num::Zero; use num::complex::ComplexFloat; @@ -16,7 +17,6 @@ use crate::filtering::history_buf::HistoryBuf; use crate::map; use crate::units::DigitalFrequency; - /// Represents a finite impulse response as a vector /// of values in time. /// @@ -26,8 +26,33 @@ use crate::units::DigitalFrequency; /// /// For a reverb ir for example the clap would be at index 0 /// and the reverb tail towards the end of the vector. +#[derive(Clone)] pub struct Fir(pub Vec); +impl Fir +where + T: Num + Clone + Sum, +{ + pub fn convoluted_with(&self, other: &Fir) -> Fir + { + // Perform convolution + let mut new_fir = vec![]; + let mut filter = FirFilter::::new(self.clone()); + + for x in self.0.iter().rev() + { + new_fir.push(filter.next(x.clone())); + } + + for _ in 0..other.0.len() + { + new_fir.push(filter.next(T::zero())); + } + + Fir(new_fir) + } +} + impl Fir> where T: FftNum + Float + Clone, @@ -205,6 +230,32 @@ where .collect(), ) } + + /// Creates a gaussion fir of length `length` + /// The maximum amplitude of the fir is 1. + /// The gaussian curve is centered in the fir. + /// The parameter `standard_deviations` specifies how many multiples of the + /// standard deviations is on the left and right of the center before the fir cuts it off. + pub fn gaussian(length: usize, standard_deviations: f32) -> Self + { + Self( + (0..length) + .map(|x| { + let t = map( + x as f64, + 0., + length as f64, + -standard_deviations as f64, + standard_deviations as f64, + ); + + // Gaussian with sd=1 + let sq = t / 2.; + T::from_f64(-sq * sq).unwrap().exp() + }) + .collect(), + ) + } } /// A simple convolutional finite impulse response filter @@ -215,7 +266,7 @@ where { fir: Vec, //taps: VecDeque, - taps: HistoryBuf + taps: HistoryBuf, } impl FirFilter @@ -234,14 +285,34 @@ where } } - /// Gets the next output given an input sample. + /// Inserts a new sample in the filter and get its new value /// /// At the beginning, the delay line starts with zeroes. pub fn next(&mut self, input: T) -> O + { + self.insert(input); + self.filtered() + } + + pub fn insert(&mut self, input: T) { self.taps.push(input); + } + + pub fn insert_batch_ref(&mut self, input: &[T]) + { + for x in input.iter().cloned() + { + self.taps.push(x); + } + } + + /// Gets the current value of the filter + /// given the sampels that it currently holds + pub fn filtered(&self) -> O + { let taps = self.taps.as_slice(); - Self::dot_prod(&self.fir, taps) + Self::dot_prod(&self.fir, taps) } pub fn dot_prod(a: &[F], b: &[T]) -> O @@ -254,12 +325,11 @@ where let (b_chunks, b_remainder) = b.as_chunks::<4>(); for (x, y) in a_chunks.iter().zip(b_chunks.iter()) - { - sum[0] = sum[0].clone() + x[0].clone() * y[0].clone(); - sum[1] = sum[1].clone() + x[1].clone() * y[1].clone(); - sum[2] = sum[2].clone() + x[2].clone() * y[2].clone(); - sum[3] = sum[3].clone() + x[3].clone() * y[3].clone(); + sum[0] = sum[0].clone() + x[0].clone() * y[0].clone(); + sum[1] = sum[1].clone() + x[1].clone() * y[1].clone(); + sum[2] = sum[2].clone() + x[2].clone() * y[2].clone(); + sum[3] = sum[3].clone() + x[3].clone() * y[3].clone(); } let mut sum = sum[0].clone() + sum[1].clone() + sum[2].clone() + sum[3].clone(); diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs index 2d39c24..b4a9cff 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -141,9 +141,10 @@ fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::T fn get_output_type_names(&self) -> Vec<&'static str> { let mut output = Vec::new(); + use oxydsp_flowgraph::block::BlockOutput; @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) { - output.push(self.{{ field.1.ident.clone() }}.get_type_name()); + output.extend(self.{{ field.1.ident.clone() }}.get_type_names()); } return output; } diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index af7d213..c181311 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -23,7 +23,7 @@ pub trait BlockInput fn get_inputs(&self) -> Vec<&dyn AnonymousIn>; // Meta information - fn get_types_names(&self) -> Vec<&'static str>; + fn get_type_names(&self) -> Vec<&'static str>; } pub trait BlockOutput @@ -32,7 +32,7 @@ pub trait BlockOutput fn get_outputs(&self) -> Vec<&dyn AnonymousOut>; // Meta information - fn get_types_names(&self) -> Vec<&'static str>; + fn get_type_names(&self) -> Vec<&'static str>; } pub trait BlockIO @@ -114,7 +114,7 @@ impl BlockInput for In vec![self] } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { vec![std::any::type_name::()] } @@ -146,11 +146,11 @@ impl BlockInput for Option } } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { if let Some(input) = self { - input.get_types_names() + input.get_type_names() } else { @@ -181,7 +181,7 @@ impl BlockInput for [I; N] output } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { vec![std::any::type_name::(); N] } @@ -199,7 +199,7 @@ impl BlockOutput for Out vec![self] } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { vec![std::any::type_name::()] } @@ -231,11 +231,11 @@ impl BlockOutput for Option } } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { if let Some(input) = self { - input.get_types_names() + input.get_type_names() } else { @@ -266,7 +266,7 @@ impl BlockOutput for [I; N] result } - fn get_types_names(&self) -> Vec<&'static str> + fn get_type_names(&self) -> Vec<&'static str> { vec![std::any::type_name::(); N] } diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 18895d1..4f6e058 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::mem::ManuallyDrop; use std::mem::MaybeUninit; use std::sync::Arc; @@ -48,7 +49,7 @@ pub struct OutPush<'a, T> start_index: usize, } -impl<'a, T> OutPush<'a, T> +impl<'a, T: 'static> OutPush<'a, T> { pub fn len(&self) -> usize { @@ -68,6 +69,7 @@ impl<'a, T> OutPush<'a, T> pub fn push(&mut self, data: Tagged) -> Result<(), Tagged> { + // println!("\n\n\n"); if self.written_data >= self.total_length { return Err(data); @@ -120,10 +122,12 @@ impl<'a, T> Drop for OutPush<'a, T> { fn drop(&mut self) { - let data_writer = + let mut data_writer = unsafe { ManuallyDrop::>::take(&mut self.data_writer) }; - let tag_writer = + let mut tag_writer = unsafe { ManuallyDrop::>::take(&mut self.tag_writer) }; + + tag_writer.produce(self.written_tags); data_writer.produce(self.written_data); } @@ -216,9 +220,9 @@ impl<'a, T> Drop for InIter<'a, T> { fn drop(&mut self) { - let data_reader = + let mut data_reader = unsafe { ManuallyDrop::>::take(&mut self.data_reader) }; - let tag_reader = + let mut tag_reader = unsafe { ManuallyDrop::>::take(&mut self.tag_reader) }; tag_reader.consume(self.read_tags); data_reader.consume(self.read_data); @@ -290,7 +294,9 @@ impl Out len -= 1; match iter.next() { - Some(element) => {let _ = pusher.push(element); }, + Some(element) => { + let _ = pusher.push(element); + }, None => return false, } } diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index 09e66d5..00534de 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -124,7 +124,7 @@ pub struct StreamReader<'a, T> slices: (&'a mut [Takable], &'a mut [Takable]), // UNSAFE ! - consumer: &'a mut StreamConsumer, + inner: &'a mut StreamConsumer, } pub struct StreamWriter<'a, T> @@ -132,7 +132,7 @@ pub struct StreamWriter<'a, T> slices: (&'a mut [MaybeUninit], &'a mut [MaybeUninit]), // UNSAFE ! - producer: &'a mut StreamProducer, + inner: &'a mut StreamProducer, } impl<'a, T> StreamReader<'a, T> @@ -147,9 +147,9 @@ impl<'a, T> StreamReader<'a, T> (self.slices.0, self.slices.1) } - pub fn consume(self, amount: usize) + pub fn consume(&mut self, read: usize) { - self.consumer.consume(amount); + self.inner.consume(read); } } @@ -165,9 +165,9 @@ impl<'a, T> StreamWriter<'a, T> (self.slices.0, self.slices.1) } - pub fn produce(self, amount: usize) + pub fn produce(&mut self, written: usize) { - self.producer.produce(amount); + self.inner.produce(written); } } @@ -228,8 +228,8 @@ impl StreamProducer // This functions borrows the stream mutably. As such, only one instance // of these slices can exist for a given stream. StreamWriter { - slices: (start_to_head, head_to_end), - producer: self, + slices: (head_to_end, start_to_head), + inner: self, } } } @@ -262,7 +262,7 @@ impl StreamProducer StreamWriter { slices: (head_to_end, start_to_tail), - producer: self, + inner: self, } } } @@ -292,12 +292,13 @@ impl StreamProducer unsafe { let k = &mut *self.inner.buffer.get(); let (start_to_tail, _tail_to_end) = k.split_at_mut_unchecked(wrapped_tail); - let (_start_to_head, head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head); + let (_start_to_head, head_to_tail) = + start_to_tail.split_at_mut_unchecked(wrapped_head); let (empty_slice, head_to_tail) = head_to_tail.split_at_mut_unchecked(0); StreamWriter { slices: (head_to_tail, empty_slice), - producer: self, + inner: self, } } } @@ -349,7 +350,7 @@ impl StreamConsumer takable_slice_from_maybe_uninitt(empty_1), takable_slice_from_maybe_uninitt(empty_2), ), - consumer: self, + inner: self, } } } @@ -383,7 +384,7 @@ impl StreamConsumer takable_slice_from_maybe_uninitt(tail_to_head), takable_slice_from_maybe_uninitt(empty_slice), ), - consumer: self, + inner: self, } } } @@ -424,7 +425,7 @@ impl StreamConsumer takable_slice_from_maybe_uninitt(tail_to_end), takable_slice_from_maybe_uninitt(start_to_head), ), - consumer: self, + inner: self, } } } @@ -505,8 +506,7 @@ mod test assert_eq!(a.len(), 3); assert_eq!(b.len(), 0); - unsafe - { + unsafe { assert_eq!(a[0].take(), 0); assert_eq!(a[1].take(), 1); assert_eq!(a[2].take(), 2); diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index 409b44a..815c65e 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -197,24 +197,29 @@ impl Tag /// /// If all the tag options are None, None is returned /// Otherwise it is Some of the combination of all of the tags which are Some - pub fn from_tag_opts(tag_opts: &[&Option; N]) -> Option + pub fn from_tag_opts<'a>(mut tag_opts: impl Iterator>) -> Option { - if tag_opts.iter().all(|t| t.is_none()) - { - return None; - } let new_tag = Self::default(); + let mut some_tags = 0; { let mut writer = new_tag.data.write().unwrap(); - for tag in tag_opts.iter().filter(|t| t.is_some()) + for tag in tag_opts.filter(|t| t.is_some()) { + some_tags += 1; let reader = tag.as_ref().unwrap().data.read().unwrap(); writer.extend(reader.iter().map(|x| (*x.0, x.1.clone()))); } } - Some(new_tag) + if some_tags > 0 + { + Some(new_tag) + } + else + { + None + } } /// Adds a new entry in the tag. If it already exists, it is overwritten @@ -264,7 +269,7 @@ impl TagMergable> for Option { fn merge(&self, other: &Self) -> Self { - Tag::from_tag_opts(&[self, other]) + Tag::from_tag_opts([self, other].into_iter()) } }