diff --git a/Cargo.lock b/Cargo.lock index 8504b79..29f7034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2489,12 +2508,14 @@ dependencies = [ "num", "oxydsp-flowgraph", "rustfft", + "wide", ] [[package]] name = "oxydsp-flowgraph" version = "0.1.0" dependencies = [ + "crossbeam-deque", "oxydsp-flowgraph-macros", ] @@ -2701,6 +2722,22 @@ version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d" +[[package]] +name = "qpsk-modem" +version = "0.1.0" +dependencies = [ + "cpal", + "eframe", + "egui", + "egui_plot", + "hound", + "num", + "oxydsp-dsp", + "oxydsp-flowgraph", + "rand", + "rand_distr", +] + [[package]] name = "quick-error" version = "2.0.1" @@ -2764,6 +2801,16 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" +[[package]] +name = "rand_distr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d431c2703ccf129de4d45253c03f49ebb22b97d6ad79ee3ecfc7e3f4862c1d8" +dependencies = [ + "num-traits", + "rand", +] + [[package]] name = "range-alloc" version = "0.1.5" @@ -2876,6 +2923,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "safe_arch" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f7caad094bd561859bcd467734a720c3c1f5d1f338995351fefe2190c45efed" +dependencies = [ + "bytemuck", +] + [[package]] name = "same-file" version = "1.0.6" @@ -3873,6 +3929,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wide" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "198f6abc41fab83526d10880fa5c17e2b4ee44e763949b4bb34e2fd1e8ca48e4" +dependencies = [ + "bytemuck", + "safe_arch", +] + [[package]] name = "winapi-util" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index 59edb5c..d14ddbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,3 +5,8 @@ members = [ "oxydsp-dsp", "oxydsp-flowgraph" ] + +[profile.release-with-debug] +inherits = "release" +debug = true + diff --git a/examples/bfsk-modem/src/transmitter.rs b/examples/bfsk-modem/src/transmitter.rs index 3afaf5d..d91068a 100644 --- a/examples/bfsk-modem/src/transmitter.rs +++ b/examples/bfsk-modem/src/transmitter.rs @@ -6,6 +6,7 @@ use oxydsp_dsp::blocks::filtering::fir::FirFilter; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::Nco; use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::utilities::adapters::FlatMap; use oxydsp_dsp::blocks::utilities::adapters::Map; use oxydsp_dsp::blocks::utilities::adapters::Repeat; use oxydsp_dsp::blocks::utilities::adapters::Scan; diff --git a/examples/dpsk-modem/src/main.rs b/examples/dpsk-modem/src/main.rs index 2d13d2b..2fb84a4 100644 --- a/examples/dpsk-modem/src/main.rs +++ b/examples/dpsk-modem/src/main.rs @@ -33,6 +33,7 @@ use oxydsp_dsp::filtering::fir::Fir; use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::flowgraph; +use oxydsp_flowgraph::io::AnonymousIn; use oxydsp_flowgraph::tag::Tags; use rand::random; diff --git a/examples/qpsk-modem/Cargo.toml b/examples/qpsk-modem/Cargo.toml new file mode 100644 index 0000000..febdcd9 --- /dev/null +++ b/examples/qpsk-modem/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "qpsk-modem" +version = "0.1.0" +edition = "2024" + +[dependencies] +oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"} +oxydsp-dsp = {path = "../../oxydsp-dsp/"} +egui = "0.33.3" +egui_plot = "0.34.1" +eframe = { version = "0.33.3", features = ["default_fonts", "wayland"] } +num = "0.4.3" +hound = "3.5.1" +rand = "0.10.0" +cpal = "0.17.3" +rand_distr = "0.6.0" diff --git a/examples/qpsk-modem/src/main.rs b/examples/qpsk-modem/src/main.rs new file mode 100644 index 0000000..86521f0 --- /dev/null +++ b/examples/qpsk-modem/src/main.rs @@ -0,0 +1,85 @@ +use std::time::Instant; + +use num::Complex; +use oxydsp_dsp::blocks::filtering::fir::FirFilter; +use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper; +use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; +use oxydsp_dsp::blocks::math::basic::Multiplier; +use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::utilities::adapters::{Map, NullSink, Scan}; +use oxydsp_dsp::blocks::utilities::iter::IterSource; +use oxydsp_dsp::filtering::fir::Fir; +use oxydsp_dsp::units::DigitalFrequency; +use oxydsp_flowgraph::flowgraph; +use rand::{RngExt, SeedableRng, random}; + +const CARRRIER_FREQ: f64 = 1000.; +const SAMPLE_RATE: usize = 48_000; + +fn main() +{ + let bits = (0..1024).map(|_| [random::(), random::()]); + + let (iter_source, bits) = IterSource::new(bits.cycle()); + let (iq_map, iq) = Map::new(bits, |x| match x + { + [true, true] => Complex::new(1., 1.), + [true, false] => Complex::new(1., -1.), + [false, true] => Complex::new(-1., 1.), + [false, false] => Complex::new(-1., -1.), + }); + let (pulse_shaper, iq) = PulseShaper::new(iq, Fir::square(200), 200); + + let (lo, carrier) = OscillatorSource::new(DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into()); + let (mixer, passband) = Multiplier::new(iq, carrier); + + let (channel, passband) = Scan::new(passband, rand::rngs::SmallRng::seed_from_u64(0), |state, x| + { + x.re + state.sample::(rand_distr::StandardNormal) + }); + + let (zero_if, iq) = ZeroIf::new(passband, DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into()); + let (matched_filter, iq) = FirFilter::new(iq, Fir::::square(200)); + let (inspect, iq) = Scan::new(iq, (Instant::now(), 0), |(last, counter), x| + { + *counter += 1; + if *counter >= 1_000_000 + { + let time = Instant::now() - *last; + println!("{:.2} Ms/s", 1. / time.as_secs_f32()); + *last = Instant::now(); + *counter = 0; + } + x + }); + let null_sink = NullSink::new(iq); + + let graph = flowgraph![iter_source, iq_map, pulse_shaper, lo, mixer, channel, zero_if, matched_filter, inspect, null_sink]; + graph.run(6).join(); +} + +pub fn to_bits(n: u8) -> [bool; 8] +{ + [ + (n & 1) == 1, + (n >> 1) & 1 == 1, + (n >> 2) & 1 == 1, + (n >> 3) & 1 == 1, + (n >> 4) & 1 == 1, + (n >> 5) & 1 == 1, + (n >> 6) & 1 == 1, + (n >> 7) & 1 == 1, + ] +} + +pub fn from_bits(n: [bool; 8]) -> u8 +{ + (n[0] as u8) + | ((n[1] as u8) << 1) + | ((n[2] as u8) << 2) + | ((n[3] as u8) << 3) + | ((n[4] as u8) << 4) + | ((n[5] as u8) << 5) + | ((n[6] as u8) << 6) + | ((n[7] as u8) << 7) +} diff --git a/oxydsp-dsp/Cargo.toml b/oxydsp-dsp/Cargo.toml index 323aeb8..56d2e98 100644 --- a/oxydsp-dsp/Cargo.toml +++ b/oxydsp-dsp/Cargo.toml @@ -7,3 +7,4 @@ edition = "2024" num = "0.4.3" oxydsp-flowgraph = {path = "../oxydsp-flowgraph/"} rustfft = "6.4.1" +wide = "1.2.0" diff --git a/oxydsp-dsp/src/blocks/filtering.rs b/oxydsp-dsp/src/blocks/filtering.rs index f4a0fff..d953df4 100644 --- a/oxydsp-dsp/src/blocks/filtering.rs +++ b/oxydsp-dsp/src/blocks/filtering.rs @@ -1 +1,2 @@ pub mod fir; +pub mod pulse_shaping; diff --git a/oxydsp-dsp/src/blocks/filtering/fir.rs b/oxydsp-dsp/src/blocks/filtering/fir.rs index 1b280a8..a6283a6 100644 --- a/oxydsp-dsp/src/blocks/filtering/fir.rs +++ b/oxydsp-dsp/src/blocks/filtering/fir.rs @@ -1,20 +1,24 @@ +use num::Zero; use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::sync_block; use std::iter::Sum; +use std::ops::Add; use std::ops::Mul; use crate::filtering::fir::Fir; #[derive(BlockIO)] -#[sync_block] pub struct FirFilter where - T: Clone + 'static, + T: Clone + Zero + 'static, F: Mul + Clone + 'static, - O: Sum + 'static, + O: Add + Sum + Clone + Zero + 'static, { #[input] input: In, @@ -27,9 +31,9 @@ where impl FirFilter where - T: Clone + 'static, + T: Clone + Zero + 'static, F: Mul + Clone + 'static, - O: Sum + 'static, + O: Add + Sum + Clone + Zero, { pub fn new(input: In, impulse_response: Fir) -> (Self, In) { @@ -45,14 +49,15 @@ where } } -impl<'view, F, T, O> SyncBlock<'view> for FirFilter +impl Block for FirFilter where - T: Clone + 'view, + T: Clone + Zero, F: Mul + Clone + 'static, - O: Sum + 'static, + O: Add + Sum + Clone + Zero, { - fn sync_work(state: Self::StateView, input: Self::Input) -> Option + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - Some(state.filter.next(input)) + self.output.push_iter(self.input.pop_iter().map(|x| (self.filter.next(x.0), x.1).into())); + BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs b/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs new file mode 100644 index 0000000..7cc503f --- /dev/null +++ b/oxydsp-dsp/src/blocks/filtering/pulse_shaping.rs @@ -0,0 +1,77 @@ +use std::iter::Sum; +use std::ops::Add; + +use num::Zero; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; + +use crate::filtering::fir::Fir; +use crate::filtering::fir::FirFilter; + +#[derive(BlockIO)] +pub struct PulseShaper + std::iter::Sum + Add + Sum + Clone + Zero> +{ + #[input] + input: In, + + #[output] + output: Out, + + symbol_length: usize, + remaining: usize, + pulse_shaper: FirFilter, +} + +impl + std::iter::Sum + std::clone::Clone + Zero> PulseShaper +{ + pub fn new(input: In, pulse_shape: Fir, symbol_length: usize) -> (Self, In) + { + let (output, pulse_shaped) = oxydsp_flowgraph::io::stream(); + ( + Self { + input, + output, + symbol_length, + remaining: 0, + pulse_shaper: FirFilter::new(pulse_shape), + }, + pulse_shaped, + ) + } +} + +impl + std::iter::Sum + std::clone::Clone + Zero> Block + for PulseShaper +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let reader = self.input.read(); + let writer = self.output.write(); + + for _ in 0..writer.len() + { + if self.remaining == 0 + { + if let Some(input) = reader.pop() + { + let (data, tag) = input.into(); + let _ = writer.push((self.pulse_shaper.next(data), tag).into()); + self.remaining = self.symbol_length - 1; + } + else + { + return oxydsp_flowgraph::block::BlockResult::Ok; + } + } + else + { + let _ = writer.push(self.pulse_shaper.next(T::zero()).into()); + self.remaining -= 1; + } + } + + oxydsp_flowgraph::block::BlockResult::Ok + } +} diff --git a/oxydsp-dsp/src/blocks/iq/zero_if.rs b/oxydsp-dsp/src/blocks/iq/zero_if.rs index e9f37b8..8a51eb0 100644 --- a/oxydsp-dsp/src/blocks/iq/zero_if.rs +++ b/oxydsp-dsp/src/blocks/iq/zero_if.rs @@ -1,19 +1,19 @@ -use num::{Complex, Float}; -use oxydsp_flowgraph::{ - BlockIO, - block::SyncBlock, - io::{In, Out}, - sync_block, -}; +use num::Complex; +use num::Float; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::sync_block; use rustfft::FftNum; -use crate::{ - filtering::fir::{Fir, FirFilter}, - synthesis::oscillator::Nco, -}; +use crate::filtering::fir::Fir; +use crate::filtering::fir::FirFilter; +use crate::synthesis::oscillator::Nco; #[derive(BlockIO)] -#[sync_block] pub struct ZeroIf + 'static> { #[input] @@ -28,7 +28,13 @@ pub struct ZeroIf + 'static> impl ZeroIf where - T: std::clone::Clone + num::Num + FftNum + From + 'static + num::Float, + T: std::clone::Clone + + num::Num + + FftNum + + From + + 'static + + num::Float + + num::traits::FloatConst, { pub fn new(input: In, lo: Nco) -> (Self, In>) { @@ -38,7 +44,7 @@ where input, output, local_oscillator: lo, - filter: FirFilter::new(Fir::lowpass(lo.frequency(), 100)), + filter: FirFilter::new(Fir::lowpass(lo.frequency(), 100).normalized_len()), }, port, ) @@ -50,16 +56,20 @@ where } } -impl<'view, T> SyncBlock<'view> for ZeroIf +impl Block for ZeroIf where T: std::clone::Clone + num::Num + Float + From + 'static + num::Float, { - fn sync_work(state: Self::StateView, input: Self::Input) -> Option + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - // Mix - let lo_sample = state.local_oscillator.next().unwrap(); - let iq = Complex::new(input * lo_sample.re, input * lo_sample.im); + self.output.push_iter(self.input.pop_iter().map(|input| { + let (data, tag) = input.into(); + // Mix + let lo_sample = self.local_oscillator.next().unwrap(); + let iq = Complex::new(data * lo_sample.re, data * lo_sample.im); - Some(state.filter.next(iq)) + (self.filter.next(iq), tag).into() + })); + oxydsp_flowgraph::block::BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 14277b5..cc8995e 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -156,7 +156,6 @@ where } #[derive(BlockIO)] -#[sync_block] pub struct Scan where F: Fn(&mut S, I) -> O, @@ -191,16 +190,16 @@ where } } -impl<'view, I, O, S, F> SyncBlock<'view> for Scan +impl Block for Scan where I: 'static, O: 'static, - S: 'view, - F: Fn(&mut S, I) -> O + 'view, + F: Fn(&mut S, I) -> O, { - fn sync_work(state: Self::StateView, input: Self::Input) -> Option - { - Some((*state.map)(state.state, input)) + fn work(&mut self) -> BlockResult { + self.output.push_iter(self.input.pop_iter() + .map(|x| ((self.map)(&mut self.state, x.0), x.1).into())); + BlockResult::Ok } } diff --git a/oxydsp-dsp/src/blocks/utilities/squelch.rs b/oxydsp-dsp/src/blocks/utilities/squelch.rs index 10edb79..21f18cd 100644 --- a/oxydsp-dsp/src/blocks/utilities/squelch.rs +++ b/oxydsp-dsp/src/blocks/utilities/squelch.rs @@ -1,13 +1,17 @@ -use std::{collections::VecDeque, iter::Sum}; +use std::collections::VecDeque; +use std::iter::Sum; -use num::{Float, FromPrimitive, One, Zero, complex::ComplexFloat}; -use oxydsp_flowgraph::{ - BlockIO, - block::{Block, BlockResult}, - io::{In, Out, PopIterable}, -}; - -use crate::filtering::fir::{Fir, FirFilter}; +use num::Float; +use num::FromPrimitive; +use num::One; +use num::Zero; +use num::complex::ComplexFloat; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; #[derive(BlockIO)] pub struct Squelch diff --git a/oxydsp-dsp/src/filtering.rs b/oxydsp-dsp/src/filtering.rs index f4a0fff..c01c158 100644 --- a/oxydsp-dsp/src/filtering.rs +++ b/oxydsp-dsp/src/filtering.rs @@ -1 +1,2 @@ pub mod fir; +pub mod history_buf; diff --git a/oxydsp-dsp/src/filtering/fir.rs b/oxydsp-dsp/src/filtering/fir.rs index 4fa51da..c115453 100644 --- a/oxydsp-dsp/src/filtering/fir.rs +++ b/oxydsp-dsp/src/filtering/fir.rs @@ -1,29 +1,52 @@ -use std::collections::VecDeque; -use std::f64::consts::PI; -use std::iter::Sum; -use std::ops::Div; -use std::ops::Mul; -use std::process::Output; - use num::Complex; use num::Float; +use num::FromPrimitive; use num::One; use num::Zero; use num::complex::ComplexFloat; -use num::zero; use rustfft::FftNum; use rustfft::FftPlanner; +use std::array; +use std::collections::VecDeque; +use std::f64::consts::PI; +use std::iter::Sum; +use std::ops::Add; +use std::ops::Div; +use std::ops::Mul; +use crate::filtering::history_buf::HistoryBuf; use crate::map; use crate::units::DigitalFrequency; -/// Finite impulse response + +/// Represents a finite impulse response as a vector +/// of values in time. +/// +/// Convention +/// indices : 0 ----------------- fir.0.len - 1 +/// time : ----------------> +/// +/// For a reverb ir for example the clap would be at index 0 +/// and the reverb tail towards the end of the vector. pub struct Fir(pub Vec); impl Fir> where T: FftNum + Float + Clone, { + /// Synthesizes an impulse response from a transfer function using an inverse fourrier + /// transform. + /// + /// The input units are thus : + /// Transfer function : + /// start center end + /// [ ] + /// frequency : + /// 0 pi 2*pi + /// = pi, -pi 0 + /// nyquist's frequency + /// (As the frequencies are periodic) + /// pub fn from_transfer_function(tf: impl AsRef<[Complex]>) -> Fir> { let mut planner = FftPlanner::new(); @@ -43,6 +66,12 @@ where Fir(shifted_fir) } + /// Creates a low pass filter with the following ideal transfer function using the ifft method: + /// + /// ________________ ________________ + /// |____________________| + /// 0 cuttoff -cuttoff 0 + /// pub fn lowpass(cutoff: DigitalFrequency, length: usize) -> Fir> { let mut tf = vec![Complex::::zero(); length]; @@ -63,6 +92,8 @@ where T: ComplexFloat + Div + Copy + Sum, T::Real: Float, { + /// Returns the same impulse response + /// normalized by the length of the sum of the vectors. pub fn normalized(mut self) -> Self { let sum: T = self.0.iter().copied().sum(); @@ -73,43 +104,173 @@ where } } +impl Fir +where + T: ComplexFloat + Div, + T::Real: Float + FromPrimitive, +{ + /// Returns the same impulse response + /// normalized by the length of the impulse response. + pub fn normalized_len(mut self) -> Self + { + let len = T::Real::from_usize(self.0.len()).unwrap(); + self.0.iter_mut().for_each(|x| *x = *x / len); + self + } +} + +impl Fir +where + T: ComplexFloat + Div + Copy + Add, + T::Real: Float, +{ + /// Returns the same impulse response + /// normalized by the energy or the sum of the squares of the magnitues + /// of the impulse response + pub fn normalized_sqr(mut self) -> Self + { + let sum = self + .0 + .iter() + .copied() + .map(|x| x.abs() * x.abs()) + .reduce(|x, y| x + y) + .unwrap(); + + self.0.iter_mut().for_each(|x| *x = *x / sum); + self + } +} + +impl Fir +where + T: ComplexFloat + FromPrimitive, +{ + /// Creates a square unit impulse response : + /// a vector of length `length` filled with ones + pub fn square(length: usize) -> Self + { + Self((0..length).map(|_| T::one()).collect()) + } + + /// Creates a simple proportional integral (PI) loop impulse response : + /// + /// FIR: + /// ```text + /// _ ................................... Kp (`proportional_gain`) + /// | + /// | + /// | + ///  |____________________________ ...... Ki (`integral_gain`) + ///    | + /// + /// 0 ------------------------- `length` + /// ``` + /// + pub fn proportional_integral(length: usize, proportional_gain: T, integral_gain: T) -> Self + { + Self( + (0..length) + .map(|i| { + if i == 0 + { + proportional_gain + integral_gain + } + else + { + integral_gain + } + }) + .collect(), + ) + } + + /// Creates a root raised cosine (RRC) FIR of length `length` + /// with the given roll off factor. + /// + /// The corresponding RC (convolution of this filter with itself) + /// has its zero crossing every `symbol_length` samples (except at 0). + pub fn root_raised_cosine(length: usize, roll_off: f64, symbol_length: usize) -> Self + { + Self( + (0..length) + .map(|i| { + let t = map( + i as f64, + 0., + length as f64, + -(length as f64) * 0.5, + length as f64 * 0.5, + ) / symbol_length as f64; + T::from_f64(root_raised_cosine(t, roll_off, 1.)).unwrap() + }) + .collect(), + ) + } +} + +/// A simple convolutional finite impulse response filter pub struct FirFilter where F: Mul, - O: Sum, + O: Add + Sum + Clone + Zero, { fir: Vec, - taps: VecDeque, + //taps: VecDeque, + taps: HistoryBuf } impl FirFilter where - T: Clone, + T: Clone + Zero, F: Mul + Clone, - O: Sum, + O: Add + Sum + Clone + Zero, { + /// Creates a filter with the given impulse response pub fn new(impulse_response: Fir) -> Self { let len = impulse_response.0.len(); Self { fir: impulse_response.0, - taps: VecDeque::with_capacity(len), + taps: HistoryBuf::new(T::zero(), len), } } + /// Gets the next output given an input sample. + /// + /// At the beginning, the delay line starts with zeroes. pub fn next(&mut self, input: T) -> O { - if self.taps.len() == self.fir.len() - { - self.taps.pop_front(); - } - self.taps.push_back(input); + self.taps.push(input); + let taps = self.taps.as_slice(); + Self::dot_prod(&self.fir, taps) + } - self.fir - .iter() - .zip(self.taps.iter()) - .map(|(a, b)| a.clone() * b.clone()) - .sum() + pub fn dot_prod(a: &[F], b: &[T]) -> O + { + assert_eq!(a.len(), b.len()); + + let mut sum: [_; 4] = [O::zero(), O::zero(), O::zero(), O::zero()]; + + let (a_chunks, a_remainder) = a.as_chunks::<4>(); + let (b_chunks, b_remainder) = b.as_chunks::<4>(); + + for (x, y) in a_chunks.iter().zip(b_chunks.iter()) + + { + sum[0] = sum[0].clone() + x[0].clone() * y[0].clone(); + sum[1] = sum[1].clone() + x[1].clone() * y[1].clone(); + sum[2] = sum[2].clone() + x[2].clone() * y[2].clone(); + sum[3] = sum[3].clone() + x[3].clone() * y[3].clone(); + } + + let mut sum = sum[0].clone() + sum[1].clone() + sum[2].clone() + sum[3].clone(); + for (x, y) in a_remainder.iter().zip(b_remainder.iter()) + { + sum = sum + x.clone() * y.clone(); + } + + sum } } @@ -118,3 +279,31 @@ pub fn estimate_fir_length(transition_width: f64, sample_rate: f64) -> f64 { 3.8 * sample_rate / transition_width } + +/// Root raised cosine function +pub fn root_raised_cosine(t: f64, beta: f64, symbol_time: f64) -> f64 +{ + let eps = 1e-8; + + if t.abs() < eps + { + // t = 0 special case + return (1.0 / symbol_time.sqrt()) * (1.0 + beta * (4.0 / PI - 1.0)); + } + + if beta > 0.0 && (t.abs() - symbol_time / (4.0 * beta)).abs() < eps + { + // t = ±T / (4β) special case + let term1 = (1.0 + 2.0 / PI) * (PI / (4.0 * beta)).sin(); + let term2 = (1.0 - 2.0 / PI) * (PI / (4.0 * beta)).cos(); + return (beta / (symbol_time.sqrt() * 2.0_f64.sqrt())) * (term1 + term2); + } + + // General case + let numerator = (PI * t * (1.0 - beta) / symbol_time).sin() + + 4.0 * beta * t / symbol_time * (PI * t * (1.0 + beta) / symbol_time).cos(); + + let denominator = PI * t * (1.0 - (4.0 * beta * t / symbol_time).powi(2)) / symbol_time; + + (1.0 / symbol_time.sqrt()) * (numerator / denominator) +} diff --git a/oxydsp-dsp/src/filtering/history_buf.rs b/oxydsp-dsp/src/filtering/history_buf.rs new file mode 100644 index 0000000..abfd450 --- /dev/null +++ b/oxydsp-dsp/src/filtering/history_buf.rs @@ -0,0 +1,48 @@ +/// A queue that always contains the same amount of elements. +/// +/// It intuitively record the history of `length` values of a value +/// This implementations allows to get a single contiguous slice view on the history +pub struct HistoryBuf +{ + buffer: Box<[T]>, + start: usize, + length: usize, +} + +impl HistoryBuf +{ + pub fn new(default: T, length: usize) -> Self + { + Self + { + buffer: vec![default; 2 * length].into_boxed_slice(), + start: 0, + length + } + } + + pub fn from_fn(length: usize, mut f: impl FnMut(usize) -> T) -> Self + { + Self + { + buffer: (0..(2 * length)).map(|i| f(i)).collect(), + start: 0, + length + } + } + + pub fn push(&mut self, data: T) + { + self.buffer[self.start] = data.clone(); + self.buffer[self.start + self.length] = data.clone(); + + self.start += 1; + self.start %= self.length; + } + + pub fn as_slice(&self) -> &[T] + { + &self.buffer[self.start..(self.start + self.length)] + } +} + diff --git a/oxydsp-dsp/src/lib.rs b/oxydsp-dsp/src/lib.rs index cc324e5..5be9e5b 100644 --- a/oxydsp-dsp/src/lib.rs +++ b/oxydsp-dsp/src/lib.rs @@ -5,6 +5,8 @@ pub mod filtering; pub mod synthesis; pub mod units; +/// Maps a float from a range onto another +/// linearly fn map(x: T, x_min: T, x_max: T, o_min: T, o_max: T) -> T { ((x - x_min) / (x_max - x_min)) * (o_max - o_min) + o_min diff --git a/oxydsp-dsp/src/synthesis/oscillator.rs b/oxydsp-dsp/src/synthesis/oscillator.rs index be94261..9f0768b 100644 --- a/oxydsp-dsp/src/synthesis/oscillator.rs +++ b/oxydsp-dsp/src/synthesis/oscillator.rs @@ -8,6 +8,21 @@ use crate::map; use crate::units::DigitalFrequency; use crate::units::Phase; +/// Numericaly controlled oscillator +/// +/// ``` +/// let nco: Nco = DigitalFrequency::from_rad(2 * f32::PI).into(); +/// // Or +/// let nco: Nco = Nco::from(DigitalFrequency::from_rad(2 * f32::PI)); +/// +/// // Rotates by 90deg per sample +/// // The next function is from the iterator implementation +/// assert_eq!(nco.next(), Complex::new(1., 0.)); +/// assert_eq!(nco.next(), Complex::new(0., 1.)); +/// assert_eq!(nco.next(), Complex::new(-1., 0.)); +/// assert_eq!(nco.next(), Complex::new(0., -1.)); +/// assert_eq!(nco.next(), Complex::new(1., 0.)); +/// ``` #[derive(Clone, Copy)] pub struct Nco { @@ -21,6 +36,7 @@ pub struct Nco impl Nco { + /// Creates a new Nco with a specific frequency starting at phase 0 pub fn new(frequency: DigitalFrequency) -> Self { Self { @@ -30,11 +46,13 @@ impl Nco } } + /// Gets the current frequency of the oscillator pub fn frequency(&self) -> DigitalFrequency { DigitalFrequency(self.d_phase) } + /// Creates a new Nco with a specific frequency and starting phase pub fn with_phase(frequency: DigitalFrequency, phase: Phase) -> Self { Self { @@ -44,16 +62,19 @@ impl Nco } } + /// Sets the current phase. pub fn set_phase(&mut self, phase: Phase) { self.phase = phase.0.0; } + /// Sets the current phase pub fn set_frequency(&mut self, frequency: DigitalFrequency) { self.d_phase = frequency.0; } + /// Steps the oscillator by one sample pub fn step(&mut self) { let (new, _) = self.phase.overflowing_add(self.d_phase); @@ -63,6 +84,8 @@ impl Nco impl> Nco { + /// Gets the current value of the oscillator as a + /// complex number pub fn sample(&self) -> Complex { let t = map( diff --git a/oxydsp-dsp/src/units.rs b/oxydsp-dsp/src/units.rs index 895c081..d83a55e 100644 --- a/oxydsp-dsp/src/units.rs +++ b/oxydsp-dsp/src/units.rs @@ -3,15 +3,21 @@ use std::ops::Neg; use crate::map; -// Represents digital frequency +/// Represents a digital, sampled frequency +/// as radians per samples in [0; 2*pi[ range +/// mapped to the whole [0; usize::MAX] range #[derive(Clone, Copy, PartialEq, PartialOrd)] pub struct DigitalFrequency(pub usize); +/// Represents an absolute phase offset +/// as radians in [0; 2*pi[ range +/// mapped to the whole [0; usize::MAX] range #[derive(Clone, Copy, PartialEq, PartialOrd)] pub struct Phase(pub DigitalFrequency); impl DigitalFrequency { + /// Creates a DigitalFrequency from rads per samples pub fn from_rad(radians_per_sample: f64) -> Self { // Frequnecy wraps arround : Going at 2 pi radians per second @@ -22,16 +28,21 @@ impl DigitalFrequency DigitalFrequency(map(f, 0., 2. * PI, 0., usize::MAX as f64).floor() as usize) } + /// Creates a DigitalFrequency from a frequency given in hertz (s^(-1)) + /// in the context of a sample rate also given in hertz pub fn from_time_frequency(hertz: f64, sample_rate: f64) -> Self { Self::from_rad(map(hertz, 0., sample_rate, 0., 2. * PI)) } + /// Gets the frequency as radians per sample pub fn as_rad(&self) -> f64 { map(self.0 as f64, 0., usize::MAX as f64, 0., 2. * PI) } + /// Gets the frequency as hertz in the context of a sample rate + /// also given in hert pub fn as_time_frequency(&self, sample_rate: f64) -> f64 { map(self.0 as f64, 0., usize::MAX as f64, 0., sample_rate) @@ -42,6 +53,7 @@ impl Neg for DigitalFrequency { type Output = Self; + /// Returns the "negative frequency" fn neg(self) -> Self::Output { DigitalFrequency(usize::MAX - self.0) diff --git a/oxydsp-flowgraph/Cargo.toml b/oxydsp-flowgraph/Cargo.toml index ffe0d5d..2ea4601 100644 --- a/oxydsp-flowgraph/Cargo.toml +++ b/oxydsp-flowgraph/Cargo.toml @@ -4,4 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] +crossbeam-deque = "0.8.6" oxydsp-flowgraph-macros = { path = "./oxydsp-flowgraph-macros" } diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs index bd2f2b4..7e0ec52 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -1,19 +1,11 @@ use proc_macro::TokenStream; -use zyn::FromInput; use zyn::ToTokens; use zyn::ext::AttrExt; use zyn::ext::FieldsExt; -use zyn::ext::ItemExt; use zyn::format_ident; -use zyn::ident; use zyn::parse_input; -use zyn::syn::Attribute; -use zyn::syn::GenericParam; use zyn::syn::Index; use zyn::syn::Lit; -use zyn::syn::TypeGenerics; -use zyn::syn::parse_quote; -use zyn::syn::punctuated::Punctuated; use zyn::syn::spanned::Spanned; mod sync; @@ -52,53 +44,67 @@ pub fn block_io( impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }} {{ where_clause }} { - @block_io_set_index(fields = fields.clone()) - @block_io_get_successors(fields = fields.clone()) - @block_io_counts(fields = fields.clone()) - @block_io_set_streams(fields = fields.clone()) - @block_io_create_stream(fields = fields.clone()) + @block_io_get_inputs(fields = fields.clone()) + @block_io_get_outputs(fields = fields.clone()) @block_io_get_meta(ident = ident.clone(), fields = fields.clone()) } ) } #[zyn::element] -fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream +fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream { let fields = fields.as_named().unwrap().named.clone(); zyn::zyn!( - fn set_index(&self, block_index: usize) + fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousIn> { - use oxydsp_flowgraph::edge::BlockIOIndex; + let mut acc = vec![]; + use oxydsp_flowgraph::block::BlockInput; @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) { - self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); + acc.extend(self.{{field.1.ident}}.get_inputs_mut()); } + acc + } - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousIn> + { + let mut acc = vec![]; + use oxydsp_flowgraph::block::BlockInput; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) { - self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); + acc.extend(self.{{field.1.ident}}.get_inputs()); } + acc } ) } #[zyn::element] -fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream +fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream { let fields = fields.as_named().unwrap().named.clone(); zyn::zyn!( - fn get_successors(&self) -> Vec + fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousOut> { - let mut output = vec![]; + let mut acc = vec![]; + use oxydsp_flowgraph::block::BlockOutput; @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) { - if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block() - { - output.push(block_index); - } + acc.extend(self.{{field.1.ident}}.get_outputs_mut()); } - output + acc + } + + fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousOut> + { + let mut acc = vec![]; + use oxydsp_flowgraph::block::BlockOutput; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + acc.extend(self.{{field.1.ident}}.get_outputs()); + } + acc } ) } @@ -144,94 +150,6 @@ fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::T ) } -#[zyn::element] -fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - let fields = fields.as_named().unwrap().named.clone(); - let input_count = fields - .iter() - .filter(|x| x.attrs.iter().any(|x| x.is("input"))) - .count(); - let output_count = fields - .iter() - .filter(|x| x.attrs.iter().any(|x| x.is("output"))) - .count(); - zyn::zyn!( - fn input_count(&self) -> usize - { - return { { input_count } }; - } - - fn output_count(&self) -> usize - { - return { { output_count } }; - } - ) -} - -#[zyn::element(debug = "pretty")] -fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - zyn::zyn!( - #[allow(unreachable_code)] - fn set_anonymous_out_stream( - &mut self, - output_index: usize, - producer: oxydsp_flowgraph::io::AnonymousStreamProducer, - ) - { - match output_index - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer), - } - _ => panic!("output_index out of bounds.") - }; - } - - #[allow(unreachable_code)] - fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::io::AnonymousStreamConsumer) - { - match input_index - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - {{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer), - } - _ => panic!("output_index out of bounds.") - }; - } - ) -} - -#[zyn::element] -fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - zyn::zyn!( - #[allow(unreachable_code)] - fn create_anonymous_stream_for( - &mut self, - output_index: usize, - capacity: usize, - ) -> ( - oxydsp_flowgraph::io::AnonymousStreamProducer, - oxydsp_flowgraph::io::AnonymousStreamConsumer, - ) - { - let output = match output_index - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.0 }} => self.{{ field.1.ident }}.create_anonymous_stream(capacity), - } - _ => panic!("output_index out of bounds."), - }; - return output; - } - ) -} - #[zyn::element] fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream { diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs index 502c8b4..aab6d29 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs @@ -1,14 +1,10 @@ -use zyn::Fields; -use zyn::FromInput; use zyn::ToTokens; -use zyn::ast::at; use zyn::ext::AttrExt; use zyn::ext::FieldsExt; use zyn::ext::TypeExt; use zyn::quote::quote; use zyn::syn::Field; use zyn::syn::GenericParam; -use zyn::syn::Lifetime; use zyn::syn::parse_quote; use crate::SyncBlockConfig; diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index bc65279..f2ad426 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -1,6 +1,8 @@ -use crate::edge::BlockIOIndex; -use crate::io::AnonymousStreamConsumer; -use crate::io::AnonymousStreamProducer; +use crate::io::AnonymousIn; +use crate::io::AnonymousOut; +use crate::io::In; +use crate::io::Out; +use crate::io::edge::BlockIOIndex; pub enum BlockResult { @@ -15,28 +17,60 @@ pub enum BlockResult Exit, } +pub trait BlockInput +{ + fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>; + fn get_inputs(&self) -> Vec<&dyn AnonymousIn>; + + // Meta information + fn get_types_names(&self) -> Vec<&'static str>; +} + +pub trait BlockOutput +{ + fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>; + fn get_outputs(&self) -> Vec<&dyn AnonymousOut>; + + // Meta information + fn get_types_names(&self) -> Vec<&'static str>; +} + pub trait BlockIO { + fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>; + fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>; + + fn get_inputs(&self) -> Vec<&dyn AnonymousIn>; + fn get_outputs(&self) -> Vec<&dyn AnonymousOut>; + + fn get_successors(&self) -> Vec + { + self.get_outputs() + .iter() + .map(|x| x.get_consumer_block().unwrap()) + .collect() + } + // Get all of the BlockIOIndices (block index + port) that // this blocks can send data to. - fn get_successors(&self) -> Vec; - - // Sets the index of the current blocks on the shared edges - fn set_index(&self, block_index: usize); - - // Number of input/output ports - fn input_count(&self) -> usize; - fn output_count(&self) -> usize; - - // Stream managment - fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer); - fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer); - - fn create_anonymous_stream_for( - &mut self, - output_index: usize, - capacity: usize, - ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); + // fn get_successors(&self) -> Vec; + // + // // Sets the index of the current blocks on the shared edges + // fn set_index(&self, block_index: usize); + // + // // Number of input/output ports + // fn input_count(&self) -> usize; + // fn output_count(&self) -> usize; + // + // // Stream managment + // fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer); + // fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer); + // + // fn create_anonymous_stream_for( + // &mut self, + // output_index: usize, + // capacity: usize, + // ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); // Meta information fn get_block_name(&self) -> &'static str; @@ -67,3 +101,173 @@ pub trait SyncBlock<'view>: SyncBlockIO<'view> pub trait GraphableBlock: Block + BlockIO {} impl GraphableBlock for T where T: Block + BlockIO {} + +impl BlockInput for In +{ + fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn> + { + vec![self] + } + + fn get_inputs(&self) -> Vec<&dyn AnonymousIn> + { + vec![self] + } + + fn get_types_names(&self) -> Vec<&'static str> + { + vec![std::any::type_name::()] + } +} + +impl BlockInput for Option +{ + fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn> + { + if let Some(input) = self + { + input.get_inputs_mut() + } + else + { + vec![] + } + } + + fn get_inputs(&self) -> Vec<&dyn AnonymousIn> + { + if let Some(input) = self + { + input.get_inputs() + } + else + { + vec![] + } + } + + fn get_types_names(&self) -> Vec<&'static str> + { + if let Some(input) = self + { + input.get_types_names() + } + else + { + vec![] + } + } +} + +impl BlockInput for [I; N] +{ + fn get_inputs(&self) -> Vec<&dyn AnonymousIn> + { + let mut output = vec![]; + for input in self + { + output.extend(input.get_inputs()); + } + output + } + + fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn> + { + let mut output = vec![]; + for input in self + { + output.extend(input.get_inputs_mut()); + } + output + } + + fn get_types_names(&self) -> Vec<&'static str> + { + vec![std::any::type_name::(); N] + } +} + +impl BlockOutput for Out +{ + fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut> + { + vec![self] + } + + fn get_outputs(&self) -> Vec<&dyn AnonymousOut> + { + vec![self] + } + + fn get_types_names(&self) -> Vec<&'static str> + { + vec![std::any::type_name::()] + } +} + +impl BlockOutput for Option +{ + fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut> + { + if let Some(output) = self + { + output.get_outputs_mut() + } + else + { + vec![] + } + } + + fn get_outputs(&self) -> Vec<&dyn AnonymousOut> + { + if let Some(output) = self + { + output.get_outputs() + } + else + { + vec![] + } + } + + fn get_types_names(&self) -> Vec<&'static str> + { + if let Some(input) = self + { + input.get_types_names() + } + else + { + vec![] + } + } +} + +impl BlockOutput for [I; N] +{ + fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut> + { + let mut result = vec![]; + for output in self + { + result.extend(output.get_outputs_mut()); + } + result + } + + fn get_outputs(&self) -> Vec<&dyn AnonymousOut> + { + let mut result = vec![]; + for output in self + { + result.extend(output.get_outputs()); + } + result + } + + fn get_types_names(&self) -> Vec<&'static str> + { + vec![std::any::type_name::(); N] + } +} diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs deleted file mode 100644 index 62924c5..0000000 --- a/oxydsp-flowgraph/src/edge.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::any::Any; - -#[derive(Default)] -pub struct Edge -{ - // Represents the index of the block owning the Out end in the graph - // And the the output index within that block - pub from: Option, - - // Represents the index of the block owning the In end in the graph - // And the the input index within that block - pub to: Option, -} - -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct BlockIOIndex -{ - pub block_index: usize, - pub port_index: usize, -} diff --git a/oxydsp-flowgraph/src/event.rs b/oxydsp-flowgraph/src/event.rs deleted file mode 100644 index cc0ce53..0000000 --- a/oxydsp-flowgraph/src/event.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Represents a FlowGrahWide, simultaneous event -pub enum FlowGraphEvent -{ - Kill(String), -} diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index 182e092..608c188 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -1,6 +1,13 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; +use crossbeam_deque::Steal; +use crossbeam_deque::Worker; + +use crate::block; use crate::block::GraphableBlock; +use crate::io::edge::BlockIOIndex; #[macro_export] macro_rules! flowgraph @@ -17,6 +24,21 @@ macro_rules! flowgraph } } +pub struct RunningGraph +{ + worker_handles: Vec>, +} + +impl RunningGraph +{ + pub fn join(self) + { + self.worker_handles.into_iter().for_each(|j| { + let _ = j.join(); + }); + } +} + pub struct FlowGraph { blocks: Vec>, @@ -31,48 +53,157 @@ impl FlowGraph pub fn add_block(&mut self, block: T) { - block.set_index(self.blocks.len()); + block.get_inputs().iter().enumerate().for_each(|(i, x)| { + x.set_index(BlockIOIndex { + block_index: self.blocks.len(), + port_index: i, + }) + }); + block.get_outputs().iter().enumerate().for_each(|(i, x)| { + x.set_index(BlockIOIndex { + block_index: self.blocks.len(), + port_index: i, + }) + }); self.blocks.push(Box::new(block)); } - pub fn run(mut self) -> JoinHandle<()> + // pub fn run(mut self, thread_count: usize) -> RunningGraph + // { + // self.populate_edges(); + // + // let mut worker_queues = (0..thread_count).map(|_| vec![]).collect::>(); + // + // for (i, block) in self.blocks.into_iter().enumerate() + // { + // worker_queues[i % thread_count].push(block); + // } + // + // let running = Arc::new(AtomicBool::new(true)); + // let worker_handles = worker_queues + // .into_iter() + // .map(|mut queue| { + // let running = running.clone(); + // std::thread::spawn(move || { + // 'outer: while running.load(std::sync::atomic::Ordering::Relaxed) + // { + // for block in queue.iter_mut() + // { + // match block.work() + // { + // crate::block::BlockResult::Ok => + // { + // // Reschedule block + // } + // crate::block::BlockResult::Terminated => + // { // DROP BLOCK + // } + // crate::block::BlockResult::Exit => + // { + // println!("KILLING GRAPH"); + // break 'outer; + // } + // } + // } + // } + // running.store(false, std::sync::atomic::Ordering::Relaxed); + // }) + // }) + // .collect::>(); + // RunningGraph { worker_handles } + // } + + pub fn run(mut self, thread_count: usize) -> RunningGraph { self.populate_edges(); - std::thread::spawn(move || { - 'outer: loop - { - for x in self.blocks.iter_mut() - { - match x.work() + let worker_queues = (0..thread_count) + .map(|_| Worker::>::new_fifo()) + .collect::>(); + + worker_queues + .iter() + .cycle() + .zip(self.blocks) + .for_each(|(worker, block)| worker.push(block)); + let stealers = worker_queues + .iter() + .map(|x| x.stealer()) + .collect::>(); + + let running = Arc::new(AtomicBool::new(true)); + let worker_handles = worker_queues + .into_iter() + .map(|queue| { + let stealers = stealers.clone(); + let running = running.clone(); + std::thread::spawn(move || { + 'outer: while running.load(std::sync::atomic::Ordering::Relaxed) { - crate::block::BlockResult::Ok => - {} - crate::block::BlockResult::Terminated => - { //break 'outer; - } - crate::block::BlockResult::Exit => + // Try to get a job + let mut block = queue.pop().unwrap_or_else(|| { + std::iter::repeat_with(|| { + stealers + .iter() + .map(|stealer| stealer.steal_batch_and_pop(&queue)) + .collect::>() + .success() + }) + .find(|x| x.is_some()) + .unwrap() + .unwrap() + }); + + match block.work() { - println!("KILLING GRAPH"); - break 'outer; + crate::block::BlockResult::Ok => + { + // Reschedule block + queue.push(block); + } + crate::block::BlockResult::Terminated => + { // DROP BLOCK + } + crate::block::BlockResult::Exit => + { + println!("KILLING GRAPH"); + break 'outer; + } } } - } - } - }) + running.store(false, std::sync::atomic::Ordering::Relaxed); + }) + }) + .collect::>(); + RunningGraph { worker_handles } } fn populate_edges(&mut self) { for block_index in 0..self.blocks.len() { - let successors = self.blocks[block_index].get_successors(); - for (output_index, succ_id) in successors.iter().enumerate() + let outputs = self.blocks[block_index].get_outputs_mut(); + let mut rxs = vec![]; + + for output in outputs.into_iter() { - let (tx, rx) = - self.blocks[block_index].create_anonymous_stream_for(output_index, 4096); - self.blocks[block_index].set_anonymous_out_stream(output_index, tx); - self.blocks[succ_id.block_index].set_anonymous_in_stream(succ_id.port_index, rx); + //let (tx, rx) = output.create_anonymous_stream(4096); + let (tx, rx) = output.create_anonymous_stream(8192); + //let (tx, rx) = output.create_anonymous_stream(65536); + + output.set_anonymous_stream(tx); + rxs.push(( + output + .get_consumer_block() + .expect("Non existent destination block."), + rx, + )) + } + + for (index, rx) in rxs.into_iter() + { + self.blocks[index.block_index].get_inputs_mut()[index.port_index] + .set_anonymous_stream(rx); } } } diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 1164193..b3646a9 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -5,17 +5,20 @@ use std::sync::Mutex; use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; -use crate::edge::BlockIOIndex; -use crate::edge::Edge; use crate::stream::StreamConsumer; use crate::stream::StreamProducer; use crate::stream::StreamReader; use crate::stream::StreamWriter; use crate::stream::{self}; -use crate::tag::Tag; use crate::tag::TagSlot; use crate::tag::Tagged; +pub mod edge; + +use crate::io::edge::BlockIOIndex; +use crate::io::edge::Edge; + +/// Represents a input port for a block pub struct In { stream: Option>, @@ -25,6 +28,7 @@ pub struct In edge: Arc>, } +/// Represents a output port for a block pub struct Out { stream: Option>, @@ -34,18 +38,119 @@ pub struct Out edge: Arc>, } +/// Trait to manipulate a block's input in a type agnostic/erased way +pub trait AnonymousIn +{ + /// Inform the input about the index of the blocks it's in, as well as its port index + fn set_index(&self, index: BlockIOIndex); + + /// Returns None or the block index of the block, and the block port of the corresponding + /// Out object + fn get_producer_block(&self) -> Option; + + /// Sets the internal stream object + fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer); +} + +/// Trait to manipulate a block's output in a type agnostic/erased way +pub trait AnonymousOut +{ + /// Inform the output about the index of the blocks it's in, as well as its port index + fn set_index(&self, index: BlockIOIndex); + + /// Sets the internal stream object + fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer); + + /// Returns None or the block index of the block, and the block port of the corresponding + /// In object + fn get_consumer_block(&self) -> Option; + + /// Creates the stream with the correct corresponding type, in a type erased way. + /// + /// This delegation of stream creation is necessary to allow the graph to manipulate + /// it, as it cannot know about the generic type of the stream. + fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); +} + +impl AnonymousIn for In +{ + fn set_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().to = Some(index); + } + + fn get_producer_block(&self) -> Option + { + self.edge.lock().unwrap().from + } + + fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) + { + let (stream, tag_stream) = consumer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); + } +} + +impl AnonymousOut for Out +{ + fn set_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().from = Some(index); + } + + fn get_consumer_block(&self) -> Option + { + self.edge.lock().unwrap().to + } + + fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) + { + let (stream, tag_stream) = producer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); + } + + // Delegate stream creation to Out object + // which knows the stream type + fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) + { + let (tx, rx) = stream::bounded_queue::(capacity); + let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); + ((tx, tx_tag).into(), (rx, rx_tag).into()) + } +} + +/// A Reader to get data from an input pub struct InReader<'a, T> { data_reader: StreamReader<'a, T>, tag_reader: StreamReader<'a, TagSlot>, } +/// A writer to send data to an output pub struct OutWriter<'a, T> { data_writer: StreamWriter<'a, T>, tag_writer: StreamWriter<'a, TagSlot>, } +/// Creates a stream that can then be used to link blocks +/// +/// ```rust +/// let (output, input) = oxydsp-flowgraph::io::stream(); +/// +/// let writer = output.write(); +/// let reader = input.read(); +/// +/// // ... +/// ``` pub fn stream() -> (Out, In) { let edge = Arc::new(Mutex::new(Edge::default())); @@ -65,23 +170,12 @@ pub fn stream() -> (Out, In) impl In { - pub fn set_block_index(&self, index: BlockIOIndex) - { - self.edge.lock().unwrap().to = Some(index); - } - - pub fn get_producer_block(&self) -> Option - { - self.edge.lock().unwrap().from - } - - pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) - { - let (stream, tag_stream) = consumer.downcast::(); - self.stream = Some(stream); - self.tag_stream = Some(tag_stream); - } - + /// Gets a reader view from an input. + /// + /// ``` + /// let reader = input.read(); + /// let data = reader.pop(); + /// ``` pub fn read<'a>(&'a mut self) -> InReader<'a, T> { let data_reader = self.stream.as_mut().unwrap().read(); @@ -95,35 +189,12 @@ impl In impl Out { - pub fn set_block_index(&self, index: BlockIOIndex) - { - self.edge.lock().unwrap().from = Some(index); - } - - pub fn get_consumer_block(&self) -> Option - { - self.edge.lock().unwrap().to - } - - pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) - { - let (stream, tag_stream) = producer.downcast::(); - self.stream = Some(stream); - self.tag_stream = Some(tag_stream); - } - - // Delegate stream creation to Out object - // which knows the stream type - pub fn create_anonymous_stream( - &self, - capacity: usize, - ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) - { - let (tx, rx) = stream::bounded_queue::(capacity); - let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); - ((tx, tx_tag).into(), (rx, rx_tag).into()) - } - + /// Gets a reader view from an output. + /// + /// ``` + /// let writer = output.write(); + /// writer.push((data, tag).into()); + /// ``` pub fn write<'a>(&'a mut self) -> OutWriter<'a, T> { OutWriter { @@ -132,6 +203,17 @@ impl Out } } + /// Pushes an iterator to the output, sending the maximum amount of elements + /// to the output. + /// + /// It will not consume the iterator more than what can be sent. + /// + /// ``` + /// let writer = output.write(); + /// + /// // Send only 42s to the output + /// writer.push_iter(std::iter::repeat(42)); + /// ``` pub fn push_iter>>(&mut self, mut iter: I) -> bool { let writer = self.write(); @@ -151,7 +233,8 @@ impl Out true } - // Meta information + /// Meta information + /// Returns a string of the type of the output pub fn get_type_name(&self) -> &'static str { std::any::type_name::() @@ -160,16 +243,22 @@ impl Out impl InReader<'_, T> { + /// Gets the amount of elements that are available + /// on the input. pub fn len(&self) -> usize { self.data_reader.len() } + /// Returns true iif no elements are available on the input. pub fn is_empty(&self) -> bool { self.len() == 0 } + /// Pops an element from the input. + /// It is guaranteed to return `Some(data)` if + /// if pop was called strictly less times than len pub fn pop(&self) -> Option> { let data = self.data_reader.pop_with_index(); @@ -191,6 +280,9 @@ impl InReader<'_, T> } } + /// Pops an element from the input, discarding the tag. + /// It is guaranteed to return `Some(data)` if + /// if pop was called strictly less times than len pub fn pop_untag(&self) -> Option { self.pop().map(|data| data.into_inner()) @@ -199,16 +291,22 @@ impl InReader<'_, T> impl OutWriter<'_, T> { + /// Gets how much room is available on the output pub fn len(&self) -> usize { self.data_writer.len().min(self.tag_writer.len()) } + /// Returns true iif no element can be sent pub fn is_empty(&self) -> bool { self.len() == 0 } + /// Pushes some tagged data on the input. + /// + /// The operation succeeds (`Ok(())`) if there is enough room + /// Or fails returning the given data to the caller. pub fn push(&self, data: Tagged) -> Result<(), Tagged> { let (data, tag) = data.into(); @@ -227,12 +325,17 @@ impl OutWriter<'_, T> } } + /// Pushes some data on the input (not tagged). + /// + /// The operation succeeds (`Ok(())`) if there is enough room + /// Or fails returning the given data to the caller. pub fn push_no_tag(&self, data: T) -> Result<(), T> { self.data_writer.push(data) } } +/// An iterator type to push data to output(s) pub struct PopIter { len: usize, @@ -240,9 +343,16 @@ pub struct PopIter reader: T, } +/// Type on which data can be popped from pub trait PopIterable<'a> { type Output; + + /// Returns an iterator on the input elements : + /// + /// ``` + /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !")); + /// ``` fn pop_iter(&'a mut self) -> PopIter; } @@ -296,14 +406,22 @@ impl_iterator_for_pop_iter_tuple! {10} impl_iterator_for_pop_iter_tuple! {11} impl_iterator_for_pop_iter_tuple! {12} -// Needed for graph to be able to manipulate -// stream endings without knowing the generic type +/// StreamProducer object for data and tags stored in a type +/// agnostic/erased way. +/// +/// This is needed for the graph system to manipulate and pass arround these objects +/// as they can't/don't know about the generic types of the stream objects pub struct AnonymousStreamProducer { inner: Box, inner_tag: StreamProducer, } +/// StreamConsumer object for data and tags stored in a type +/// agnostic/erased way. +/// +/// This is needed for the graph system to manipulate and pass arround these objects +/// as they can't/don't know about the generic types of the stream objects pub struct AnonymousStreamConsumer { inner: Box, @@ -353,10 +471,3 @@ impl AnonymousStreamConsumer ) } } - -// pub trait PushIterable<'a, T, I> -// where -// I: Iterator, -// { -// fn push_iter(&'a mut self, iter: I) -> bool; -// } diff --git a/oxydsp-flowgraph/src/io/edge.rs b/oxydsp-flowgraph/src/io/edge.rs new file mode 100644 index 0000000..2b96a61 --- /dev/null +++ b/oxydsp-flowgraph/src/io/edge.rs @@ -0,0 +1,23 @@ +/// Shared object between a block's input and output objects +/// so they can "communicate" and know about each other +#[derive(Default)] +pub struct Edge +{ + /// Represents the index of the block owning the Out end in the graph + /// And the the output index within that block + pub from: Option, + + /// Represents the index of the block owning the In end in the graph + /// And the the input index within that block + pub to: Option, +} + +/// Reprensents the location of a port (input or output) in terms of +/// - The block index in which they exist +/// - Their input or output index within that block +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct BlockIOIndex +{ + pub block_index: usize, + pub port_index: usize, +} diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs index f80f847..a3bc6fb 100644 --- a/oxydsp-flowgraph/src/lib.rs +++ b/oxydsp-flowgraph/src/lib.rs @@ -1,9 +1,6 @@ -// This crate manages the flowgraph datastructures and execution/scheduling -// as well as the communication between the blocks - +/// This crate manages the flowgraph datastructures and execution/scheduling +/// as well as the communication between the blocks pub mod block; -pub mod edge; -pub mod event; pub mod graph; pub mod io; pub mod stream; diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs deleted file mode 100644 index f328e4d..0000000 --- a/oxydsp-flowgraph/src/main.rs +++ /dev/null @@ -1 +0,0 @@ -fn main() {} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index 489e4b1..b9acf27 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -1,6 +1,5 @@ use std::cell::Cell; use std::cell::UnsafeCell; -use std::io::empty; use std::mem::MaybeUninit; use std::ops::Deref; use std::sync::Arc; @@ -550,6 +549,28 @@ impl<'a, T> StreamWriter<'a, T> Err(element) } } + + pub fn write(&self, length: usize) + { + let new = self.written.get() + length; + assert!(new < self.len()); + self.written.set(new); + } +} + +impl<'a, T: Copy> StreamWriter<'a, T> +{ + pub fn slices_mut(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) + { + unsafe { + ( + &mut *self.first.get(), + self.second + .map(|x| &mut *x.get()) + .unwrap_or_else(|| &mut(&mut *self.first.get())[0..0]), + ) + } + } } impl<'a, T> StreamReader<'a, T> @@ -643,6 +664,30 @@ impl<'a, T> StreamReader<'a, T> None } } + + pub fn read(&self, length: usize) + { + let new = self.read.get() + length; + assert!(new < self.len()); + self.read.set(new); + } +} + +impl<'a, T: Copy> StreamReader<'a, T> +{ + pub fn slices(&self) -> (&[T], &[T]) + { + unsafe { + ( + std::mem::transmute::<&[MaybeUninit], &[T]>(&*self.first.get()), + std::mem::transmute::<&[MaybeUninit], &[T]>( + self.second + .map(|x| &*x.get()) + .unwrap_or_else(|| &(&*self.first.get())[0..0]), + ), + ) + } + } } // When a Stream writer goes out of scope, it wrote @@ -653,13 +698,13 @@ impl<'a, T> Drop for StreamWriter<'a, T> { // Advance head. // We know that this value hasn't changed since this StreamWriter was created - let head = self.producer.inner.head.load(Ordering::Relaxed); + // let head = self.producer.inner.head.load(Ordering::Relaxed); // We want writes to the buffer to be visible when acquired in the pop side self.producer .inner .head - .store(head + self.written.get(), Ordering::Release); + .store(self.start_index + self.written.get(), Ordering::Release); } } @@ -671,13 +716,13 @@ impl<'a, T> Drop for StreamReader<'a, T> { // Advance tail. // We know that this value hasn't changed since this StreamWriter was created - let tail = self.producer.inner.tail.load(Ordering::Relaxed); + // let tail = self.producer.inner.tail.load(Ordering::Relaxed); // We want writes to the buffer to be visible when acquired in the push side self.producer .inner .tail - .store(tail + self.read.get(), Ordering::Release); + .store(self.start_index + self.read.get(), Ordering::Release); } } diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index 8e5869a..2793077 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -48,7 +48,7 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::RwLock; -/// Object to allocate tags +/// Object to allocate tags and give a unique identifier per tag struct TagAllocator { // Counter to uniquely identify allocated tags @@ -58,27 +58,32 @@ struct TagAllocator labels: HashMap, } -// Label for a tag like : "symbol", "packet_start", "error" +/// Label for a tag like : "symbol", "packet_start", "error" struct TagLabel { - label: String, + // TODO: Allow user customization of labels + // maybe multiple labels + _label: String, } -// Front for tag allocator +/// Object from which TagKeys are obtained. This guarantees absence of collisions between tags pub struct Tags { allocator: Arc>, } +/// Used to anotate tags entries and retrieve them #[derive(Clone)] pub struct TagKey { key: usize, - owner: Arc>, + + // Maybe used later to retrieve labels for example + _owner: Arc>, _phantom: PhantomData, } -// Tags a particular sample within a specific stream +/// Tags a particular sample within a specific stream #[derive(Clone)] pub(crate) struct TagSlot { @@ -93,7 +98,7 @@ pub(crate) struct TagSlot pub tag: Tag, } -// Tag key value pairs +/// A Tag object containing TagKey-value pairs #[derive(Clone)] pub struct Tag { @@ -102,6 +107,7 @@ pub struct Tag impl Tags { + /// Creates a new tag allocator pub fn new() -> Self { Self { @@ -112,12 +118,13 @@ impl Tags } } + /// Allocates a new unique tag key pub fn allocate_tag(&mut self, label: impl AsRef) -> TagKey { let k = self.allocator.write().unwrap().allocate_tag::(label); TagKey { key: k, - owner: self.allocator.clone(), + _owner: self.allocator.clone(), _phantom: Default::default(), } } @@ -125,6 +132,7 @@ impl Tags impl TagAllocator { + /// Allocates a new unique tag key pub fn allocate_tag(&mut self, label: impl AsRef) -> usize { let key = self.counter; @@ -133,7 +141,7 @@ impl TagAllocator ( std::any::type_name::(), TagLabel { - label: label.as_ref().to_owned(), + _label: label.as_ref().to_owned(), }, ), ); @@ -144,6 +152,7 @@ impl TagAllocator impl Default for Tags { + /// Creates a new tag allocator fn default() -> Self { Self::new() @@ -152,6 +161,7 @@ impl Default for Tags impl Tag { + /// Creates a new empty tag pub fn new() -> Self { Self { @@ -159,6 +169,7 @@ impl Tag } } + /// Creates a new tag with a (key, value) entry pub fn with_entry(key: TagKey, value: T) -> Self { let new_tag = Self::default(); @@ -166,6 +177,7 @@ impl Tag new_tag } + /// Creates a new tag, which is the combination of the given tags pub fn from_tags(tag_opts: [&Tag; N]) -> Tag { let new_tag = Self::default(); @@ -181,6 +193,10 @@ impl Tag new_tag } + /// Creates a new tag option, which is the combination of the given tag options + /// + /// If all the tag options are None, None is returned + /// Otherwise it is Some of the combination of all of the tags which are Some pub fn from_tag_opts(tag_opts: [&Option; N]) -> Option { if tag_opts.iter().all(|t| t.is_none()) @@ -201,11 +217,14 @@ impl Tag Some(new_tag) } + /// Adds a new entry in the tag. If it already exists, it is overwritten pub fn add_entry(&self, key: TagKey, value: T) { self.data.write().unwrap().insert(key.key, Arc::new(value)); } + /// Retrieves an entry in tag. If ther is no such entry corresponding to the key, + /// retruns None pub fn retrieve(&self, key: &TagKey) -> Option> { let element = self.data.read().unwrap().get(&key.key).cloned();