diff --git a/Cargo.lock b/Cargo.lock index 44d606f..8504b79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -892,6 +892,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b14ccef22fc6f5a8f4d7d768562a182c04ce9a3b3157b91390b52ddfdf1a76" +[[package]] +name = "dpsk-modem" +version = "0.1.0" +dependencies = [ + "cpal", + "eframe", + "egui", + "egui_plot", + "hound", + "num", + "oxydsp-dsp", + "oxydsp-flowgraph", + "rand", +] + [[package]] name = "ecolor" version = "0.33.3" diff --git a/examples/bfsk-modem/src/main.rs b/examples/bfsk-modem/src/main.rs index 954db26..c3dd93f 100644 --- a/examples/bfsk-modem/src/main.rs +++ b/examples/bfsk-modem/src/main.rs @@ -1,40 +1,3 @@ -use std::collections::VecDeque; -use std::fs::File; -use std::io; -use std::io::Write; -use std::sync::mpsc; - -use cpal::traits::DeviceTrait; -use cpal::traits::HostTrait; -use eframe::NativeOptions; -use egui::Color32; -use egui_plot::Line; -use egui_plot::PlotPoints; -use num::Complex; -use num::Zero; -use oxydsp_dsp::blocks::filtering::fir::FirFilter; -use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; -use oxydsp_dsp::blocks::math::basic::Adder; -use oxydsp_dsp::blocks::math::basic::Multiplier; -use oxydsp_dsp::blocks::synthesis::Nco; -use oxydsp_dsp::blocks::synthesis::OscillatorSource; -use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate; -use oxydsp_dsp::blocks::utilities::adapters::Map; -use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; -use oxydsp_dsp::blocks::utilities::adapters::NullSink; -use oxydsp_dsp::blocks::utilities::adapters::Repeat; -use oxydsp_dsp::blocks::utilities::adapters::Scan; -use oxydsp_dsp::blocks::utilities::adapters::ScanTagged; -use oxydsp_dsp::blocks::utilities::channels::RxSource; -use oxydsp_dsp::blocks::utilities::iter::IterSource; -use oxydsp_dsp::blocks::utilities::squelch::Squelch; -use oxydsp_dsp::filtering::fir::Fir; -use oxydsp_dsp::units::DigitalFrequency; -use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::flowgraph; -use oxydsp_flowgraph::graph::FlowGraph; -use rand::random; - use crate::receiver::RadioReceiver; use crate::transmitter::Transmitter; @@ -51,7 +14,7 @@ fn main() loop { let mut user_input = String::new(); - io::stdin().read_line(&mut user_input).unwrap(); + std::io::stdin().read_line(&mut user_input).unwrap(); println!("Transmitting ..."); tx.transmit(user_input.as_bytes().to_vec()); } diff --git a/examples/bfsk-modem/src/receiver.rs b/examples/bfsk-modem/src/receiver.rs index 830102d..00783e7 100644 --- a/examples/bfsk-modem/src/receiver.rs +++ b/examples/bfsk-modem/src/receiver.rs @@ -28,6 +28,7 @@ use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::graph::FlowGraph; use oxydsp_flowgraph::tag::Tag; use oxydsp_flowgraph::tag::Tagged; +use oxydsp_flowgraph::tag::Tags; use crate::CARRIER; use crate::DEVIATION; @@ -149,6 +150,7 @@ impl RadioReceiver pub fn start_new() -> Self { let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64); + let mut tags = Tags::default(); let (audio_tx, audio_rx) = mpsc::channel(); let (packet_tx, packet_rx) = mpsc::channel::>(); @@ -174,11 +176,13 @@ impl RadioReceiver let mut elg_loop = Fir(vec![1. / 20.; 30]); *elg_loop.0.last_mut().unwrap() = 1.; - let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL); + let symbol_tag = tags.allocate_tag("early late gate symbol"); + let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL, symbol_tag.clone()); // // Eye diagram let (tx, rx) = mpsc::channel::<(Vec, f32)>(); //let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::<()>::new(), move |history, x| { + let symbol_tag2 = symbol_tag.clone(); let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::new(), move |history, x| { let cloned_tag = x.1.clone(); if history.len() == 2 * SAMPLE_PER_SYMBOL @@ -188,9 +192,9 @@ impl RadioReceiver let mut error: f32 = 0.; let is_symbol_center = x.1.as_ref().is_some_and(|t| { - if let Some(err) = t.retrieve("elg_symbol") + if let Some(err) = t.retrieve(symbol_tag2.clone()) { - error = *err.downcast().unwrap(); + error = *err; true } else @@ -216,7 +220,7 @@ impl RadioReceiver if sample .1 .as_ref() - .is_some_and(|t| t.retrieve("elg_symbol").is_some()) + .is_some_and(|t| t.retrieve(symbol_tag.clone()).is_some()) && let Some(packet) = builder.next_bit(sample.0 < 0.) { let _ = packet_tx.send(packet); diff --git a/examples/bfsk-modem/src/transmitter.rs b/examples/bfsk-modem/src/transmitter.rs index ecd831e..9eb607a 100644 --- a/examples/bfsk-modem/src/transmitter.rs +++ b/examples/bfsk-modem/src/transmitter.rs @@ -1,44 +1,16 @@ -use std::collections::VecDeque; -use std::fs::File; -use std::io::Write; -use std::iter::FusedIterator; -use std::net::UdpSocket; -use std::ops::BitXor; -use std::sync::mpsc; -use std::sync::mpsc::Receiver; -use std::sync::mpsc::Sender; -use std::sync::mpsc::SyncSender; -use std::sync::mpsc::sync_channel; -use std::thread::JoinHandle; -use std::time::Duration; - use cpal::Stream; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; -use eframe::NativeOptions; -use egui::Color32; -use egui::output; -use egui_plot::Line; -use egui_plot::PlotPoints; use num::Complex; -use num::Zero; use oxydsp_dsp::blocks::filtering::fir::FirFilter; -use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; -use oxydsp_dsp::blocks::math::basic::Adder; use oxydsp_dsp::blocks::math::basic::Multiplier; use oxydsp_dsp::blocks::synthesis::Nco; use oxydsp_dsp::blocks::synthesis::OscillatorSource; -use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate; use oxydsp_dsp::blocks::utilities::adapters::Map; -use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; -use oxydsp_dsp::blocks::utilities::adapters::NullSink; use oxydsp_dsp::blocks::utilities::adapters::Repeat; use oxydsp_dsp::blocks::utilities::adapters::Scan; -use oxydsp_dsp::blocks::utilities::adapters::ScanTagged; use oxydsp_dsp::blocks::utilities::channels::RxSource; use oxydsp_dsp::blocks::utilities::channels::TxSink; -use oxydsp_dsp::blocks::utilities::iter::IterSource; -use oxydsp_dsp::blocks::utilities::squelch::Squelch; use oxydsp_dsp::filtering::fir::Fir; use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::BlockIO; @@ -49,6 +21,14 @@ use oxydsp_flowgraph::graph::FlowGraph; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; use rand::random; +use std::iter::FusedIterator; +use std::net::UdpSocket; +use std::ops::BitXor; +use std::sync::mpsc; +use std::sync::mpsc::Receiver; +use std::sync::mpsc::SyncSender; +use std::sync::mpsc::sync_channel; +use std::thread::JoinHandle; use crate::CARRIER; use crate::DEVIATION; @@ -200,11 +180,11 @@ impl Transmitter // gaussian fir let fir = Fir((0..SAMPLE_PER_SYMBOL) - .map(|x| gaussian(0.1, x as f32 / SAMPLE_PER_SYMBOL as f32)) + .map(|x| gaussian(0.8, x as f32 / SAMPLE_PER_SYMBOL as f32)) .collect()) .normalized(); - //let (bit_filter, bits) = FirFilter::new(bits, fir); + let (bit_filter, bits) = FirFilter::new(bits, fir); let (to_freq, freq) = Map::new(bits, move |x| { DigitalFrequency::from_time_frequency(DEVIATION * x as f64, SAMPLE_RATE as f64) }); @@ -240,7 +220,7 @@ impl Transmitter packet_rec, linearizer, //reverb, - //bit_filter, + bit_filter, udp_map, to_freq, repeat, diff --git a/examples/dpsk-modem/Cargo.toml b/examples/dpsk-modem/Cargo.toml new file mode 100644 index 0000000..917333e --- /dev/null +++ b/examples/dpsk-modem/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "dpsk-modem" +version = "0.1.0" +edition = "2024" + +[dependencies] +oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"} +oxydsp-dsp = {path = "../../oxydsp-dsp/"} +egui = "0.33.3" +egui_plot = "0.34.1" +eframe = { version = "0.33.3", features = ["default_fonts", "wayland"] } +num = "0.4.3" +hound = "3.5.1" +rand = "0.10.0" +cpal = "0.17.3" diff --git a/examples/dpsk-modem/mod.wav b/examples/dpsk-modem/mod.wav new file mode 100644 index 0000000..e565447 Binary files /dev/null and b/examples/dpsk-modem/mod.wav differ diff --git a/examples/dpsk-modem/src/main.rs b/examples/dpsk-modem/src/main.rs new file mode 100644 index 0000000..f66f7c1 --- /dev/null +++ b/examples/dpsk-modem/src/main.rs @@ -0,0 +1,153 @@ +use std::collections::VecDeque; +use std::sync::mpsc::channel; +use std::time::Duration; + +use eframe::NativeOptions; +use egui::Color32; +use egui_plot::PlotPoints; +use egui_plot::Points; +use num::Complex; +use num::Integer; +use oxydsp_dsp::blocks::iq::zero_if::ZeroIf; +use oxydsp_dsp::blocks::math::basic::Multiplier; +use oxydsp_dsp::blocks::synthesis::OscillatorSource; +use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged; +use oxydsp_dsp::blocks::utilities::adapters::NullSink; +use oxydsp_dsp::blocks::utilities::adapters::Repeat; +use oxydsp_dsp::blocks::utilities::adapters::Scan; +use oxydsp_dsp::blocks::utilities::channels::TxSink; +use oxydsp_dsp::blocks::utilities::iter::IterSource; +use oxydsp_dsp::filtering::fir::Fir; +use oxydsp_dsp::units::DigitalFrequency; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::flowgraph; +use oxydsp_flowgraph::tag::Tags; +use rand::random; +use rand::random_bool; + +const SAMPLE_RATE: usize = 48_000; +const CARRIER: f64 = 1000.; +const SAMPLE_PER_SYMBOL: usize = 96; + +fn main() +{ + modulator(); + demodulator(); + println!("Hello, world!"); +} + +fn demodulator() +{ + let mut reader = hound::WavReader::open("mod.wav").unwrap(); + let samples = reader + .samples::() + .map(|x| (x.unwrap() as f32) / (i16::MAX as f32)) + .collect::>(); + + let (signal_source, signal) = IterSource::new(samples.into_iter()); + let (mut zero_if, baseband) = ZeroIf::new( + signal, + DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), + ); + zero_if.set_fir(Fir::lowpass( + DigitalFrequency::from_time_frequency(CARRIER + 100., SAMPLE_RATE as f64), + 100, + )); + + let (constellation_tx, conbtellation_rx) = channel::>(); + let tx_sink = TxSink::new(baseband, constellation_tx); + + let graph = flowgraph![signal_source, zero_if, tx_sink]; + graph.run(); + + let mut constellation = VecDeque::new(); + let mut n = 0; + eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| { + while let Ok(sample) = conbtellation_rx.try_recv() + { + if constellation.len() >= 100_000 + { + let _ = constellation.pop_back(); + } + + if n.is_multiple_of(&SAMPLE_PER_SYMBOL) + { + constellation.push_front(sample); + } + n += 1; + } + + egui::CentralPanel::default().show(ctx, |ui| { + egui_plot::Plot::new("hello").show(ui, |plot_ui| { + plot_ui.points( + Points::new( + "constellation", + constellation + .iter() + .map(|s| [s.re as f64, s.im as f64]) + .collect::(), + ) + .id("constellation") + .color(Color32::GREEN), + ); + }); + ctx.request_repaint(); + }); + }) + .unwrap(); +} + +fn modulator() +{ + let random_source = (0..1000).map(|_| random_bool(0.5)); + + let mut tags = Tags::new(); + let (mut bit_source, bits) = IterSource::new(random_source); + let last_tag = tags.allocate_tag("finished"); + bit_source.tag_last_with(last_tag.clone()); + + let (phase_map, phase) = Scan::new(bits, 1., |state, bit| { + if bit + { + *state *= -1.; + } + *state + }); + let (repeater, phase) = Repeat::new(phase, SAMPLE_PER_SYMBOL); + + let (oscillator, passband) = OscillatorSource::::new( + DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64).into(), + ); + let (multiplier, passband) = Multiplier::new(passband, phase); + let (output_tx, output_rx) = channel::>(); + let (tx_map, passband) = MapResultTagged::new(passband, move |s| { + let _ = output_tx.send(s.0); + if s.retrieve(&last_tag).is_some() + { + return (s, BlockResult::Exit); + } + (s, BlockResult::Ok) + }); + let null_sink = NullSink::new(passband); + + let graph = flowgraph![ + bit_source, phase_map, repeater, oscillator, multiplier, tx_map, null_sink + ]; + graph.run(); + + let spec = hound::WavSpec { + channels: 1, + sample_rate: SAMPLE_RATE as u32, + bits_per_sample: 16, + sample_format: hound::SampleFormat::Int, + }; + let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap(); + for sample in output_rx.iter() + { + let amplitude = i16::MAX as f32; + writer + .write_sample(((sample.re + random::() * 0.2) * amplitude) as i16) + .unwrap(); + } + writer.finalize().unwrap(); +} diff --git a/oxydsp-dsp/src/blocks/ted/early_late.rs b/oxydsp-dsp/src/blocks/ted/early_late.rs index 4f81e51..59c3007 100644 --- a/oxydsp-dsp/src/blocks/ted/early_late.rs +++ b/oxydsp-dsp/src/blocks/ted/early_late.rs @@ -9,6 +9,7 @@ use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::Tag; +use oxydsp_flowgraph::tag::TagKey; use crate::filtering::fir::Fir; use crate::filtering::fir::FirFilter; @@ -37,13 +38,20 @@ pub struct EarlyLateGate, + + symbol_tag_key: TagKey, } impl EarlyLateGate where T: Float + Sum + Clone + 'static + Send + Sync + NumCast, { - pub fn new(input: In, loop_filter: Fir, symbol_length: usize) -> (Self, In) + pub fn new( + input: In, + loop_filter: Fir, + symbol_length: usize, + symbol_tag_key: TagKey, + ) -> (Self, In) { let (output, samples) = oxydsp_flowgraph::io::stream(); ( @@ -57,6 +65,7 @@ where next_sample: symbol_length as f32, // We assume that the first symbol is 1.5 windows into // the stream loop_filter: FirFilter::new(loop_filter), + symbol_tag_key, }, samples, ) @@ -102,9 +111,7 @@ where *state.next_sample -= *state.window_location as f32; *state.window_location = 0; - let new_tag = Tag::default(); - new_tag.tag("elg_symbol", error); - tag = Some(new_tag); + tag = Some(Tag::with_entry(state.symbol_tag_key.clone(), error)); } Some((sample, tag).into()) diff --git a/oxydsp-dsp/src/blocks/utilities/iter.rs b/oxydsp-dsp/src/blocks/utilities/iter.rs index 6175583..2f351a1 100644 --- a/oxydsp-dsp/src/blocks/utilities/iter.rs +++ b/oxydsp-dsp/src/blocks/utilities/iter.rs @@ -3,12 +3,11 @@ use std::iter::Peekable; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::io::In; use oxydsp_flowgraph::io::Out; use oxydsp_flowgraph::io::stream; -use oxydsp_flowgraph::sync_block; use oxydsp_flowgraph::tag::Tag; +use oxydsp_flowgraph::tag::TagKey; #[derive(BlockIO)] pub struct IterSource @@ -16,6 +15,7 @@ where I::Item: 'static, { iter: Peekable, + finished_tag: Option>, #[output] output: Out, @@ -32,11 +32,17 @@ where ( Self { iter: iter.peekable(), + finished_tag: None, output, }, items, ) } + + pub fn tag_last_with(&mut self, finished_tag: TagKey<()>) + { + self.finished_tag = Some(finished_tag); + } } impl Block for IterSource @@ -53,11 +59,10 @@ where if let Some(element) = self.iter.next() { let mut tag = None; - if self.iter.peek().is_none() + if let Some(tag_key) = &self.finished_tag + && self.iter.peek().is_none() { - let new_tag = Tag::default(); - new_tag.tag("itersource_finished", ()); - tag = Some(new_tag); + tag = Some(Tag::with_entry(tag_key.clone(), ())); } let _ = writer.push((element, tag).into()); } diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs index 11160d7..502c8b4 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs @@ -374,10 +374,10 @@ fn sync_block_block_impl_with_inputs( | { // Create output tag - let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([ + let common_tag = oxydsp_flowgraph::tag::Tag::from_tag_opts([ @for (in_field in input_fields.iter()) { - {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(), + &{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}, } ]); diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index 79726e3..182e092 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -8,7 +8,7 @@ macro_rules! flowgraph ($($x:ident),* $(,)?) => { { - let mut flowgraph = FlowGraph::new(); + let mut flowgraph = oxydsp_flowgraph::graph::FlowGraph::new(); $( flowgraph.add_block($x); )* diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 9f7079b..1164193 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -13,12 +13,13 @@ use crate::stream::StreamReader; use crate::stream::StreamWriter; use crate::stream::{self}; use crate::tag::Tag; +use crate::tag::TagSlot; use crate::tag::Tagged; pub struct In { stream: Option>, - tag_stream: Option>, + tag_stream: Option>, // Will rarely be accessed edge: Arc>, @@ -27,7 +28,7 @@ pub struct In pub struct Out { stream: Option>, - tag_stream: Option>, + tag_stream: Option>, // Will rarely be accessed edge: Arc>, @@ -36,13 +37,13 @@ pub struct Out pub struct InReader<'a, T> { data_reader: StreamReader<'a, T>, - tag_reader: StreamReader<'a, Tag>, + tag_reader: StreamReader<'a, TagSlot>, } pub struct OutWriter<'a, T> { data_writer: StreamWriter<'a, T>, - tag_writer: StreamWriter<'a, Tag>, + tag_writer: StreamWriter<'a, TagSlot>, } pub fn stream() -> (Out, In) @@ -119,7 +120,7 @@ impl Out ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) { let (tx, rx) = stream::bounded_queue::(capacity); - let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); + let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); ((tx, tx_tag).into(), (rx, rx_tag).into()) } @@ -182,7 +183,7 @@ impl InReader<'_, T> { tag = self.tag_reader.pop(); } - Some((data, tag).into()) + Some((data, tag.map(|t| t.tag)).into()) } else { @@ -210,12 +211,9 @@ impl OutWriter<'_, T> pub fn push(&self, data: Tagged) -> Result<(), Tagged> { - let (data, mut tag) = data.into(); + let (data, tag) = data.into(); let position = self.data_writer.next_index(); - if let Some(tag) = &mut tag - { - tag.position = position - } + let tag = tag.map(|t| TagSlot { position, tag: t }); match self.data_writer.push(data) { @@ -225,7 +223,7 @@ impl OutWriter<'_, T> Ok(()) } Ok(_) => Ok(()), - Err(data) => Err((data, tag).into()), + Err(data) => Err((data, tag.map(|t| t.tag)).into()), } } @@ -303,18 +301,18 @@ impl_iterator_for_pop_iter_tuple! {12} pub struct AnonymousStreamProducer { inner: Box, - inner_tag: StreamProducer, + inner_tag: StreamProducer, } pub struct AnonymousStreamConsumer { inner: Box, - inner_tag: StreamConsumer, + inner_tag: StreamConsumer, } -impl From<(StreamProducer, StreamProducer)> for AnonymousStreamProducer +impl From<(StreamProducer, StreamProducer)> for AnonymousStreamProducer { - fn from(value: (StreamProducer, StreamProducer)) -> Self + fn from(value: (StreamProducer, StreamProducer)) -> Self { AnonymousStreamProducer { inner: Box::new(value.0), @@ -323,9 +321,9 @@ impl From<(StreamProducer, StreamProducer)> for AnonymousStr } } -impl From<(StreamConsumer, StreamConsumer)> for AnonymousStreamConsumer +impl From<(StreamConsumer, StreamConsumer)> for AnonymousStreamConsumer { - fn from(value: (StreamConsumer, StreamConsumer)) -> Self + fn from(value: (StreamConsumer, StreamConsumer)) -> Self { AnonymousStreamConsumer { inner: Box::new(value.0), @@ -336,7 +334,7 @@ impl From<(StreamConsumer, StreamConsumer)> for AnonymousStr impl AnonymousStreamProducer { - pub fn downcast(self) -> (StreamProducer, StreamProducer) + pub(crate) fn downcast(self) -> (StreamProducer, StreamProducer) { ( *self.inner.downcast::>().unwrap(), @@ -347,7 +345,7 @@ impl AnonymousStreamProducer impl AnonymousStreamConsumer { - pub fn downcast(self) -> (StreamConsumer, StreamConsumer) + pub(crate) fn downcast(self) -> (StreamConsumer, StreamConsumer) { ( *self.inner.downcast::>().unwrap(), diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index cb261aa..8e5869a 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -46,32 +46,41 @@ use std::marker::PhantomData; use std::ops::Deref; use std::ops::DerefMut; use std::sync::Arc; -use std::sync::Mutex; +use std::sync::RwLock; -pub struct Tags +/// Object to allocate tags +struct TagAllocator { // Counter to uniquely identify allocated tags counter: usize, // Keeps readable tag type and label(s) for the tags - tag_data: HashMap, -} - -pub struct TagKey -{ - key: usize, - _phantom: PhantomData, + labels: HashMap, } // Label for a tag like : "symbol", "packet_start", "error" -pub struct TagLabel +struct TagLabel { label: String, } +// Front for tag allocator +pub struct Tags +{ + allocator: Arc>, +} + +#[derive(Clone)] +pub struct TagKey +{ + key: usize, + owner: Arc>, + _phantom: PhantomData, +} + // Tags a particular sample within a specific stream #[derive(Clone)] -pub struct Tag +pub(crate) struct TagSlot { // Position of the sample this tag is tied to. // The position is in terms of the stream front index when the @@ -81,7 +90,14 @@ pub struct Tag // TODO: Make it such that when a tag is duplicated, the data seems to be too: // When adding on a duplicate, it should not replicate on others, but without // requiring a deep copy. - pub data: Arc>>>, + pub tag: Tag, +} + +// Tag key value pairs +#[derive(Clone)] +pub struct Tag +{ + data: Arc>>>, } impl Tags @@ -89,19 +105,40 @@ impl Tags pub fn new() -> Self { Self { - counter: 0, - tag_data: HashMap::new(), + allocator: Arc::new(RwLock::new(TagAllocator { + counter: 0, + labels: HashMap::default(), + })), } } - pub fn allocate_tag(&mut self) -> TagKey + pub fn allocate_tag(&mut self, label: impl AsRef) -> TagKey { - let new_tag = TagKey { - key: self.counter, + let k = self.allocator.write().unwrap().allocate_tag::(label); + TagKey { + key: k, + owner: self.allocator.clone(), _phantom: Default::default(), - }; + } + } +} + +impl TagAllocator +{ + pub fn allocate_tag(&mut self, label: impl AsRef) -> usize + { + let key = self.counter; + self.labels.insert( + self.counter, + ( + std::any::type_name::(), + TagLabel { + label: label.as_ref().to_owned(), + }, + ), + ); self.counter += 1; - new_tag + key } } @@ -118,32 +155,65 @@ impl Tag pub fn new() -> Self { Self { - position: 0, data: Default::default(), } } - pub fn merge_tag_opts(tag_opts: [Option; N]) -> Option + pub fn with_entry(key: TagKey, value: T) -> Self { - let mut out_tag = None; - for tag in tag_opts.iter() + let new_tag = Self::default(); + new_tag.add_entry(key, value); + new_tag + } + + pub fn from_tags(tag_opts: [&Tag; N]) -> Tag + { + let new_tag = Self::default(); { - out_tag = out_tag.merge(tag); + let mut writer = new_tag.data.write().unwrap(); + + for tag in tag_opts.iter() + { + let reader = tag.data.read().unwrap(); + writer.extend(reader.iter().map(|x| (*x.0, x.1.clone()))); + } } - out_tag + new_tag } - pub fn tag(&self, key: impl AsRef, value: T) + pub fn from_tag_opts(tag_opts: [&Option; N]) -> Option { - self.data - .lock() - .unwrap() - .insert(key.as_ref().to_owned(), Arc::new(value)); + if tag_opts.iter().all(|t| t.is_none()) + { + return None; + } + + let new_tag = Self::default(); + { + let mut writer = new_tag.data.write().unwrap(); + + for tag in tag_opts.iter().filter(|t| t.is_some()) + { + let reader = tag.as_ref().unwrap().data.read().unwrap(); + writer.extend(reader.iter().map(|x| (*x.0, x.1.clone()))); + } + } + Some(new_tag) } - pub fn retrieve(&self, key: impl AsRef) -> Option> + pub fn add_entry(&self, key: TagKey, value: T) { - self.data.lock().unwrap().get(key.as_ref()).cloned() + self.data.write().unwrap().insert(key.key, Arc::new(value)); + } + + pub fn retrieve(&self, key: &TagKey) -> Option> + { + let element = self.data.read().unwrap().get(&key.key).cloned(); + + // TODO: When available : downcast unchecked, the type should be guaranteed + // by the TagKey's generic + // (But there might be an issue if the key comes from somewhere else) + element.map(|x| x.downcast().unwrap()) } } @@ -167,19 +237,7 @@ impl TagMergable for Tag { fn merge(&self, other: &Self) -> Self { - // TODO: More performant merge - let mut new = other.clone(); - - new.position = self.position; - { - let mut data_locked = new.data.lock().unwrap(); - for (k, v) in self.data.lock().unwrap().iter() - { - data_locked.insert(k.clone(), v.clone()); - } - } - - new + Self::from_tags([self, other]) } } @@ -187,15 +245,7 @@ impl TagMergable> for Option { fn merge(&self, other: &Self) -> Self { - match self - { - Some(first) => match other - { - Some(other) => Some(first.merge(other)), - None => Some(first.clone()), - }, - None => other.clone(), - } + Tag::from_tag_opts([self, other]) } } @@ -215,10 +265,6 @@ impl Tagged { pub fn new(inner: T, tag: Option) -> Self { - if tag.is_none() - { - //println!("data has no tag"); - } Self(inner, tag) } @@ -243,6 +289,16 @@ impl Tagged self.1 = Some(tag); t } + + pub fn retrieve(&self, key: &TagKey) -> Option> + { + self.1.as_ref().and_then(|t| t.retrieve(key)) + } + + pub fn add_entry(&mut self, key: TagKey, value: D) + { + self.1.get_or_insert(Tag::default()).add_entry(key, value); + } } impl Tagged