From f1f769e0e6e40cc1eb19066063e267d8d935912e Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Sun, 22 Mar 2026 13:29:06 +0100 Subject: [PATCH] Starting fsk demod --- example/mod.wav | Bin 265446 -> 391534 bytes example/out.dot | 6 +- example/src/main.rs | 157 +++++- oxydsp-dsp/src/blocks.rs | 2 + oxydsp-dsp/src/blocks/filtering.rs | 1 + oxydsp-dsp/src/blocks/filtering/fir.rs | 58 ++ oxydsp-dsp/src/blocks/math/basic.rs | 2 +- oxydsp-dsp/src/blocks/synthesis.rs | 5 +- oxydsp-dsp/src/blocks/ted.rs | 1 + oxydsp-dsp/src/blocks/ted/early_late.rs | 111 ++++ oxydsp-dsp/src/blocks/utilities/adapters.rs | 191 ++++++- oxydsp-dsp/src/blocks/utilities/channels.rs | 4 +- oxydsp-dsp/src/blocks/utilities/iter.rs | 47 +- oxydsp-dsp/src/filtering.rs | 1 + oxydsp-dsp/src/filtering/fir.rs | 120 +++++ oxydsp-dsp/src/lib.rs | 1 + oxydsp-dsp/src/units.rs | 3 +- .../oxydsp-flowgraph-macros/src/sync.rs | 500 +++++++++++------- oxydsp-flowgraph/src/block.rs | 4 +- oxydsp-flowgraph/src/event.rs | 5 + oxydsp-flowgraph/src/graph.rs | 11 +- oxydsp-flowgraph/src/io.rs | 22 +- oxydsp-flowgraph/src/lib.rs | 4 +- oxydsp-flowgraph/src/tag.rs | 39 +- 24 files changed, 1050 insertions(+), 245 deletions(-) create mode 100644 oxydsp-dsp/src/blocks/filtering.rs create mode 100644 oxydsp-dsp/src/blocks/filtering/fir.rs create mode 100644 oxydsp-dsp/src/blocks/ted.rs create mode 100644 oxydsp-dsp/src/blocks/ted/early_late.rs create mode 100644 oxydsp-dsp/src/filtering.rs create mode 100644 oxydsp-dsp/src/filtering/fir.rs create mode 100644 oxydsp-flowgraph/src/event.rs diff --git a/example/mod.wav b/example/mod.wav index 3b6132ae6d9336bf7c3db9430c8a58d3f88f4072..2a6cc4603713099df56a4a56e99baa3955a7041b 100644 GIT binary patch delta 692 zcmYjPJxBvF7$r%sXThOPI*Hmht)Pw$ojTlP5Dv+p z$Sn>!IEwC4w;O~G9R%N(q@_#XUh=)a?}oSa@^>-b-EOrG#?ei@Dw^ZyB55bAo$z)d z9+JxAydAFbjb_WarY!LB8sSvro$9vg@j{YWI$L>&THaN)35l=-{E^)Y8Ax$VN zp2<3|=4N;DZOd!&s(;98)+x1~@$#iov}e&Gey z1aVjf>K7tE-0VJ8ub=RT1UujiuES>Cp1=_{WaTl9pa>FhdvGJ(k=}Hy^u+grjAW6j F_y>CX4k-Wt delta 37 tcmaF&N&MMN0rntIH@AB{EECxknRoH9G-|hMGq!3owQ4hO)n?hv0|5LC3zh%? diff --git a/example/out.dot b/example/out.dot index ed996bf..a311915 100644 --- a/example/out.dot +++ b/example/out.dot @@ -8,14 +8,16 @@ Repeat_2 [label="{ { input}| Repeat |{ output} }"]; Nco_3 [label="{ { frequency}| Nco |{ output} }"]; OscillatorSource_4 [label="{ OscillatorSource |{ output} }"]; Multiplier_5 [label="{ { input_a| input_b}| Multiplier |{ output} }"]; -TxSink_6 [label="{ { input}| TxSink }"]; +MapResultTagged_6 [label="{ { input}| MapResultTagged |{ output} }"]; +NullSink_7 [label="{ { input}| NullSink }"]; IterSource_0:o0 -> Map_1:i0 [label="bool"]; Map_1:o0 -> Repeat_2:i0 [label="oxydsp_dsp::units::DigitalFrequency"]; Repeat_2:o0 -> Nco_3:i0 [label="oxydsp_dsp::units::DigitalFrequency"]; Nco_3:o0 -> Multiplier_5:i0 [label="num_complex::Complex"]; OscillatorSource_4:o0 -> Multiplier_5:i1 [label="num_complex::Complex"]; -Multiplier_5:o0 -> TxSink_6:i0 [label="num_complex::Complex"]; +Multiplier_5:o0 -> MapResultTagged_6:i0 [label="num_complex::Complex"]; +MapResultTagged_6:o0 -> NullSink_7:i0 [label="num_complex::Complex"]; } \ No newline at end of file diff --git a/example/src/main.rs b/example/src/main.rs index d7ffa97..73317db 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -4,17 +4,30 @@ use std::io::Write; use std::sync::mpsc; use eframe::NativeOptions; +use egui::Color32; use egui_plot::Line; +use egui_plot::MarkerShape; use egui_plot::PlotPoints; +use egui_plot::Points; +use egui_plot::Polygon; +use egui_plot::VLine; use num::Complex; +use num::Zero; +use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::math::basic::Adder; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::Nco; use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate; use oxydsp_dsp::blocks::utilities::adapters::Map; +use oxydsp_dsp::blocks::utilities::adapters::MapResult; +use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; +use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Repeat; +use oxydsp_dsp::blocks::utilities::adapters::Scan; use oxydsp_dsp::blocks::utilities::channels::TxSink; use oxydsp_dsp::blocks::utilities::iter::IterSource; +use oxydsp_dsp::filtering::fir::Fir; use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; @@ -170,6 +183,11 @@ impl Printer } } +fn main() +{ + main_demod() +} + fn main_tst() { let (sourcea, a) = SourceTag::new("valuea".to_string()); @@ -181,22 +199,140 @@ fn main_tst() let _ = fg.run().join(); } -fn main() -{ - let sample_rate = 48_000; - let sample_per_symbol = 96; - let deviation = DigitalFrequency::from_time_frequency(500., sample_rate as f64); - let carrier = DigitalFrequency::from_time_frequency(1000., sample_rate as f64); +const SAMPLE_RATE: usize = 48_000; +const SAMPLE_PER_SYMBOL: usize = 96; +const DEVIATION: f64 = 500.; +const CARRIER: f64 = 1000.; +fn main_demod() +{ + let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64); + + let mut reader = hound::WavReader::open("mod.wav").unwrap(); + let sqr_sum = reader + .samples::() + .map(|sample| (sample.unwrap() as f32) / (i16::MAX as f32)) + .collect::>(); + let (iter_source, signal) = IterSource::new(sqr_sum.into_iter()); + + // Make an iq sampler + let (lo, lo_signal) = OscillatorSource::new(carrier.into()); + let (mixer, iq) = Multiplier::new(signal, lo_signal); + let (iq_bandpass, iq) = FirFilter::, Complex, Complex>::new( + iq, + Fir::lowpass(carrier, 100).normalized(), + ); + let (arg_extract, arg) = Scan::new(iq, Complex::zero(), |state, sample| { + let angle = *state / sample; + *state = sample; + angle.arg() + }); + let (sig_lowpass, arg) = + FirFilter::::new(arg, Fir(vec![1.; SAMPLE_PER_SYMBOL]).normalized()); + let (tx, rx) = mpsc::channel(); + + let elg_loop = Fir(vec![1.0f32; 10]); + let mut elg_loop = elg_loop.normalized(); + elg_loop.0[0] = 0.5; + let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL); + let (sender, arg) = MapResultTagged::new(arg, move |x| { + let _ = tx.send(( + x.0, + x.1.as_ref() + .is_some_and(|t| t.retrieve("elg_symbol").is_some()), + )); + if x.1 + .is_some_and(|t| t.retrieve("itersource_finished").is_some()) + { + println!("FINISHED !"); + (x.0.into(), BlockResult::Exit) + } + else + { + (x.0.into(), BlockResult::Ok) + } + }); + let null_sink = NullSink::new(arg); + + let graph = flowgraph![ + iter_source, + lo, + mixer, + iq_bandpass, + arg_extract, + sig_lowpass, + elg, + sender, + null_sink + ]; + let j = graph.run(); + + let mut output = vec![]; + while let Ok(x) = rx.recv() + { + output.push(x); + } + let _ = j.join(); + + eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| { + egui::CentralPanel::default().show(ctx, |ui| { + egui_plot::Plot::new("hello").show(ui, |plot_ui| { + plot_ui.line(Line::new( + "samples", + output + .iter() + .enumerate() + .map(|(i, s)| [i as f64, s.0 as f64]) + .collect::(), + )); + + plot_ui.points( + Points::new( + "symbols", + output + .iter() + .enumerate() + .filter(|(_, (_, x))| *x) + .map(|(i, (s, _))| [i as f64, *s as f64]) + .collect::>(), + ) + .id("symbols") + .radius(5.) + .shape(MarkerShape::Diamond), + ); + }); + ctx.request_repaint(); + }); + }) + .unwrap(); +} + +fn main_mod() +{ + let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64); + let deviation = DigitalFrequency::from_time_frequency(DEVIATION, SAMPLE_RATE as f64); let data = (0..255u8).flat_map(to_bits).collect::>(); let (bit_stream, bits) = IterSource::new(data.into_iter()); let (to_freq, freq) = Map::new(bits, move |x| [-deviation, deviation][x as usize]); - let (repeat, freq) = Repeat::new(freq, sample_per_symbol); + let (repeat, freq) = Repeat::new(freq, SAMPLE_PER_SYMBOL); let (base_oscillator, baseband) = Nco::::new(freq); let (local_oscillator, lo) = OscillatorSource::::new(carrier.into()); let (frontend, passband) = Multiplier::new(baseband, lo); let (tx, rx) = mpsc::channel::>(); - let sink = TxSink::new(passband, tx); + let (sender, passband) = MapResultTagged::new(passband, move |x| { + let _ = tx.send(x.0); + if x.1 + .is_some_and(|t| t.retrieve("itersource_finished").is_some()) + { + println!("FINISHED !"); + (x.0.into(), BlockResult::Exit) + } + else + { + (x.0.into(), BlockResult::Ok) + } + }); + let null_sink = NullSink::new(passband); let graph = flowgraph![ bit_stream, @@ -205,7 +341,8 @@ fn main() base_oscillator, local_oscillator, frontend, - sink, + sender, + null_sink, ]; File::create("out.dot") .unwrap() @@ -222,7 +359,7 @@ fn main() // Write signal let spec = hound::WavSpec { channels: 1, - sample_rate, + sample_rate: SAMPLE_RATE as u32, bits_per_sample: 16, sample_format: hound::SampleFormat::Int, }; diff --git a/oxydsp-dsp/src/blocks.rs b/oxydsp-dsp/src/blocks.rs index 154ed1b..84ce1ad 100644 --- a/oxydsp-dsp/src/blocks.rs +++ b/oxydsp-dsp/src/blocks.rs @@ -1,3 +1,5 @@ +pub mod filtering; pub mod math; pub mod synthesis; +pub mod ted; pub mod utilities; diff --git a/oxydsp-dsp/src/blocks/filtering.rs b/oxydsp-dsp/src/blocks/filtering.rs new file mode 100644 index 0000000..f4a0fff --- /dev/null +++ b/oxydsp-dsp/src/blocks/filtering.rs @@ -0,0 +1 @@ +pub mod fir; diff --git a/oxydsp-dsp/src/blocks/filtering/fir.rs b/oxydsp-dsp/src/blocks/filtering/fir.rs new file mode 100644 index 0000000..1b280a8 --- /dev/null +++ b/oxydsp-dsp/src/blocks/filtering/fir.rs @@ -0,0 +1,58 @@ +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::sync_block; +use std::iter::Sum; +use std::ops::Mul; + +use crate::filtering::fir::Fir; + +#[derive(BlockIO)] +#[sync_block] +pub struct FirFilter +where + T: Clone + 'static, + F: Mul + Clone + 'static, + O: Sum + 'static, +{ + #[input] + input: In, + + #[output] + output: Out, + + filter: crate::filtering::fir::FirFilter, +} + +impl FirFilter +where + T: Clone + 'static, + F: Mul + Clone + 'static, + O: Sum + 'static, +{ + pub fn new(input: In, impulse_response: Fir) -> (Self, In) + { + let (output, filtered) = oxydsp_flowgraph::io::stream(); + ( + Self { + input, + output, + filter: crate::filtering::fir::FirFilter::new(impulse_response), + }, + filtered, + ) + } +} + +impl<'view, F, T, O> SyncBlock<'view> for FirFilter +where + T: Clone + 'view, + F: Mul + Clone + 'static, + O: Sum + 'static, +{ + fn sync_work(state: Self::StateView, input: Self::Input) -> Option + { + Some(state.filter.next(input)) + } +} diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs index 17557f6..8913675 100644 --- a/oxydsp-dsp/src/blocks/math/basic.rs +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -12,7 +12,7 @@ use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::TagMergable; #[derive(BlockIO)] -#[sync_block(tagged)] +#[sync_block] pub struct Adder where Ia: Add + 'static, diff --git a/oxydsp-dsp/src/blocks/synthesis.rs b/oxydsp-dsp/src/blocks/synthesis.rs index 904ca9f..5a7e812 100644 --- a/oxydsp-dsp/src/blocks/synthesis.rs +++ b/oxydsp-dsp/src/blocks/synthesis.rs @@ -31,7 +31,8 @@ impl + 'static> Block for OscillatorSource { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter((&mut self.nco).map(|x| (x, None))); + self.output + .push_iter((&mut self.nco).map(|x| (x, None).into())); BlockResult::Ok } } @@ -82,7 +83,7 @@ impl + 'static> Block for Nco self.output .push_iter(&mut self.frequency.pop_iter().map(|f| { self.nco.set_frequency(f.0); - (self.nco.next().unwrap(), f.1) + (self.nco.next().unwrap(), f.1).into() })); BlockResult::Ok } diff --git a/oxydsp-dsp/src/blocks/ted.rs b/oxydsp-dsp/src/blocks/ted.rs new file mode 100644 index 0000000..c59b07c --- /dev/null +++ b/oxydsp-dsp/src/blocks/ted.rs @@ -0,0 +1 @@ +pub mod early_late; diff --git a/oxydsp-dsp/src/blocks/ted/early_late.rs b/oxydsp-dsp/src/blocks/ted/early_late.rs new file mode 100644 index 0000000..b363b95 --- /dev/null +++ b/oxydsp-dsp/src/blocks/ted/early_late.rs @@ -0,0 +1,111 @@ +use std::collections::VecDeque; +use std::iter::Sum; + +use num::Float; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::sync_block; +use oxydsp_flowgraph::tag::Tag; + +use crate::filtering::fir::Fir; +use crate::filtering::fir::FirFilter; + +#[derive(BlockIO)] +#[sync_block(tagged)] +pub struct EarlyLateGate +{ + #[input] + input: In, + + #[output] + output: Out, + + symbol_length: usize, + + // Window looking at symbol_length samples at a time + window: VecDeque, + + // The current location of the window, in relation to the last sample + window_location: usize, + + window_center: usize, + + // The next window location, in relation to the last sample such that the window is centered on + // a symbol center (hopefully) + next_sample: usize, + loop_filter: FirFilter, +} + +impl EarlyLateGate +where + T: Float + Sum + Clone + 'static, +{ + pub fn new(input: In, loop_filter: Fir, symbol_length: usize) -> (Self, In) + { + let (output, samples) = oxydsp_flowgraph::io::stream(); + ( + Self { + input, + output, + window: VecDeque::with_capacity(symbol_length), + symbol_length, + window_location: 0, + window_center: symbol_length / 2, + next_sample: symbol_length, // We assume that the first symbol is 1.5 windows into + // the stream + loop_filter: FirFilter::new(loop_filter), + }, + samples, + ) + } +} + +impl<'view, T> SyncBlock<'view> for EarlyLateGate +where + T: Float + Sum + Clone + 'static, +{ + fn sync_work(state: Self::StateView, input: Self::Input) -> Option + { + if state.window.len() < *state.symbol_length + { + state.window.push_back(input.0); + return Some(input.0.into()); + } + + // Bring new sample in + state.window.pop_front(); + state.window.push_back(input.0); + *state.window_location += 1; + + let sample = state.window[*state.window_center]; + let mut tag = None; + if *state.window_location >= *state.next_sample + { + let new_tag = Tag::default(); + new_tag.tag("elg_symbol", ()); + tag = Some(new_tag); + + let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize; + let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize; + + let early_sample = state.window[early_index]; + let late_sample = state.window[late_index]; + + let error = (late_sample - early_sample) * sample; + let correction = state.loop_filter.next(error); + + // Figure out next sample location + *state.next_sample += (*state.symbol_length as isize + + correction.floor().to_isize().unwrap_or(0)) + .max(0) as usize; + + // Turn everything back relative to current sample + *state.next_sample -= *state.window_location; + *state.window_location = 0; + } + + Some((sample, tag).into()) + } +} diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 1a17f42..beb95d4 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,11 +1,17 @@ +use core::sync; + use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::io::stream; +use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::Tag; +use oxydsp_flowgraph::tag::TagMergable; +use oxydsp_flowgraph::tag::Tagged; #[derive(BlockIO)] pub struct Map @@ -36,12 +42,168 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output - .push_iter(self.input.pop_iter().map(|x| ((&self.map)(x.0), x.1))); + self.output.push_iter( + self.input + .pop_iter() + .map(|x| ((&self.map)(x.0), x.1).into()), + ); BlockResult::Ok } } +#[derive(BlockIO)] +pub struct MapResult +{ + #[input] + input: In, + + #[output] + output: Out, + + map: F, +} + +impl MapResult +where + F: Fn(I) -> (O, BlockResult), +{ + pub fn new(input: In, map: F) -> (Self, In) + { + let (output, mapped) = stream(); + (Self { input, output, map }, mapped) + } +} + +impl Block for MapResult +where + F: Fn(I) -> (O, BlockResult), +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let writer = self.output.write(); + let reader = self.input.read(); + + for _ in 0..(writer.len().min(reader.len())) + { + let (input, tag_opt) = reader.pop().unwrap().into(); + let (output, result) = (self.map)(input); + let _ = writer.push((output, tag_opt).into()); + match result + { + BlockResult::Terminated | BlockResult::Exit => + { + return result; + } + BlockResult::Ok => + {} + } + } + BlockResult::Ok + } +} + +#[derive(BlockIO)] +pub struct MapResultTagged +{ + #[input] + input: In, + + #[output] + output: Out, + + map: F, +} + +impl MapResultTagged +where + F: Fn(Tagged) -> (Tagged, BlockResult), +{ + pub fn new(input: In, map: F) -> (Self, In) + { + let (output, mapped) = stream(); + (Self { input, output, map }, mapped) + } +} + +impl Block for MapResultTagged +where + F: Fn(Tagged) -> (Tagged, BlockResult), +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let writer = self.output.write(); + let reader = self.input.read(); + + for _ in 0..(writer.len().min(reader.len())) + { + let (input, tag_opt) = reader.pop().unwrap().into(); + let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into()); + let (output, tag_out) = tagged_out.into(); + + let _ = writer.push((output, tag_opt.merge(&tag_out)).into()); + match result + { + BlockResult::Terminated | BlockResult::Exit => + { + return result; + } + BlockResult::Ok => + {} + } + } + BlockResult::Ok + } +} + +#[derive(BlockIO)] +#[sync_block] +pub struct Scan +where + F: Fn(&mut S, I) -> O, +{ + #[input] + input: In, + + #[output] + output: Out, + + state: S, + + map: F, +} + +impl Scan +where + F: Fn(&mut S, I) -> O, +{ + pub fn new(input: In, initial_state: S, map: F) -> (Self, In) + { + let (output, mapped) = stream(); + ( + Self { + input, + output, + state: initial_state, + map, + }, + mapped, + ) + } +} + +impl<'view, I, O, S, F> SyncBlock<'view> for Scan +where + I: 'static, + O: 'static, + S: 'view, + F: Fn(&mut S, I) -> O + 'view, +{ + fn sync_work(state: Self::StateView, input: Self::Input) -> Option + { + Some((*state.map)(state.state, input)) + } +} + #[derive(BlockIO)] pub struct Repeat { @@ -110,3 +272,28 @@ impl Block for Repeat BlockResult::Ok } } + +#[derive(BlockIO)] +#[sync_block] +pub struct NullSink +{ + #[input] + input: In, +} + +impl NullSink +{ + pub fn new(input: In) -> Self + { + Self { input } + } +} + +impl<'view, I: 'static> SyncBlock<'view> for NullSink +{ + fn sync_work(_: Self::StateView, _: Self::Input) -> Option + { + // Don't do shit ! + Some(()) + } +} diff --git a/oxydsp-dsp/src/blocks/utilities/channels.rs b/oxydsp-dsp/src/blocks/utilities/channels.rs index 369b29e..909a95f 100644 --- a/oxydsp-dsp/src/blocks/utilities/channels.rs +++ b/oxydsp-dsp/src/blocks/utilities/channels.rs @@ -49,7 +49,9 @@ impl Block for RxSource, I> { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if self.output.push_iter(self.input.iter().map(|x| (x, None))) + if self + .output + .push_iter(self.input.iter().map(|x| (x, None).into())) { BlockResult::Ok } diff --git a/oxydsp-dsp/src/blocks/utilities/iter.rs b/oxydsp-dsp/src/blocks/utilities/iter.rs index ccaa179..6175583 100644 --- a/oxydsp-dsp/src/blocks/utilities/iter.rs +++ b/oxydsp-dsp/src/blocks/utilities/iter.rs @@ -1,15 +1,21 @@ -use oxydsp_flowgraph::{ - BlockIO, - block::{Block, BlockResult}, - io::{In, Out, stream}, -}; +use std::iter::Peekable; + +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::stream; +use oxydsp_flowgraph::sync_block; +use oxydsp_flowgraph::tag::Tag; #[derive(BlockIO)] pub struct IterSource where I::Item: 'static, { - iter: I, + iter: Peekable, #[output] output: Out, @@ -23,7 +29,13 @@ where pub fn new(iter: I) -> (Self, In) { let (output, items) = stream(); - (Self { iter, output }, items) + ( + Self { + iter: iter.peekable(), + output, + }, + items, + ) } } @@ -34,13 +46,22 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if self.output.push_iter((&mut self.iter).map(|x| (x, None))) + let writer = self.output.write(); + + for _ in 0..writer.len() { - BlockResult::Ok - } - else - { - BlockResult::Terminated + if let Some(element) = self.iter.next() + { + let mut tag = None; + if self.iter.peek().is_none() + { + let new_tag = Tag::default(); + new_tag.tag("itersource_finished", ()); + tag = Some(new_tag); + } + let _ = writer.push((element, tag).into()); + } } + BlockResult::Ok } } diff --git a/oxydsp-dsp/src/filtering.rs b/oxydsp-dsp/src/filtering.rs new file mode 100644 index 0000000..f4a0fff --- /dev/null +++ b/oxydsp-dsp/src/filtering.rs @@ -0,0 +1 @@ +pub mod fir; diff --git a/oxydsp-dsp/src/filtering/fir.rs b/oxydsp-dsp/src/filtering/fir.rs new file mode 100644 index 0000000..4fa51da --- /dev/null +++ b/oxydsp-dsp/src/filtering/fir.rs @@ -0,0 +1,120 @@ +use std::collections::VecDeque; +use std::f64::consts::PI; +use std::iter::Sum; +use std::ops::Div; +use std::ops::Mul; +use std::process::Output; + +use num::Complex; +use num::Float; +use num::One; +use num::Zero; +use num::complex::ComplexFloat; +use num::zero; +use rustfft::FftNum; +use rustfft::FftPlanner; + +use crate::map; +use crate::units::DigitalFrequency; + +/// Finite impulse response +pub struct Fir(pub Vec); + +impl Fir> +where + T: FftNum + Float + Clone, +{ + pub fn from_transfer_function(tf: impl AsRef<[Complex]>) -> Fir> + { + let mut planner = FftPlanner::new(); + let tf_len = tf.as_ref().len(); + let ifft = planner.plan_fft_inverse(tf.as_ref().len()); + + let mut fir = tf.as_ref().to_vec(); + ifft.process(fir.as_mut_slice()); + + let mut shifted_fir = vec![]; + for i in 0..tf_len + { + let k = (tf_len - (tf_len / 2) + i) % tf_len; + shifted_fir.push(fir[k]); + } + + Fir(shifted_fir) + } + + pub fn lowpass(cutoff: DigitalFrequency, length: usize) -> Fir> + { + let mut tf = vec![Complex::::zero(); length]; + + let cutoff_bin = map(cutoff.as_rad(), 0., 2. * PI, 0., length as f64).floor() as usize; + for i in 0..cutoff_bin + { + tf[i] = Complex::::one(); + tf[length - i - 1] = Complex::::one(); + } + + Self::from_transfer_function(tf) + } +} + +impl Fir +where + T: ComplexFloat + Div + Copy + Sum, + T::Real: Float, +{ + pub fn normalized(mut self) -> Self + { + let sum: T = self.0.iter().copied().sum(); + let len = Float::sqrt(sum.im() * sum.im() + sum.re() * sum.re()); + + self.0.iter_mut().for_each(|x| *x = *x / len); + self + } +} + +pub struct FirFilter +where + F: Mul, + O: Sum, +{ + fir: Vec, + taps: VecDeque, +} + +impl FirFilter +where + T: Clone, + F: Mul + Clone, + O: Sum, +{ + pub fn new(impulse_response: Fir) -> Self + { + let len = impulse_response.0.len(); + Self { + fir: impulse_response.0, + taps: VecDeque::with_capacity(len), + } + } + + pub fn next(&mut self, input: T) -> O + { + if self.taps.len() == self.fir.len() + { + self.taps.pop_front(); + } + self.taps.push_back(input); + + self.fir + .iter() + .zip(self.taps.iter()) + .map(|(a, b)| a.clone() * b.clone()) + .sum() + } +} + +// Completely stolen from sdrpp code +pub fn estimate_fir_length(transition_width: f64, sample_rate: f64) -> f64 +{ + 3.8 * sample_rate / transition_width +} diff --git a/oxydsp-dsp/src/lib.rs b/oxydsp-dsp/src/lib.rs index 768c266..cc324e5 100644 --- a/oxydsp-dsp/src/lib.rs +++ b/oxydsp-dsp/src/lib.rs @@ -1,6 +1,7 @@ use num::Float; pub mod blocks; +pub mod filtering; pub mod synthesis; pub mod units; diff --git a/oxydsp-dsp/src/units.rs b/oxydsp-dsp/src/units.rs index 862a985..895c081 100644 --- a/oxydsp-dsp/src/units.rs +++ b/oxydsp-dsp/src/units.rs @@ -1,4 +1,5 @@ -use std::{f64::consts::PI, ops::Neg}; +use std::f64::consts::PI; +use std::ops::Neg; use crate::map; diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs index abfedbd..11160d7 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs @@ -8,6 +8,7 @@ use zyn::ext::TypeExt; use zyn::quote::quote; use zyn::syn::Field; use zyn::syn::GenericParam; +use zyn::syn::Lifetime; use zyn::syn::parse_quote; use crate::SyncBlockConfig; @@ -23,28 +24,41 @@ pub fn sync_block_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> z // module to keep everything clean mod {{ item.ident | snake | ident: "{}_synchronous_block" }} { + use super::*; + @sync_block_view_struct(item = item.clone()) } @sync_block_syncio_impl(item = item.clone(), config = *config) - @sync_block_impl_block(item = item.clone()) + @sync_block_impl_block(item = item.clone(), tagged = config.tagged) ) } +// Block implementation for sync block #[zyn::element] fn sync_block_syncio_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> zyn::TokenStream { let view_lifetime: GenericParam = parse_quote!('view); let mut view_generics = item.generics.clone(); + view_generics.params.iter_mut().for_each(|x| match x + { + GenericParam::Lifetime(_) => + {} + GenericParam::Type(type_param) => type_param + .bounds + .push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))), + GenericParam::Const(_) => + {} + }); view_generics.params.insert(0, view_lifetime); - let (view_impl_generics, view_type_generics, _view_where_clause) = + let (view_impl_generics, view_type_generics, view_where_clause) = view_generics.split_for_impl(); - let (_impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); + let (_impl_generics, type_generics, _where_clause) = item.generics.split_for_impl(); zyn::zyn!( impl {{ view_impl_generics }} oxydsp_flowgraph::block::SyncBlockIO<'view> for {{ item.ident }} {{ type_generics }} - {{ where_clause }} + {{ view_where_clause }} { // Path within module type StateView = {{ item.ident | snake | ident: "{}_synchronous_block" }}::{{ item.ident | ident:"{}View" }} {{ view_type_generics }}; @@ -54,6 +68,7 @@ fn sync_block_syncio_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) - ) } +// Input/Output types for block fn sync_block_io_types( item: zyn::syn::ItemStruct, io: &'static str, @@ -116,15 +131,26 @@ fn sync_block_io_types( .into_token_stream() } +// View struct for sync block #[zyn::element] fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream { // Create view liftime to add to struct definition let lifetime: GenericParam = parse_quote!('view); let mut generics = item.generics.clone(); + generics.params.iter_mut().for_each(|x| match x + { + GenericParam::Lifetime(_) => + {} + GenericParam::Type(type_param) => type_param + .bounds + .push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))), + GenericParam::Const(_) => + {} + }); generics.params.insert(0, lifetime); - let (_impl_generics, type_generics, where_clause) = generics.split_for_impl(); + let (impl_generics, _type_generics, where_clause) = generics.split_for_impl(); let fields = &item.fields.as_named().unwrap().named; let mut state_fields = vec![]; @@ -169,7 +195,8 @@ fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream .filter(|tokens| !tokens.is_empty()); zyn::zyn!( - pub struct {{ item.ident | ident:"{}View" }} {{ type_generics }} + pub struct {{ item.ident | ident:"{}View" }} {{ impl_generics }} + {{ where_clause }} { @for (field in state_fields.iter()) { @@ -185,187 +212,7 @@ fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream ) } -#[zyn::element] -fn sync_block_impl_block(item: zyn::syn::ItemStruct) -> zyn::TokenStream -{ - let item2 = item.clone(); - let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl(); - let fields = &item.fields.as_named().unwrap().named; - - // Retrieve fields - let input_fields = fields - .iter() - .filter(|f| f.attrs.iter().any(|attr| attr.is("input"))) - .cloned() - .collect::>(); - let output_fields = fields - .iter() - .filter(|f| f.attrs.iter().any(|attr| attr.is("output"))) - .cloned() - .collect::>(); - - zyn::zyn!( - impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }} - {{ where_clause }} - { - fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult - { - - // Get writers from outputs - let mut max_len = usize::MAX; - @for (out_field in output_fields.iter()) - { - let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write(); - } - - // Compute max_len - let max_len = [ - usize::MAX, - @for (out_field in output_fields.iter()) - { - {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(), - } - ].iter().min().unwrap(); - - - @if (!input_fields.is_empty()) - { - @sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone()) - } - @else - { - @sync_block_block_impl_without_inputs(item = item.clone(), output_fields = output_fields.clone()) - } - - oxydsp_flowgraph::block::BlockResult::Ok - } - } - ) -} - -#[zyn::element] -fn sync_block_block_impl_without_inputs( - item: zyn::syn::ItemStruct, - output_fields: Vec, -) -> zyn::TokenStream -{ - zyn::zyn!( - for _ in 0..max_len - { - // Get outputs - @if (output_fields.len() == 1) - { - let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} - } - @else - { - let state = {{ sync_block_make_view_struct(item.clone()) }}; - let (@for (out_field in output_fields.iter()) - { - {{out_field.ident.clone().unwrap() | ident:"{}_element"}}, - } - ) - } - = ::sync_work(state, ()).unwrap(); - - // Now the output samples must be sent to their resepective outputs - @for (out_field in output_fields.iter()) - { - {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( - ( - {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, None - ) - ); - } - } - ) -} - -#[zyn::element] -fn sync_block_block_impl_with_inputs( - item: zyn::syn::ItemStruct, - input_fields: Vec, - output_fields: Vec, -) -> zyn::TokenStream -{ - zyn::zyn!( - - // Iterate on inputs - ( - @for (in_field in input_fields.iter()) - { - &mut self.{{ in_field.ident }}, - } - ).pop_iter() - .take(max_len) - .for_each( - // Deconstruct foreach arguments - | - (@for (in_field in input_fields.iter()) - { - oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, - {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), - }) - | - { - // Create output tag - let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([ - @for (in_field in input_fields.iter()) - { - {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(), - } - ]); - - let state = {{ sync_block_make_view_struct(item.clone()) }}; - // Compute output sample - @if (output_fields.is_empty()) - { - let _ - } - @else if (output_fields.len() == 1) - { - let oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}) - } - @else - { - let (@for (out_field in output_fields.iter()) - { - oxydsp_flowgraph::tag::Tagged({{out_field.ident.clone().unwrap() | ident:"{}_element"}}, {{out_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), - } - ) - } - = ::sync_work(state, - @if (input_fields.len() == 1) - { - oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()) - } - @else - { - (@for (in_field in input_fields.iter()) - { - oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()), - } - ) - } - ).unwrap(); - - // Now the output samples must be sent to their resepective outputs - @for (out_field in output_fields.iter()) - { - {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( - oxydsp_flowgraph::tag::Tagged( - {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, - {{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(common_tag), - ) - ); - } - // - - } - ); - ) -} - +// Instantiates the view struct fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream { let fields = &item.fields.as_named().unwrap().named; @@ -395,3 +242,282 @@ fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream ) .into_token_stream() } + +// Impl Block for syncio +#[zyn::element] +fn sync_block_impl_block(item: zyn::syn::ItemStruct, tagged: bool) -> zyn::TokenStream +{ + let item2 = item.clone(); + let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl(); + let fields = &item.fields.as_named().unwrap().named; + + // Retrieve fields + let input_fields = fields + .iter() + .filter(|f| f.attrs.iter().any(|attr| attr.is("input"))) + .cloned() + .collect::>(); + let output_fields = fields + .iter() + .filter(|f| f.attrs.iter().any(|attr| attr.is("output"))) + .cloned() + .collect::>(); + + zyn::zyn!( + impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }} + {{ where_clause }} + { + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + use oxydsp_flowgraph::tag::TagMergable; + // Get writers from outputs + let mut max_len = usize::MAX; + @for (out_field in output_fields.iter()) + { + let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write(); + } + + // Compute max_len + let max_len = *([ + usize::MAX, + @for (out_field in output_fields.iter()) + { + {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(), + } + ].iter().min().unwrap()); + + + @if (!input_fields.is_empty()) + { + @sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone(), tagged = *tagged) + } + @else + { + @sync_block_block_impl_without_inputs(item = item.clone(), output_fields = output_fields.clone(), tagged = *tagged) + } + + oxydsp_flowgraph::block::BlockResult::Ok + } + } + ) +} + +// Impl Block for syncio (no inputs) +#[zyn::element] +fn sync_block_block_impl_without_inputs( + item: zyn::syn::ItemStruct, + output_fields: Vec, + tagged: bool, +) -> zyn::TokenStream +{ + zyn::zyn!( + for _ in 0..max_len + { + // Get outputs + let state = {{ sync_block_make_view_struct(item.clone()) }}; + let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged) = + ::sync_work(state, ()).unwrap(); + + // Now the output samples must be sent to their resepective outputs + @for (out_field in output_fields.iter()) + { + @if (*tagged) + { + let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + oxydsp_flowgraph::tag::Tagged( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + {{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}, + ) + ); + } @else + { + let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + oxydsp_flowgraph::tag::Tagged( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + None, + ) + ); + } + } + } + ) +} + +// Impl Block for syncio (with inputs) +#[zyn::element] +fn sync_block_block_impl_with_inputs( + item: zyn::syn::ItemStruct, + input_fields: Vec, + output_fields: Vec, + tagged: bool, +) -> zyn::TokenStream +{ + zyn::zyn!( + + use oxydsp_flowgraph::io::PopIterable; + // Iterate on inputs + ( + @for (in_field in input_fields.iter()) + { + &mut self.{{ in_field.ident }}, + } + ).pop_iter() + .take(max_len) + .for_each( + // Deconstruct foreach arguments + | + (@for (in_field in input_fields.iter()) + { + oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), + }) + | + { + // Create output tag + let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([ + @for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(), + } + ]); + + let state = {{ sync_block_make_view_struct(item.clone()) }}; + // Compute output sample + let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged) + = ::sync_work(state, @sync_block_input_cons(input_fields = input_fields.clone(), tagged = *tagged)).unwrap(); + + // Now the output samples must be sent to their resepective outputs + @for (out_field in output_fields.iter()) + { + @if (*tagged) + { + let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + oxydsp_flowgraph::tag::Tagged( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + {{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(&common_tag), + ) + ); + } @else + { + let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + oxydsp_flowgraph::tag::Tagged( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + common_tag, + ) + ); + } + } + // + + } + ); + ) +} + +#[zyn::element] +fn sync_block_output_descons(output_fields: Vec, tagged: bool) -> zyn::TokenStream +{ + #[allow(clippy::collapsible_else_if)] + if *tagged + { + // If tagged : deconstruct tags + if output_fields.is_empty() + { + zyn::zyn!(_) + } + else if output_fields.len() == 1 + { + zyn::zyn!( + oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}) + ) + } + else + { + zyn::zyn!( + (@for (in_field in output_fields.iter()) + { + oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()), + } + ) + ) + } + } + else + { + // Otherwise just get the output element + if output_fields.is_empty() + { + zyn::zyn!(_) + } + else if output_fields.len() == 1 + { + zyn::zyn!( + {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + ) + } + else + { + zyn::zyn!( + (@for (in_field in output_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + ) + } + } +} + +#[zyn::element] +fn sync_block_input_cons(input_fields: Vec, tagged: bool) -> zyn::TokenStream +{ + #[allow(clippy::collapsible_else_if)] + if *tagged + { + // If tagged : deconstruct tags + if input_fields.is_empty() + { + zyn::zyn!(()) + } + else if input_fields.len() == 1 + { + zyn::zyn!( + oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}) + ) + } + else + { + zyn::zyn!( + (@for (in_field in input_fields.iter()) + { + oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()), + } + ) + ) + } + } + else + { + // Otherwise just get the output element + if input_fields.is_empty() + { + zyn::zyn!(_) + } + else if input_fields.len() == 1 + { + zyn::zyn!( + {{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + ) + } + else + { + zyn::zyn!( + (@for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + ) + } + } +} diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index 612b589..bc65279 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -9,8 +9,10 @@ pub enum BlockResult // Signifies that the block finished its work // Running it again would be useless - // This triggers the graph shutdown Terminated, + + // Kill graph + Exit, } pub trait BlockIO diff --git a/oxydsp-flowgraph/src/event.rs b/oxydsp-flowgraph/src/event.rs new file mode 100644 index 0000000..cc0ce53 --- /dev/null +++ b/oxydsp-flowgraph/src/event.rs @@ -0,0 +1,5 @@ +// Represents a FlowGrahWide, simultaneous event +pub enum FlowGraphEvent +{ + Kill(String), +} diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index 20f6e1b..f40429f 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -49,20 +49,15 @@ impl FlowGraph crate::block::BlockResult::Ok => {} crate::block::BlockResult::Terminated => + { //break 'outer; + } + crate::block::BlockResult::Exit => { break 'outer; } } } } - - for _ in 0..10_000 - { - for x in self.blocks.iter_mut() - { - x.work(); - } - } }) } diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 3178470..9f7079b 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -131,7 +131,7 @@ impl Out } } - pub fn push_iter)>>(&mut self, mut iter: I) -> bool + pub fn push_iter>>(&mut self, mut iter: I) -> bool { let writer = self.write(); let len = writer.len(); @@ -140,17 +140,7 @@ impl Out { if let Some(elt) = iter.next() { - match elt.1 - { - Some(tag) => - { - let _ = writer.push(Tagged(elt.0, Some(tag))); - } - None => - { - let _ = writer.push_no_tag(elt.0); - } - } + let _ = writer.push(elt); } else { @@ -220,7 +210,13 @@ impl OutWriter<'_, T> pub fn push(&self, data: Tagged) -> Result<(), Tagged> { - let (data, tag) = data.into(); + let (data, mut tag) = data.into(); + let position = self.data_writer.next_index(); + if let Some(tag) = &mut tag + { + tag.position = position + } + match self.data_writer.push(data) { Ok(_) if tag.is_some() => diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs index 27c2a23..f80f847 100644 --- a/oxydsp-flowgraph/src/lib.rs +++ b/oxydsp-flowgraph/src/lib.rs @@ -3,8 +3,10 @@ pub mod block; pub mod edge; +pub mod event; pub mod graph; pub mod io; pub mod stream; pub mod tag; -pub use oxydsp_flowgraph_macros::{BlockIO, sync_block}; +pub use oxydsp_flowgraph_macros::BlockIO; +pub use oxydsp_flowgraph_macros::sync_block; diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index 2a667c9..a334b7e 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -22,6 +22,14 @@ pub struct Tag impl Tag { + pub fn new() -> Self + { + Self { + position: 0, + data: Default::default(), + } + } + pub fn merge_tag_opts(tag_opts: [Option; N]) -> Option { let mut out_tag = None; @@ -31,6 +39,27 @@ impl Tag } out_tag } + + pub fn tag(&self, key: impl AsRef, value: T) + { + self.data + .lock() + .unwrap() + .insert(key.as_ref().to_owned(), Arc::new(value)); + } + + pub fn retrieve(&self, key: impl AsRef) -> Option> + { + self.data.lock().unwrap().get(key.as_ref()).cloned() + } +} + +impl Default for Tag +{ + fn default() -> Self + { + Self::new() + } } pub trait TagValue: Clone {} @@ -93,6 +122,10 @@ impl Tagged { pub fn new(inner: T, tag: Option) -> Self { + if tag.is_none() + { + //println!("data has no tag"); + } Self(inner, tag) } @@ -156,11 +189,11 @@ impl From<(T, Option)> for Tagged } } -impl Into<(T, Option)> for Tagged +impl From> for (T, Option) { - fn into(self) -> (T, Option) + fn from(val: Tagged) -> Self { - (self.0, self.1) + (val.0, val.1) } }