diff --git a/examples/qpsk-modem/mod.wav b/examples/qpsk-modem/mod.wav new file mode 100644 index 0000000..e3e8055 Binary files /dev/null and b/examples/qpsk-modem/mod.wav differ diff --git a/examples/qpsk-modem/mod_rrc.wav b/examples/qpsk-modem/mod_rrc.wav new file mode 100644 index 0000000..aeb34e7 Binary files /dev/null and b/examples/qpsk-modem/mod_rrc.wav differ diff --git a/examples/qpsk-modem/src/main.rs b/examples/qpsk-modem/src/main.rs index 86521f0..6ef7831 100644 --- a/examples/qpsk-modem/src/main.rs +++ b/examples/qpsk-modem/src/main.rs @@ -1,61 +1,351 @@ -use std::time::Instant; +use std::cell::RefCell; +use std::os::unix::thread; +use std::time::Duration; +use cpal::traits::DeviceTrait; +use cpal::traits::HostTrait; +use cpal::traits::StreamTrait; +use egui::Color32; +use egui_plot::Line; +use egui_plot::PlotPoints; +use egui_plot::Points; use num::Complex; +use num::complex::ComplexFloat; +use num::traits::sign; use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper; use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::OscillatorSource; -use oxydsp_dsp::blocks::utilities::adapters::{Map, NullSink, Scan}; +use oxydsp_dsp::blocks::utilities::adapters::Map; +use oxydsp_dsp::blocks::utilities::adapters::NullSink; +use oxydsp_dsp::blocks::utilities::adapters::Scan; +use oxydsp_dsp::blocks::utilities::channels::RxSource; +use oxydsp_dsp::blocks::utilities::channels::TxSink; +use oxydsp_dsp::blocks::utilities::graph_control::GraphKiller; use oxydsp_dsp::blocks::utilities::iter::IterSource; use oxydsp_dsp::filtering::fir::Fir; +use oxydsp_dsp::filtering::history_buf::HistoryBuf; use oxydsp_dsp::units::DigitalFrequency; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::flowgraph; -use rand::{RngExt, SeedableRng, random}; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::stream; +use oxydsp_flowgraph::tag::Tags; +use rand::random; const CARRRIER_FREQ: f64 = 1000.; const SAMPLE_RATE: usize = 48_000; +const SAMPLE_PER_SYMBOL: usize = 100; -fn main() +#[derive(BlockIO)] +pub struct CostasLoop { - let bits = (0..1024).map(|_| [random::(), random::()]); + #[input] + input: In, - let (iter_source, bits) = IterSource::new(bits.cycle()); - let (iq_map, iq) = Map::new(bits, |x| match x + #[output] + output: Out>, + + center_freq: DigitalFrequency, + nco: oxydsp_dsp::synthesis::oscillator::Nco, + + loop_filter: oxydsp_dsp::filtering::fir::FirFilter, + low_pass: oxydsp_dsp::filtering::fir::FirFilter, Complex, Complex>, +} + +impl CostasLoop +{ + pub fn new( + input: In, + start_frequency: DigitalFrequency, + loop_filter: Fir, + ) -> (Self, In>) + { + let (output, iq) = oxydsp_flowgraph::io::stream(); + ( + CostasLoop { + input, + output, + center_freq: start_frequency, + nco: start_frequency.into(), + loop_filter: oxydsp_dsp::filtering::fir::FirFilter::new(loop_filter), + low_pass: oxydsp_dsp::filtering::fir::FirFilter::new( + Fir::lowpass(start_frequency, 100).normalized_len(), + ), + }, + iq, + ) + } +} + +impl Block for CostasLoop +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + self.output.push_iter(self.input.iter().map(|x| { + let (signal, tag) = x.into(); + let lo = self.nco.next().unwrap(); + let iq = self + .low_pass + .next(Complex::new(lo.re * signal, lo.im * signal)); + + let error = iq.re * iq.im.signum() - iq.im * iq.re.signum(); + let correction = self.loop_filter.next(error); + self.nco.set_frequency(DigitalFrequency::from_rad( + self.center_freq.as_rad() + correction as f64, + )); + + (iq, tag).into() + })); + oxydsp_flowgraph::block::BlockResult::Ok + } +} + +pub fn main() +{ + demodulator(); + //modulator(); +} + +pub fn modulator() +{ + let bit_source = (0..8192).map(|_| [random::(), random::()]); + + let mut tags = Tags::default(); + + let (mut iter_source, bits) = IterSource::new(bit_source); + let kill_tag = tags.allocate_tag("Kill tag"); + iter_source.tag_last_with(kill_tag.clone()); + + let (to_iq, iq) = Map::new(bits, |x| match x { [true, true] => Complex::new(1., 1.), [true, false] => Complex::new(1., -1.), [false, true] => Complex::new(-1., 1.), [false, false] => Complex::new(-1., -1.), }); - let (pulse_shaper, iq) = PulseShaper::new(iq, Fir::square(200), 200); - let (lo, carrier) = OscillatorSource::new(DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into()); + let (pulse_shaper, iq) = PulseShaper::new( + iq, + Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL), + SAMPLE_PER_SYMBOL, + ); + let (carrier_oscillator, carrier) = OscillatorSource::new( + DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into(), + ); let (mixer, passband) = Multiplier::new(iq, carrier); - let (channel, passband) = Scan::new(passband, rand::rngs::SmallRng::seed_from_u64(0), |state, x| + let (tx, rx) = std::sync::mpsc::channel::>(); + let (graph_killer, passband) = GraphKiller::new(passband, kill_tag); + let tx_sink = TxSink::new(passband, tx); + + let graph = flowgraph![ + iter_source, + to_iq, + pulse_shaper, + carrier_oscillator, + mixer, + graph_killer, + tx_sink + ]; + graph.run(1); + + let spec = hound::WavSpec { + channels: 1, + sample_rate: SAMPLE_RATE as u32, + bits_per_sample: 16, + sample_format: hound::SampleFormat::Int, + }; + let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap(); + for iq in rx.iter() { - x.re + state.sample::(rand_distr::StandardNormal) - }); + let amplitude = 0.5 * i16::MAX as f32; + writer.write_sample((iq.re * amplitude) as i16).unwrap(); + } + writer.finalize().unwrap(); +} - let (zero_if, iq) = ZeroIf::new(passband, DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into()); - let (matched_filter, iq) = FirFilter::new(iq, Fir::::square(200)); - let (inspect, iq) = Scan::new(iq, (Instant::now(), 0), |(last, counter), x| +pub fn demodulator() +{ + let (audio_tx, audio_rx) = std::sync::mpsc::channel(); + let (rx_source, signal) = RxSource::new(audio_rx); + + let (downconverter, iq) = CostasLoop::new( + signal, + DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64), + Fir::proportional_integral(100, 0.000, 0.0000), + ); + + //let agc_filter = Fir::proportional_integral(100, 0.1, 0.001); + // let (agc, iq) = Scan::new(iq, 1., |gain, iq| + // { + // let mu = 0.1; + // let mag = iq.abs(); + // *gain += mu * (1. - mag * *gain); + // + // iq * *gain + // }); + + let (matched_filter, iq) = FirFilter::new( + iq, + Fir::::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL) + .normalized_sqr(), + ); + + let (eye_i_tx, eye_i_rx) = std::sync::mpsc::channel::>(); + let (eye_q_tx, eye_q_rx) = std::sync::mpsc::channel::>(); + let (constellation_tx, constellation_rx) = std::sync::mpsc::channel(); + + // let (debug, iq) = Scan::new( + // iq, + // ( + // HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2), + // HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2), + // 0usize, + // ), + // move |(buf_i, buf_q, counter), x| { + // buf_i.push(x.re); + // buf_q.push(x.im); + // let _ = constellation_tx.send(x); + // *counter += 1; + // if *counter >= SAMPLE_PER_SYMBOL * 2 + // { + // let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::>()); + // let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::>()); + // *counter = 0; + // } + // x + // }, + // ); + let tx_sink = TxSink::new(iq, constellation_tx); + + let host = cpal::default_host(); + let device = host + .default_input_device() + .expect("no output device available"); + let mut supported_configs_range = device + .supported_input_configs() + .expect("error while querying configs"); + let supported_config = supported_configs_range + .next() + .expect("no supported config?!") + .with_sample_rate(SAMPLE_RATE as u32); + let _stream = device + .build_input_stream( + &supported_config.into(), + move |data: &[f32], _: &cpal::InputCallbackInfo| { + for x in data + { + //let _ = audio_tx.send(*x * 10.); + let _ = audio_tx.send(random::()); + //let _ = audio_tx.send(Complex::new(random::(), random::())); + } + }, + move |_err| {}, + None, // None=blocking, Some(Duration)=timeout + ) + .unwrap(); + + let graph = flowgraph![ + rx_source, + downconverter, + // agc, + matched_filter, + //debug, + //null_sink + tx_sink + ]; + graph.run(6); + + let mut eye_i_history = HistoryBuf::new(vec![], 200); + let mut eye_q_history = HistoryBuf::new(vec![], 200); + let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 5000); + eframe::run_simple_native("Window", Default::default(), move |ctx, _frame| { + for eye in eye_i_rx.try_iter().take(200) { - *counter += 1; - if *counter >= 1_000_000 - { - let time = Instant::now() - *last; - println!("{:.2} Ms/s", 1. / time.as_secs_f32()); - *last = Instant::now(); - *counter = 0; - } - x - }); - let null_sink = NullSink::new(iq); + eye_i_history.push(eye); + } + for eye in eye_q_rx.try_iter().take(200) + { + eye_q_history.push(eye); + } + for point in constellation_rx.try_iter().take(5000) + { + constellation.push(point); + } - let graph = flowgraph![iter_source, iq_map, pulse_shaper, lo, mixer, channel, zero_if, matched_filter, inspect, null_sink]; - graph.run(6).join(); + egui::CentralPanel::default().show(ctx, |ui| { + egui_plot::Plot::new("plot") + .data_aspect(1.) + .show(ui, |plot_ui| { + plot_ui.points( + Points::new( + "Constellation", + constellation + .as_slice() + .iter() + .map(|point| [point.re as f64, point.im as f64]) + .collect::(), + ) + .color(Color32::YELLOW.gamma_multiply_u8(70)), + ); + + for (eye_i, eye_q) in eye_i_history + .as_slice() + .iter() + .zip(eye_q_history.as_slice().iter()) + { + plot_ui.line( + Line::new( + "In-Phase", + eye_i + .iter() + .enumerate() + .map(|(i, x)| { + [i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) + 1., *x as f64] + }) + .collect::>(), + ) + .color(Color32::RED), + ); + + plot_ui.line( + Line::new( + "Quadrature", + eye_q + .iter() + .enumerate() + .map(|(i, x)| { + [*x as f64, i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) - 2.] + }) + .collect::>(), + ) + .color(Color32::GREEN), + ); + + plot_ui.points( + Points::new( + "Constellation", + eye_i + .iter() + .zip(eye_q.iter()) + .skip(SAMPLE_PER_SYMBOL / 2) + .step_by(SAMPLE_PER_SYMBOL) + .map(|(i, q)| [*i as f64, *q as f64]) + .collect::(), + ) + .color(Color32::GREEN) + .radius(1.5), + ); + } + }); + }); + ctx.request_repaint(); + }) + .unwrap(); } pub fn to_bits(n: u8) -> [bool; 8] diff --git a/oxydsp-dsp/src/blocks/filtering/fir.rs b/oxydsp-dsp/src/blocks/filtering/fir.rs index a6283a6..48d7452 100644 --- a/oxydsp-dsp/src/blocks/filtering/fir.rs +++ b/oxydsp-dsp/src/blocks/filtering/fir.rs @@ -2,11 +2,8 @@ use num::Zero; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; -use oxydsp_flowgraph::sync_block; use std::iter::Sum; use std::ops::Add; use std::ops::Mul; @@ -57,7 +54,7 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter(self.input.pop_iter().map(|x| (self.filter.next(x.0), x.1).into())); + self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into())); BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs b/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs index 7cc503f..488d4d7 100644 --- a/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs +++ b/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs @@ -47,14 +47,14 @@ impl + std::iter::Sum + std::clone::Clone { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - let reader = self.input.read(); - let writer = self.output.write(); + let mut reader = self.input.iter(); + let mut writer = self.output.write_push(); for _ in 0..writer.len() { if self.remaining == 0 { - if let Some(input) = reader.pop() + if let Some(input) = reader.next() { let (data, tag) = input.into(); let _ = writer.push((self.pulse_shaper.next(data), tag).into()); diff --git a/oxydsp-dsp/src/blocks/iq/zero_if.rs b/oxydsp-dsp/src/blocks/iq/zero_if.rs index 8a51eb0..0178591 100644 --- a/oxydsp-dsp/src/blocks/iq/zero_if.rs +++ b/oxydsp-dsp/src/blocks/iq/zero_if.rs @@ -1,12 +1,11 @@ +use std::fmt::Debug; + use num::Complex; use num::Float; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; -use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; -use oxydsp_flowgraph::sync_block; use rustfft::FftNum; use crate::filtering::fir::Fir; @@ -62,7 +61,7 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter(self.input.pop_iter().map(|input| { + self.output.push_iter(self.input.iter().map(|input| { let (data, tag) = input.into(); // Mix let lo_sample = self.local_oscillator.next().unwrap(); diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs index 8913675..f37c62e 100644 --- a/oxydsp-dsp/src/blocks/math/basic.rs +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -4,15 +4,11 @@ use std::ops::Mul; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; -use oxydsp_flowgraph::sync_block; -use oxydsp_flowgraph::tag::TagMergable; +use oxydsp_flowgraph::tag::Tag; #[derive(BlockIO)] -#[sync_block] pub struct Adder where Ia: Add + 'static, @@ -49,35 +45,26 @@ where } } -impl<'view, Ia, Ib, O> SyncBlock<'view> for Adder +impl Block for Adder where Ia: Add + 'static, Ib: 'static, O: 'static, { - fn sync_work(_state: Self::StateView, input: Self::Input) -> Option + fn work(&mut self) -> BlockResult { - Some(input.0 + input.1) + self.output.push_iter( + self.input_a + .iter() + .zip(self.input_b.iter()) + .map(|(x, y)| { + (x.0 + y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into() + }) + ); + BlockResult::Ok } } -// impl Block for Adder -// where -// Ia: Add + 'static, -// Ib: 'static, -// O: 'static, -// { -// fn work(&mut self) -> BlockResult -// { -// self.output.push_iter( -// (&mut self.input_a, &mut self.input_b) -// .pop_iter() -// .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))), -// ); -// BlockResult::Ok -// } -// } - #[derive(BlockIO)] pub struct Multiplier where @@ -124,9 +111,12 @@ where fn work(&mut self) -> BlockResult { self.output.push_iter( - (&mut self.input_a, &mut self.input_b) - .pop_iter() - .map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1)).into()), + self.input_a + .iter() + .zip(self.input_b.iter()) + .map(|(x, y)| { + (x.0 * y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into() + }) ); BlockResult::Ok } diff --git a/oxydsp-dsp/src/blocks/synthesis.rs b/oxydsp-dsp/src/blocks/synthesis.rs index 5a7e812..d923dd4 100644 --- a/oxydsp-dsp/src/blocks/synthesis.rs +++ b/oxydsp-dsp/src/blocks/synthesis.rs @@ -6,7 +6,6 @@ use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::io::stream; #[derive(BlockIO)] @@ -81,7 +80,7 @@ impl + 'static> Block for Nco fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { self.output - .push_iter(&mut self.frequency.pop_iter().map(|f| { + .push_iter(&mut self.frequency.iter().map(|f| { self.nco.set_frequency(f.0); (self.nco.next().unwrap(), f.1).into() })); diff --git a/oxydsp-dsp/src/blocks/ted/early_late.rs b/oxydsp-dsp/src/blocks/ted/early_late.rs index 59c3007..5f646e0 100644 --- a/oxydsp-dsp/src/blocks/ted/early_late.rs +++ b/oxydsp-dsp/src/blocks/ted/early_late.rs @@ -1,21 +1,19 @@ -use std::collections::VecDeque; use std::iter::Sum; use num::Float; use num::NumCast; use oxydsp_flowgraph::BlockIO; -use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::Tag; use oxydsp_flowgraph::tag::TagKey; use crate::filtering::fir::Fir; use crate::filtering::fir::FirFilter; +use crate::filtering::history_buf::HistoryBuf; #[derive(BlockIO)] -#[sync_block(tagged)] pub struct EarlyLateGate { #[input] @@ -27,7 +25,7 @@ pub struct EarlyLateGate, + window: HistoryBuf, // The current location of the window, in relation to the last sample window_location: usize, @@ -44,7 +42,7 @@ pub struct EarlyLateGate EarlyLateGate where - T: Float + Sum + Clone + 'static + Send + Sync + NumCast, + T: Float + Sum + Clone + 'static + Send + Sync + NumCast + Default, { pub fn new( input: In, @@ -58,7 +56,7 @@ where Self { input, output, - window: VecDeque::with_capacity(symbol_length), + window: HistoryBuf::new(Default::default(), symbol_length), symbol_length, window_location: 0, window_center: symbol_length / 2, @@ -72,48 +70,44 @@ where } } -impl<'view, T> SyncBlock<'view> for EarlyLateGate +impl<'view, T> Block for EarlyLateGate where T: Float + Sum + Clone + 'static + Send + Sync + NumCast, { - fn sync_work(state: Self::StateView, input: Self::Input) -> Option + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if state.window.len() < *state.symbol_length - { - state.window.push_back(input.0); - *state.window_location += 1; - return Some(input.0.into()); - } + self.output.push_iter(self.input.iter().map(|input| { + // Bring new sample in + self.window.push(input.0); + self.window_location += 1; - // Bring new sample in - state.window.pop_front(); - state.window.push_back(input.0); - *state.window_location += 1; + let sample = self.window.as_slice()[self.window_center]; + let mut tag = None; + if self.window_location >= self.next_sample as usize + { + let early_index = self.window_center - (0.25 * self.symbol_length as f32) as usize; + let late_index = self.window_center + (0.25 * self.symbol_length as f32) as usize; - let sample = state.window[*state.window_center]; - let mut tag = None; - if *state.window_location >= *state.next_sample as usize - { - let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize; - let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize; + let early_sample = self.window.as_slice()[early_index]; + let late_sample = self.window.as_slice()[late_index]; - let early_sample = state.window[early_index]; - let late_sample = state.window[late_index]; + let error = (late_sample - early_sample) * sample; + let correction = self.loop_filter.next(error); - let error = (late_sample - early_sample) * sample; - let correction = state.loop_filter.next(error); + // Figure out next sample location + self.next_sample += + (self.symbol_length as f32 + correction.to_f32().unwrap()).max(0.); - // Figure out next sample location - *state.next_sample += - (*state.symbol_length as f32 + correction.to_f32().unwrap()).max(0.); + // Turn everything back relative to current sample + self.next_sample -= self.window_location as f32; + self.window_location = 0; - // Turn everything back relative to current sample - *state.next_sample -= *state.window_location as f32; - *state.window_location = 0; + tag = Some(Tag::with_entry(self.symbol_tag_key.clone(), error)); + } - tag = Some(Tag::with_entry(state.symbol_tag_key.clone(), error)); - } + (sample, tag).into() + })); - Some((sample, tag).into()) + oxydsp_flowgraph::block::BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/utilities.rs b/oxydsp-dsp/src/blocks/utilities.rs index f346df0..6d4075c 100644 --- a/oxydsp-dsp/src/blocks/utilities.rs +++ b/oxydsp-dsp/src/blocks/utilities.rs @@ -2,3 +2,4 @@ pub mod adapters; pub mod channels; pub mod iter; pub mod squelch; +pub mod graph_control; diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index cc8995e..6ec408d 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -3,12 +3,9 @@ use std::iter::FusedIterator; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::io::stream; -use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::Tag; use oxydsp_flowgraph::tag::TagMergable; use oxydsp_flowgraph::tag::Tagged; @@ -44,7 +41,7 @@ where { self.output.push_iter( self.input - .pop_iter() + .iter() .map(|x| ((&self.map)(x.0), x.1).into()), ); BlockResult::Ok @@ -80,12 +77,12 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - let writer = self.output.write(); - let reader = self.input.read(); + let mut writer = self.output.write_push(); + let mut reader = self.input.iter(); for _ in 0..(writer.len().min(reader.len())) { - let (input, tag_opt) = reader.pop().unwrap().into(); + let (input, tag_opt) = reader.next().unwrap().into(); let (output, result) = (self.map)(input); let _ = writer.push((output, tag_opt).into()); match result @@ -131,12 +128,12 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - let writer = self.output.write(); - let reader = self.input.read(); + let mut writer = self.output.write_push(); + let mut reader = self.input.iter(); for _ in 0..(writer.len().min(reader.len())) { - let (input, tag_opt) = reader.pop().unwrap().into(); + let (input, tag_opt) = reader.next().unwrap().into(); let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into()); let (output, tag_out) = tagged_out.into(); @@ -196,15 +193,18 @@ where O: 'static, F: Fn(&mut S, I) -> O, { - fn work(&mut self) -> BlockResult { - self.output.push_iter(self.input.pop_iter() - .map(|x| ((self.map)(&mut self.state, x.0), x.1).into())); + fn work(&mut self) -> BlockResult + { + self.output.push_iter( + self.input + .iter() + .map(|x| ((self.map)(&mut self.state, x.0), x.1).into()), + ); BlockResult::Ok } } #[derive(BlockIO)] -#[sync_block(tagged)] pub struct ScanTagged where F: Fn(&mut S, Tagged) -> Tagged, @@ -239,16 +239,20 @@ where } } -impl<'view, I, O, S, F> SyncBlock<'view> for ScanTagged +impl Block for ScanTagged where I: 'static, O: 'static, - S: 'view, - F: Fn(&mut S, Tagged) -> Tagged + 'view, + F: Fn(&mut S, Tagged) -> Tagged, { - fn sync_work(state: Self::StateView, input: Self::Input) -> Option + fn work(&mut self) -> BlockResult { - Some((*state.map)(state.state, input)) + self.output.push_iter(self.input.iter().map(|tagged| { + let cloned_tag = tagged.1.clone(); + let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into(); + (output, Tag::from_tag_opts(&[&tagged.1, &tag])).into() + })); + BlockResult::Ok } } @@ -288,15 +292,15 @@ impl Block for Repeat { fn work(&mut self) -> BlockResult { - let writer = self.output.write(); - let reader = self.input.read(); + let mut writer = self.output.write_push(); + let mut reader = self.input.iter(); let len = writer.len().min(reader.len() / self.repetitions + 1); for _ in 0..len { if self.remaining == 0 || self.current.is_none() { - if let Some(x) = reader.pop() + if let Some(x) = reader.next() { self.current = Some(x.into()); self.remaining = self.repetitions; @@ -324,7 +328,6 @@ impl Block for Repeat } #[derive(BlockIO)] -#[sync_block] pub struct NullSink { #[input] @@ -339,12 +342,12 @@ impl NullSink } } -impl<'view, I: 'static> SyncBlock<'view> for NullSink +impl Block for NullSink { - fn sync_work(_: Self::StateView, _: Self::Input) -> Option + fn work(&mut self) -> BlockResult { - // Don't do shit ! - Some(()) + self.input.iter().for_each(|_| ()); + BlockResult::Ok } } @@ -383,12 +386,9 @@ impl Block for Tee { fn work(&mut self) -> BlockResult { - let writer_a = self.output_a.write(); - let writer_b = self.output_b.write(); - for x in self - .input - .pop_iter() - .take(writer_a.len().min(writer_b.len())) + let mut writer_a = self.output_a.write_push(); + let mut writer_b = self.output_b.write_push(); + for x in self.input.iter().take(writer_a.len().min(writer_b.len())) { let _ = writer_a.push(x.clone()); let _ = writer_b.push(x); @@ -446,8 +446,8 @@ where { fn work(&mut self) -> BlockResult { - let writer = self.output.write(); - let reader = self.input.read(); + let mut writer = self.output.write_push(); + let mut reader = self.input.iter(); let max_write = writer.len(); let mut written = 0; @@ -472,7 +472,7 @@ where if self.current_iter.is_none() { // Get input - if let Some(input) = reader.pop() + if let Some(input) = reader.next() { let mut new_iter = (self.map)(input.0).into_iter(); if let Some(first_elt) = new_iter.next() diff --git a/oxydsp-dsp/src/blocks/utilities/channels.rs b/oxydsp-dsp/src/blocks/utilities/channels.rs index 578573d..ac41b0a 100644 --- a/oxydsp-dsp/src/blocks/utilities/channels.rs +++ b/oxydsp-dsp/src/blocks/utilities/channels.rs @@ -7,7 +7,6 @@ use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::io::stream; #[derive(BlockIO)] @@ -49,16 +48,10 @@ impl Block for RxSource, I> { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if self + self .output - .push_iter(self.input.try_iter().map(|x| (x, None).into())) - { - BlockResult::Ok - } - else - { - BlockResult::Terminated - } + .push_iter(self.input.try_iter().map(|x| (x, None).into())); + BlockResult::Ok } } @@ -68,7 +61,7 @@ impl Block for TxSink, I> { if self .input - .pop_iter() + .iter() .map(|x| self.output.send(x.0)) .any(|res| res.is_err()) { @@ -87,7 +80,7 @@ impl Block for TxSink, I> { if self .input - .pop_iter() + .iter() .map(|x| self.output.send(x.0)) .any(|res| res.is_err()) { diff --git a/oxydsp-dsp/src/blocks/utilities/graph_control.rs b/oxydsp-dsp/src/blocks/utilities/graph_control.rs new file mode 100644 index 0000000..6cc0dc7 --- /dev/null +++ b/oxydsp-dsp/src/blocks/utilities/graph_control.rs @@ -0,0 +1,53 @@ +use oxydsp_flowgraph::{BlockIO, block::Block, io::{In, Out}, tag::TagKey}; + +#[derive(BlockIO)] +pub struct GraphKiller +{ + #[input] + input: In, + + #[output] + output: Out, + + kill_tag: TagKey +} + +impl GraphKiller +{ + pub fn new(input: In, kill_on: TagKey) -> (Self, In) + { + let (output, port) = oxydsp_flowgraph::io::stream(); + ( + GraphKiller + { + input, + output, + kill_tag: kill_on, + }, + port + ) + } +} + +impl Block for GraphKiller +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let mut reader = self.input.iter(); + let mut writer = self.output.write_push(); + for _ in 0..reader.len().min(writer.len()) + { + let (data, tag) = reader.next().unwrap().into(); + + if tag.as_ref().is_some_and(|t| t.retrieve(&self.kill_tag).is_some()) + { + return oxydsp_flowgraph::block::BlockResult::Exit; + } + + let _ = writer.push((data, tag).into()); + } + + + oxydsp_flowgraph::block::BlockResult::Ok + } +} diff --git a/oxydsp-dsp/src/blocks/utilities/iter.rs b/oxydsp-dsp/src/blocks/utilities/iter.rs index 2f351a1..6cc6063 100644 --- a/oxydsp-dsp/src/blocks/utilities/iter.rs +++ b/oxydsp-dsp/src/blocks/utilities/iter.rs @@ -52,7 +52,7 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - let writer = self.output.write(); + let mut writer = self.output.write_push(); for _ in 0..writer.len() { diff --git a/oxydsp-dsp/src/blocks/utilities/squelch.rs b/oxydsp-dsp/src/blocks/utilities/squelch.rs index 21f18cd..8bdd0d9 100644 --- a/oxydsp-dsp/src/blocks/utilities/squelch.rs +++ b/oxydsp-dsp/src/blocks/utilities/squelch.rs @@ -11,7 +11,6 @@ use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; -use oxydsp_flowgraph::io::PopIterable; #[derive(BlockIO)] pub struct Squelch @@ -61,8 +60,8 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - let writer = self.output.write(); - for x in self.input.pop_iter().take(writer.len()) + let mut writer = self.output.write_push(); + for x in self.input.iter().take(writer.len()) { let (element, tag) = x.into(); diff --git a/oxydsp-dsp/src/filtering/fir.rs b/oxydsp-dsp/src/filtering/fir.rs index c115453..bef6a4e 100644 --- a/oxydsp-dsp/src/filtering/fir.rs +++ b/oxydsp-dsp/src/filtering/fir.rs @@ -6,8 +6,6 @@ use num::Zero; use num::complex::ComplexFloat; use rustfft::FftNum; use rustfft::FftPlanner; -use std::array; -use std::collections::VecDeque; use std::f64::consts::PI; use std::iter::Sum; use std::ops::Add; diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs index 7e0ec52..2d39c24 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -56,7 +56,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream { let fields = fields.as_named().unwrap().named.clone(); zyn::zyn!( - fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousIn> + fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousIn> { let mut acc = vec![]; use oxydsp_flowgraph::block::BlockInput; @@ -67,7 +67,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream acc } - fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousIn> + fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousIn> { let mut acc = vec![]; use oxydsp_flowgraph::block::BlockInput; @@ -85,7 +85,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream { let fields = fields.as_named().unwrap().named.clone(); zyn::zyn!( - fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousOut> + fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousOut> { let mut acc = vec![]; use oxydsp_flowgraph::block::BlockOutput; @@ -96,7 +96,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream acc } - fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousOut> + fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousOut> { let mut acc = vec![]; use oxydsp_flowgraph::block::BlockOutput; diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index f2ad426..af7d213 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -1,5 +1,5 @@ -use crate::io::AnonymousIn; -use crate::io::AnonymousOut; +use crate::io::edge::AnonymousIn; +use crate::io::edge::AnonymousOut; use crate::io::In; use crate::io::Out; use crate::io::edge::BlockIOIndex; diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index 608c188..267b4ca 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -5,7 +5,6 @@ use std::thread::JoinHandle; use crossbeam_deque::Steal; use crossbeam_deque::Worker; -use crate::block; use crate::block::GraphableBlock; use crate::io::edge::BlockIOIndex; diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index e6f6eeb..18895d1 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -1,17 +1,17 @@ -use std::any::Any; +use std::mem::ManuallyDrop; use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::Mutex; use crate::stream::StreamConsumer; use crate::stream::StreamProducer; -use crate::stream::{self}; +use crate::stream::StreamReader; +use crate::stream::StreamWriter; use crate::tag::TagSlot; use crate::tag::Tagged; pub mod edge; -use crate::io::edge::BlockIOIndex; use crate::io::edge::Edge; /// Represents a input port for a block @@ -34,111 +34,276 @@ pub struct Out edge: Arc>, } -/// Trait to manipulate a block's input in a type agnostic/erased way -pub trait AnonymousIn +// Input Output interfaces + +/// Output interface to write elements in a "push" fashion +pub struct OutPush<'a, T> { - /// Inform the input about the index of the blocks it's in, as well as its port index - fn set_index(&self, index: BlockIOIndex); + data_writer: ManuallyDrop>, + tag_writer: ManuallyDrop>, - /// Returns None or the block index of the block, and the block port of the corresponding - /// Out object - fn get_producer_block(&self) -> Option; - - /// Sets the internal stream object - fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer); + total_length: usize, + written_data: usize, + written_tags: usize, + start_index: usize, } -/// Trait to manipulate a block's output in a type agnostic/erased way -pub trait AnonymousOut +impl<'a, T> OutPush<'a, T> { - /// Inform the output about the index of the blocks it's in, as well as its port index - fn set_index(&self, index: BlockIOIndex); + pub fn len(&self) -> usize + { + let data_slices = self.data_writer.slices(); + // This gives better performance ! + // Probably because it prevents claiming the whole buffer at once + // But this is hacky. It should probably be managed at a lower level + // ((data_slices.0.len() + data_slices.1.len()) / 2) - self.written_data - /// Sets the internal stream object - fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer); + data_slices.0.len() + data_slices.1.len() - self.written_data + } - /// Returns None or the block index of the block, and the block port of the corresponding - /// In object - fn get_consumer_block(&self) -> Option; + pub fn is_empty(&self) -> bool + { + self.len() == 0 + } - /// Creates the stream with the correct corresponding type, in a type erased way. + pub fn push(&mut self, data: Tagged) -> Result<(), Tagged> + { + if self.written_data >= self.total_length + { + return Err(data); + } + + let data_slices = self.data_writer.slices_mut(); + let tag_slices = self.tag_writer.slices_mut(); + + // Write a data + let data_ref = { + if self.written_data < data_slices.0.len() + { + &mut data_slices.0[self.written_data] + } + else + { + &mut data_slices.1[self.written_data - data_slices.0.len()] + } + }; + // Index of the taken element within the stream. + *data_ref = MaybeUninit::new(data.0); + let element_index = self.start_index + self.written_data; + self.written_data += 1; + + // Check for corresponding tag + let tag_ref = { + if self.written_tags < tag_slices.0.len() + { + &mut tag_slices.0[self.written_tags] + } + else + { + &mut tag_slices.1[self.written_tags - tag_slices.0.len()] + } + }; + + if let Some(tag) = data.1 + { + *tag_ref = MaybeUninit::new(TagSlot { + position: element_index, + tag, + }); + self.written_tags += 1; + } + Ok(()) + } +} + +impl<'a, T> Drop for OutPush<'a, T> +{ + fn drop(&mut self) + { + let data_writer = + unsafe { ManuallyDrop::>::take(&mut self.data_writer) }; + let tag_writer = + unsafe { ManuallyDrop::>::take(&mut self.tag_writer) }; + tag_writer.produce(self.written_tags); + data_writer.produce(self.written_data); + } +} + +pub struct InIter<'a, T> +{ + data_reader: ManuallyDrop>, + tag_reader: ManuallyDrop>, + + total_length: usize, + total_tag_length: usize, + read_data: usize, + read_tags: usize, + start_index: usize, +} + +impl<'a, T> Iterator for InIter<'a, T> +{ + type Item = Tagged; + + fn next(&mut self) -> Option + { + if self.read_data >= self.total_length + { + return None; + } + + let data_slices = self.data_reader.slices_mut(); + let tag_slices = self.tag_reader.slices_mut(); + + // Take a data + // SAFETY: + // All takable should contain a valid element: guarteed by the queue + // We strictly monotonicly take all elements, in order, in the slices. + // No two same indices should be taken. + // We cound the number of taken elemnts and consume the correct amount from the queue + let data = { + if self.read_data < data_slices.0.len() + { + unsafe { data_slices.0[self.read_data].take() } + } + else + { + unsafe { data_slices.1[self.read_data - data_slices.0.len()].take() } + } + }; + // Index of the taken element within the stream. + let element_index = self.start_index + self.read_data; + self.read_data += 1; + + // Check for corresponding tag + let mut tag = None; + if self.read_tags < self.total_tag_length + { + let tag_ref = { + if self.read_tags < tag_slices.0.len() + { + &mut tag_slices.0[self.read_tags] + } + else + { + &mut tag_slices.1[self.read_tags - tag_slices.0.len()] + } + }; + + // SAFETY: + // Same as before : strictly monotic access in the tag slices + if unsafe { tag_ref.peek().position == element_index } + { + // The next tag in line is tagging the just-poped element. + // We get it + tag = Some(unsafe { tag_ref.take().tag }); + self.read_tags += 1; + } + } + + Some(Tagged::new(data, tag)) + } + + fn size_hint(&self) -> (usize, Option) + { + let len = + self.data_reader.slices().0.len() + self.data_reader.slices().1.len() - self.read_data; + (len, Some(len)) + } +} + +impl<'a, T> Drop for InIter<'a, T> +{ + fn drop(&mut self) + { + let data_reader = + unsafe { ManuallyDrop::>::take(&mut self.data_reader) }; + let tag_reader = + unsafe { ManuallyDrop::>::take(&mut self.tag_reader) }; + tag_reader.consume(self.read_tags); + data_reader.consume(self.read_data); + } +} + +impl<'a, T> ExactSizeIterator for InIter<'a, T> {} + +impl In +{ + pub fn iter<'a>(&'a mut self) -> InIter<'a, T> + { + let first_index = self.stream.as_ref().unwrap().first_index(); + let data_reader = self.stream.as_mut().unwrap().read_takable(); + let total_length = data_reader.slices().0.len() + data_reader.slices().1.len(); + let tag_reader = self.tag_stream.as_mut().unwrap().read_takable(); + let total_tag_length = tag_reader.slices().0.len() + tag_reader.slices().1.len(); + + InIter { + data_reader: ManuallyDrop::new(data_reader), + tag_reader: ManuallyDrop::new(tag_reader), + read_data: 0, + read_tags: 0, + total_length, + total_tag_length, + start_index: first_index, + } + } +} + +impl Out +{ + pub fn write_push<'a>(&'a mut self) -> OutPush<'a, T> + { + let first_index = self.stream.as_ref().unwrap().first_index(); + let data_writer = self.stream.as_mut().unwrap().write(); + let total_length = data_writer.slices().0.len() + data_writer.slices().1.len(); + + let tag_writer = self.tag_stream.as_mut().unwrap().write(); + + OutPush { + data_writer: ManuallyDrop::new(data_writer), + tag_writer: ManuallyDrop::new(tag_writer), + total_length, + written_data: 0, + written_tags: 0, + start_index: first_index, + } + } + + /// Pushes an iterator to the output, sending the maximum amount of elements + /// to the output. /// - /// This delegation of stream creation is necessary to allow the graph to manipulate - /// it, as it cannot know about the generic type of the stream. - fn create_anonymous_stream( - &self, - capacity: usize, - ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); -} - -impl AnonymousIn for In -{ - fn set_index(&self, index: BlockIOIndex) + /// It will not consume the iterator more than what can be sent. + /// + /// ``` + /// let writer = output.write(); + /// + /// // Send only 42s to the output + /// writer.push_iter(std::iter::repeat(42)); + /// ``` + pub fn push_iter>>(&mut self, mut iter: I) -> bool { - self.edge.lock().unwrap().to = Some(index); + let mut pusher = self.write_push(); + let mut len = pusher.len(); + + while len > 0 + { + len -= 1; + match iter.next() + { + Some(element) => {let _ = pusher.push(element); }, + None => return false, + } + } + + return false; } - fn get_producer_block(&self) -> Option + /// Meta information + /// Returns a string of the type of the output + pub fn get_type_name(&self) -> &'static str { - self.edge.lock().unwrap().from + std::any::type_name::() } - - fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) - { - let (stream, tag_stream) = consumer.downcast::(); - self.stream = Some(stream); - self.tag_stream = Some(tag_stream); - } -} - -impl AnonymousOut for Out -{ - fn set_index(&self, index: BlockIOIndex) - { - self.edge.lock().unwrap().from = Some(index); - } - - fn get_consumer_block(&self) -> Option - { - self.edge.lock().unwrap().to - } - - fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) - { - let (stream, tag_stream) = producer.downcast::(); - self.stream = Some(stream); - self.tag_stream = Some(tag_stream); - } - - // Delegate stream creation to Out object - // which knows the stream type - fn create_anonymous_stream( - &self, - capacity: usize, - ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) - { - let (tx, rx) = stream::bounded_queue::(capacity); - let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); - ((tx, tx_tag).into(), (rx, rx_tag).into()) - } -} - -/// A Reader to get data from an input -pub struct InReader<'a, T> -{ - data_slice_1: &'a mut [MaybeUninit], - data_slice_2: &'a mut [MaybeUninit], - // data_reader: StreamReader<'a, T>, - // tag_reader: StreamReader<'a, TagSlot>, -} - -/// A writer to send data to an output -pub struct OutWriter<'a, T> -{ - data_slice_1: &'a mut [MaybeUninit], - data_slice_2: &'a mut [MaybeUninit], - // data_writer: StreamWriter<'a, T>, - // tag_writer: StreamWriter<'a, TagSlot>, } /// Creates a stream that can then be used to link blocks @@ -168,207 +333,50 @@ pub fn stream() -> (Out, In) ) } -impl In +pub fn streams() -> ([Out; N], [In; N]) { - /// Gets a reader view from an input. - /// - /// ``` - /// let reader = input.read(); - /// let data = reader.pop(); - /// ``` - pub fn read<'a>(&'a mut self) -> InReader - { - // let data_reader = self.stream.as_mut().unwrap().read(); - // let tag_reader = self.tag_stream.as_mut().unwrap().read(); - InReader { - // data_reader, - // tag_reader, - } - } + // Ugly simultanous initialization + let mut ins: [_; N] = std::array::from_fn(|_| None); + let mut outs: [_; N] = std::array::from_fn(|_| None); + + ins.iter_mut() + .zip(outs.iter_mut()) + .for_each(|(input, output)| { + let (newout, newin) = stream(); + *input = Some(newin); + *output = Some(newout); + }); + + let ins_some: [_; N] = std::array::from_fn(|i| ins[i].take().unwrap()); + let outs_some: [_; N] = std::array::from_fn(|i| outs[i].take().unwrap()); + + (outs_some, ins_some) } -impl Out -{ - /// Gets a reader view from an output. - /// - /// ``` - /// let writer = output.write(); - /// writer.push((data, tag).into()); - /// ``` - pub fn write<'a>(&'a mut self) -> OutWriter - { - OutWriter { - // data_writer: self.stream.as_mut().unwrap().write(), - // tag_writer: self.tag_stream.as_mut().unwrap().write(), - } - } - - /// Pushes an iterator to the output, sending the maximum amount of elements - /// to the output. - /// - /// It will not consume the iterator more than what can be sent. - /// - /// ``` - /// let writer = output.write(); - /// - /// // Send only 42s to the output - /// writer.push_iter(std::iter::repeat(42)); - /// ``` - pub fn push_iter>>(&mut self, mut iter: I) -> bool - { - false - // let writer = self.write(); - // let len = writer.len(); - // - // for _ in 0..len - // { - // if let Some(elt) = iter.next() - // { - // let _ = writer.push(elt); - // } - // else - // { - // return false; - // } - // } - // true - } - - /// Meta information - /// Returns a string of the type of the output - pub fn get_type_name(&self) -> &'static str - { - std::any::type_name::() - } -} - -// impl InReader -// { -// /// Gets the amount of elements that are available -// /// on the input. -// pub fn len(&self) -> usize -// { -// 0 -// //self.data_reader.len() -// } -// -// /// Returns true iif no elements are available on the input. -// pub fn is_empty(&self) -> bool -// { -// //self.len() == 0 -// true -// } -// -// /// Pops an element from the input. -// /// It is guaranteed to return `Some(data)` if -// /// if pop was called strictly less times than len -// pub fn pop(&self) -> Option> -// { -// None -// // let data = self.data_reader.pop_with_index(); -// // if let Some((data, index)) = data -// // { -// // let mut tag = None; -// // if self -// // .tag_reader -// // .peek(|t| t.position) -// // .is_some_and(|x| x == index) -// // { -// // tag = self.tag_reader.pop(); -// // } -// // Some((data, tag.map(|t| t.tag)).into()) -// // } -// // else -// // { -// // None -// // } -// } -// -// /// Pops an element from the input, discarding the tag. -// /// It is guaranteed to return `Some(data)` if -// /// if pop was called strictly less times than len -// pub fn pop_untag(&self) -> Option -// { -// None -// // self.pop().map(|data| data.into_inner()) -// } -// -// } -// -// impl OutWriter<'_, T> -// { -// /// Gets how much room is available on the output -// pub fn len(&self) -> usize -// { -// 0 -// //self.data_writer.len().min(self.tag_writer.len()) -// } -// -// /// Returns true iif no element can be sent -// pub fn is_empty(&self) -> bool -// { -// true -// //self.len() == 0 -// } -// -// /// Pushes some tagged data on the input. -// /// -// /// The operation succeeds (`Ok(())`) if there is enough room -// /// Or fails returning the given data to the caller. -// pub fn push(&self, data: Tagged) -> Result<(), Tagged> -// { -// Ok(()) -// // let (data, tag) = data.into(); -// // let position = self.data_writer.next_index(); -// // let tag = tag.map(|t| TagSlot { position, tag: t }); -// // -// // match self.data_writer.push(data) -// // { -// // Ok(_) if tag.is_some() => -// // { -// // let _ = self.tag_writer.push(tag.unwrap()); -// // Ok(()) -// // } -// // Ok(_) => Ok(()), -// // Err(data) => Err((data, tag.map(|t| t.tag)).into()), -// // } -// } -// -// /// Pushes some data on the input (not tagged). -// /// -// /// The operation succeeds (`Ok(())`) if there is enough room -// /// Or fails returning the given data to the caller. -// pub fn push_no_tag(&self, data: T) -> Result<(), T> -// { -// Ok(()) -// //self.data_writer.push(data) -// } -// } - // -------------------- // Iterator facilites // -------------------- -/// An iterator type to push data to output(s) -pub struct PopIter -{ - len: usize, - popped: usize, - reader: T, -} +// An iterator type to push data to output(s) +// pub struct PopIter +// { +// len: usize, +// popped: usize, +// reader: T, +// } -/// Type on which data can be popped from -pub trait PopIterable<'a> -{ - type Output; - - /// Returns an iterator on the input elements : - /// - /// ``` - /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !")); - /// ``` - fn pop_iter(&'a mut self) -> PopIter; -} +// Type on which data can be popped from +// pub trait PopIterable<'a> +// { +// type Output; +// +// /// Returns an iterator on the input elements : +// /// +// /// ``` +// /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !")); +// /// ``` +// fn pop_iter(&'a mut self) -> PopIter; +// } // impl<'a, T: 'static> PopIterable<'a> for In // { @@ -419,69 +427,3 @@ pub trait PopIterable<'a> // impl_iterator_for_pop_iter_tuple! {10} // impl_iterator_for_pop_iter_tuple! {11} // impl_iterator_for_pop_iter_tuple! {12} - -/// StreamProducer object for data and tags stored in a type -/// agnostic/erased way. -/// -/// This is needed for the graph system to manipulate and pass arround these objects -/// as they can't/don't know about the generic types of the stream objects -pub struct AnonymousStreamProducer -{ - inner: Box, - inner_tag: StreamProducer, -} - -/// StreamConsumer object for data and tags stored in a type -/// agnostic/erased way. -/// -/// This is needed for the graph system to manipulate and pass arround these objects -/// as they can't/don't know about the generic types of the stream objects -pub struct AnonymousStreamConsumer -{ - inner: Box, - inner_tag: StreamConsumer, -} - -impl From<(StreamProducer, StreamProducer)> for AnonymousStreamProducer -{ - fn from(value: (StreamProducer, StreamProducer)) -> Self - { - AnonymousStreamProducer { - inner: Box::new(value.0), - inner_tag: value.1, - } - } -} - -impl From<(StreamConsumer, StreamConsumer)> for AnonymousStreamConsumer -{ - fn from(value: (StreamConsumer, StreamConsumer)) -> Self - { - AnonymousStreamConsumer { - inner: Box::new(value.0), - inner_tag: value.1, - } - } -} - -impl AnonymousStreamProducer -{ - pub(crate) fn downcast(self) -> (StreamProducer, StreamProducer) - { - ( - *self.inner.downcast::>().unwrap(), - self.inner_tag, - ) - } -} - -impl AnonymousStreamConsumer -{ - pub(crate) fn downcast(self) -> (StreamConsumer, StreamConsumer) - { - ( - *self.inner.downcast::>().unwrap(), - self.inner_tag, - ) - } -} diff --git a/oxydsp-flowgraph/src/io/edge.rs b/oxydsp-flowgraph/src/io/edge.rs index 2b96a61..8cb1cab 100644 --- a/oxydsp-flowgraph/src/io/edge.rs +++ b/oxydsp-flowgraph/src/io/edge.rs @@ -1,3 +1,7 @@ +use std::any::Any; + +use crate::{io::{In, Out}, stream::{self, StreamConsumer, StreamProducer}, tag::TagSlot}; + /// Shared object between a block's input and output objects /// so they can "communicate" and know about each other #[derive(Default)] @@ -21,3 +25,158 @@ pub struct BlockIOIndex pub block_index: usize, pub port_index: usize, } + +/// Trait to manipulate a block's input in a type agnostic/erased way +pub trait AnonymousIn +{ + /// Inform the input about the index of the blocks it's in, as well as its port index + fn set_index(&self, index: BlockIOIndex); + + /// Returns None or the block index of the block, and the block port of the corresponding + /// Out object + fn get_producer_block(&self) -> Option; + + /// Sets the internal stream object + fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer); +} + +/// Trait to manipulate a block's output in a type agnostic/erased way +pub trait AnonymousOut +{ + /// Inform the output about the index of the blocks it's in, as well as its port index + fn set_index(&self, index: BlockIOIndex); + + /// Sets the internal stream object + fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer); + + /// Returns None or the block index of the block, and the block port of the corresponding + /// In object + fn get_consumer_block(&self) -> Option; + + /// Creates the stream with the correct corresponding type, in a type erased way. + /// + /// This delegation of stream creation is necessary to allow the graph to manipulate + /// it, as it cannot know about the generic type of the stream. + fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); +} + +impl AnonymousIn for In +{ + fn set_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().to = Some(index); + } + + fn get_producer_block(&self) -> Option + { + self.edge.lock().unwrap().from + } + + fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) + { + let (stream, tag_stream) = consumer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); + } +} + +impl AnonymousOut for Out +{ + fn set_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().from = Some(index); + } + + fn get_consumer_block(&self) -> Option + { + self.edge.lock().unwrap().to + } + + fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) + { + let (stream, tag_stream) = producer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); + } + + // Delegate stream creation to Out object + // which knows the stream type + fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) + { + let (tx, rx) = stream::bounded_queue::(capacity); + let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); + ((tx, tx_tag).into(), (rx, rx_tag).into()) + } +} + +/// StreamProducer object for data and tags stored in a type +/// agnostic/erased way. +/// +/// This is needed for the graph system to manipulate and pass arround these objects +/// as they can't/don't know about the generic types of the stream objects +pub struct AnonymousStreamProducer +{ + inner: Box, + inner_tag: StreamProducer, +} + +/// StreamConsumer object for data and tags stored in a type +/// agnostic/erased way. +/// +/// This is needed for the graph system to manipulate and pass arround these objects +/// as they can't/don't know about the generic types of the stream objects +pub struct AnonymousStreamConsumer +{ + inner: Box, + inner_tag: StreamConsumer, +} + +impl From<(StreamProducer, StreamProducer)> for AnonymousStreamProducer +{ + fn from(value: (StreamProducer, StreamProducer)) -> Self + { + AnonymousStreamProducer { + inner: Box::new(value.0), + inner_tag: value.1, + } + } +} + +impl From<(StreamConsumer, StreamConsumer)> for AnonymousStreamConsumer +{ + fn from(value: (StreamConsumer, StreamConsumer)) -> Self + { + AnonymousStreamConsumer { + inner: Box::new(value.0), + inner_tag: value.1, + } + } +} + +impl AnonymousStreamProducer +{ + pub(crate) fn downcast(self) -> (StreamProducer, StreamProducer) + { + ( + *self.inner.downcast::>().unwrap(), + self.inner_tag, + ) + } +} + +impl AnonymousStreamConsumer +{ + pub(crate) fn downcast(self) -> (StreamConsumer, StreamConsumer) + { + ( + *self.inner.downcast::>().unwrap(), + self.inner_tag, + ) + } +} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index c96be31..09e66d5 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -52,6 +52,36 @@ unsafe impl Sync for StreamProducer {} unsafe impl Send for StreamConsumer {} unsafe impl Sync for StreamConsumer {} +#[repr(transparent)] +pub struct Takable(MaybeUninit); + +impl Takable +{ + pub fn new(element: T) -> Self + { + Takable(MaybeUninit::new(element)) + } + + pub unsafe fn take(&mut self) -> T + { + unsafe { std::mem::replace(&mut self.0, MaybeUninit::uninit()).assume_init() } + } + + pub unsafe fn peek(&self) -> &T + { + unsafe { self.0.assume_init_ref() } + } + + pub unsafe fn peek_mut(&mut self) -> &mut T + { + unsafe { self.0.assume_init_mut() } + } +} + +unsafe fn takable_slice_from_maybe_uninitt(slice: &mut [MaybeUninit]) -> &mut [Takable] +{ + unsafe { std::mem::transmute(slice) } +} pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer) { @@ -89,8 +119,65 @@ pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer +{ + slices: (&'a mut [Takable], &'a mut [Takable]), + + // UNSAFE ! + consumer: &'a mut StreamConsumer, +} + +pub struct StreamWriter<'a, T> +{ + slices: (&'a mut [MaybeUninit], &'a mut [MaybeUninit]), + + // UNSAFE ! + producer: &'a mut StreamProducer, +} + +impl<'a, T> StreamReader<'a, T> +{ + pub fn slices(&self) -> (&[Takable], &[Takable]) + { + (self.slices.0, self.slices.1) + } + + pub fn slices_mut(&mut self) -> (&mut [Takable], &mut [Takable]) + { + (self.slices.0, self.slices.1) + } + + pub fn consume(self, amount: usize) + { + self.consumer.consume(amount); + } +} + +impl<'a, T> StreamWriter<'a, T> +{ + pub fn slices(&self) -> (&[MaybeUninit], &[MaybeUninit]) + { + (self.slices.0, self.slices.1) + } + + pub fn slices_mut(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) + { + (self.slices.0, self.slices.1) + } + + pub fn produce(self, amount: usize) + { + self.producer.produce(amount); + } +} + impl StreamProducer { + pub fn first_index(&self) -> usize + { + self.inner.head.load(Ordering::Relaxed) + } + pub fn produce(&mut self, written: usize) { // Advance head. @@ -101,12 +188,10 @@ impl StreamProducer assert!(head + written - tail <= (self.inner.capacity_mask + 1)); // We want writes to the buffer to be visible when acquired in the pop side - self.inner - .head - .store(head + written, Ordering::Release); + self.inner.head.store(head + written, Ordering::Release); } - pub fn write(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) + pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> { // We need to claim the maximum amount of elements. let tail = self.inner.tail.load(Ordering::Acquire); @@ -139,18 +224,18 @@ impl StreamProducer let k = &mut *self.inner.buffer.get(); let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head); - let (start_to_tail, _tail_to_head) = - start_to_head.split_at_mut_unchecked(wrapped_tail); // This functions borrows the stream mutably. As such, only one instance // of these slices can exist for a given stream. - (head_to_end, start_to_tail) + StreamWriter { + slices: (start_to_head, head_to_end), + producer: self, + } } } else { // We MUST have : tail < head - if wrapped_tail < wrapped_head { // Current configuration : @@ -158,7 +243,7 @@ impl StreamProducer // | | // tail head // ___ ____ - // slice1 slice2 + // slice2 slice1 // SAFETY: // @@ -175,7 +260,10 @@ impl StreamProducer let (start_to_tail, _tail_to_head) = start_to_head.split_at_mut_unchecked(wrapped_tail); - (head_to_end, start_to_tail) + StreamWriter { + slices: (head_to_end, start_to_tail), + producer: self, + } } } else @@ -203,10 +291,14 @@ impl StreamProducer // Head and tail are both indices of the slice unsafe { let k = &mut *self.inner.buffer.get(); - let (_start_to_head, head_to_tail) = k.split_at_mut_unchecked(wrapped_head); - let (head_to_tail, empty_slice) = - head_to_tail.split_at_mut_unchecked(wrapped_tail - wrapped_head); - (head_to_tail, empty_slice) + let (start_to_tail, _tail_to_end) = k.split_at_mut_unchecked(wrapped_tail); + let (_start_to_head, head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head); + let (empty_slice, head_to_tail) = head_to_tail.split_at_mut_unchecked(0); + + StreamWriter { + slices: (head_to_tail, empty_slice), + producer: self, + } } } } @@ -222,15 +314,18 @@ impl StreamConsumer let tail = self.inner.tail.load(Ordering::Relaxed); // Check bounds - assert!(tail + read <= head); + assert!(tail + read <= head); // We want writes to the buffer to be visible when acquired in the pop side - self.inner - .tail - .store(tail + read, Ordering::Release); + self.inner.tail.store(tail + read, Ordering::Release); } - pub fn read_uninit(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) + pub fn first_index(&self) -> usize + { + self.inner.tail.load(Ordering::Relaxed) + } + + pub fn read_takable<'a>(&'a mut self) -> StreamReader<'a, T> { // We need to claim the maximum amount of elements. let head = self.inner.head.load(Ordering::Acquire); @@ -247,14 +342,20 @@ impl StreamConsumer // Buffer is empty. Return empty slice unsafe { let k = &mut *self.inner.buffer.get(); - let head_to_tail = &mut k[wrapped_head..wrapped_tail]; - let (empty_1, empty_2) = head_to_tail.split_at_mut_unchecked(0); - (empty_1, empty_2) + let empty = &mut k[0..0]; + let (empty_1, empty_2) = empty.split_at_mut_unchecked(0); + StreamReader { + slices: ( + takable_slice_from_maybe_uninitt(empty_1), + takable_slice_from_maybe_uninitt(empty_2), + ), + consumer: self, + } } } else { - // Necessarly: wrapped_tail < wrapped_head + // Necessarly: wrapped_tail <= wrapped_head // Two cases : The buffer overlaps the wrapping or not if wrapped_tail < wrapped_head { @@ -275,7 +376,15 @@ impl StreamConsumer let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head]; let (tail_to_head, empty_slice) = k.split_at_mut_unchecked(wrapped_head - wrapped_tail); - (tail_to_head, empty_slice) + assert_eq!(empty_slice.len(), 0); + + StreamReader { + slices: ( + takable_slice_from_maybe_uninitt(tail_to_head), + takable_slice_from_maybe_uninitt(empty_slice), + ), + consumer: self, + } } } else @@ -310,112 +419,31 @@ impl StreamConsumer let (start_to_head, _head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head); - (tail_to_end, start_to_head) + StreamReader { + slices: ( + takable_slice_from_maybe_uninitt(tail_to_end), + takable_slice_from_maybe_uninitt(start_to_head), + ), + consumer: self, + } } } } } - - // Creates a reader of contiguous elements that - // satisfy the predicate - // pub fn read_while(&mut self, predicate: F) -> StreamReader<'_, T> - // where - // F: Fn(&T) -> bool, - // { - // // Take a normal reader. This contains available elements to read. - // let mut reader = self.read(); - // - // // We need to trim the slices to keep only the satified elements - // - // // First slice - // let mut first_kept = 0; - // // SAFETY: - // // - // // Only us can have a reference to these slices of the buffer - // for element in unsafe { &*reader.first.get() } - // { - // // SAFETY - // // - // // If this element is in a reader returned by self.read - // // with no pop called, we know it is initialized - // let init_element = unsafe { element.assume_init_ref() }; - // let sat = predicate(init_element); - // if !sat - // { - // // Stop here - // // Forget about second slice - // reader.second_len = 0; - // reader.second = None; - // - // // Trim first slice - // reader.first_len = first_kept; - // unsafe { - // reader.first = std::mem::transmute::< - // &[MaybeUninit], - // &UnsafeCell<[MaybeUninit]>, - // >(&(&*reader.first.get())[0..first_kept]); - // } - // - // return reader; - // } - // first_kept += 1; - // } - // - // // If we are here, all of the elements of the first slice, satisfy the predicate - // - // if let Some(second_slice) = &mut reader.second - // { - // // Second slice - // let mut second_kept = 0; - // // SAFETY: - // // - // // Only us can have a reference to these slices of the buffer - // for element in unsafe { &*second_slice.get() } - // { - // // SAFETY - // // - // // If this element is in a reader returned by self.read - // // with no pop called, we know it is initialized - // let init_element = unsafe { element.assume_init_ref() }; - // let sat = predicate(init_element); - // if !sat - // { - // // Stop here - // // Trim second slice - // reader.second_len = second_kept; - // unsafe { - // reader.second = Some(std::mem::transmute::< - // &[MaybeUninit], - // &UnsafeCell<[MaybeUninit]>, - // >( - // &(&*second_slice.get())[0..first_kept] - // )); - // } - // return reader; - // } - // second_kept += 1; - // } - // } - // - // return reader; - // } } -impl StreamConsumer -{ - pub fn read(&mut self) -> (&[T], &[T]) - { - let (slice_1, slice_2) = self.read_uninit(); - unsafe - { - (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) - } - - } -} +// impl StreamConsumer +// { +// pub fn read(&mut self) -> (&[T], &[T]) +// { +// let (slice_1, slice_2) = self.read_takable(); +// unsafe { (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) } +// } +// } mod test { + #[allow(unused_imports)] use std::mem::MaybeUninit; #[allow(unused_imports)] @@ -428,7 +456,8 @@ mod test let (mut tx, mut rx) = bounded_queue::(4); { - let (a, b) = tx.write(); + let mut writer = tx.write(); + let (a, b) = writer.slices_mut(); assert_eq!(a.len(), 4); assert_eq!(b.len(), 0); @@ -441,24 +470,28 @@ mod test } { - let (a, b) = rx.read(); + let mut reader = rx.read_takable(); + let (a, b) = reader.slices_mut(); assert_eq!(a.len(), 4); assert_eq!(b.len(), 0); - assert_eq!(a[0], 0); - assert_eq!(a[1], 1); - assert_eq!(a[2], 2); - assert_eq!(a[3], 3); + unsafe { + assert_eq!(a[0].take(), 0); + assert_eq!(a[1].take(), 1); + assert_eq!(a[2].take(), 2); + assert_eq!(a[3].take(), 3); + } rx.consume(4); } // Put stream into weird situation { - let (a, b) = tx.write(); + let mut writer = tx.write(); + let (a, b) = writer.slices_mut(); assert_eq!(a.len(), 4); assert_eq!(b.len(), 0); - + a[0] = MaybeUninit::new(0); a[1] = MaybeUninit::new(1); a[2] = MaybeUninit::new(2); @@ -467,27 +500,33 @@ mod test } { - let (a, b) = rx.read(); + let mut reader = rx.read_takable(); + let (a, b) = reader.slices_mut(); assert_eq!(a.len(), 3); assert_eq!(b.len(), 0); - - assert_eq!(a[0], 0); - assert_eq!(a[1], 1); - assert_eq!(a[2], 2); + + unsafe + { + assert_eq!(a[0].take(), 0); + assert_eq!(a[1].take(), 1); + assert_eq!(a[2].take(), 2); + } rx.consume(1); } { - let (a, b) = tx.write(); + let writer = tx.write(); + let (a, b) = writer.slices(); assert_eq!(a.len(), 1); - assert_eq!(b.len(), 1); + assert_eq!(b.len(), 1); } { - let (a, b) = rx.read(); + let reader = rx.read_takable(); + let (a, b) = reader.slices(); assert_eq!(a.len(), 2); - assert_eq!(b.len(), 0); + assert_eq!(b.len(), 0); } } } diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index 2793077..409b44a 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -178,7 +178,7 @@ impl Tag } /// Creates a new tag, which is the combination of the given tags - pub fn from_tags(tag_opts: [&Tag; N]) -> Tag + pub fn from_tags(tag_opts: &[&Tag; N]) -> Tag { let new_tag = Self::default(); { @@ -197,7 +197,7 @@ impl Tag /// /// If all the tag options are None, None is returned /// Otherwise it is Some of the combination of all of the tags which are Some - pub fn from_tag_opts(tag_opts: [&Option; N]) -> Option + pub fn from_tag_opts(tag_opts: &[&Option; N]) -> Option { if tag_opts.iter().all(|t| t.is_none()) { @@ -256,7 +256,7 @@ impl TagMergable for Tag { fn merge(&self, other: &Self) -> Self { - Self::from_tags([self, other]) + Self::from_tags(&[self, other]) } } @@ -264,7 +264,7 @@ impl TagMergable> for Option { fn merge(&self, other: &Self) -> Self { - Tag::from_tag_opts([self, other]) + Tag::from_tag_opts(&[self, other]) } }