From 4aef173c7cbf742ebdc3af173dd86def7e34ed7e Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Wed, 18 Mar 2026 16:38:23 +0100 Subject: [PATCH] Starting support for tags --- oxydsp-flowgraph/src/edge.rs | 10 ++- oxydsp-flowgraph/src/io.rs | 142 +++++++++++++++++++++++++++++++++ oxydsp-flowgraph/src/lib.rs | 2 + oxydsp-flowgraph/src/stream.rs | 42 ++++++++++ oxydsp-flowgraph/src/tag.rs | 101 +++++++++++++++++++++++ 5 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 oxydsp-flowgraph/src/io.rs create mode 100644 oxydsp-flowgraph/src/tag.rs diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs index ce32d10..44144f1 100644 --- a/oxydsp-flowgraph/src/edge.rs +++ b/oxydsp-flowgraph/src/edge.rs @@ -11,6 +11,7 @@ use crate::stream::StreamConsumer; use crate::stream::StreamProducer; use crate::stream::StreamReader; use crate::stream::StreamWriter; +use crate::tag::Tag; #[derive(Default)] pub struct Edge @@ -82,6 +83,7 @@ impl AnonymousStreamConsumer pub struct In { stream: Option>, + tag_stream: Option>, // Will rarely be accessed edge: Arc>, @@ -90,6 +92,7 @@ pub struct In pub struct Out { stream: Option>, + tag_stream: Option>, // Will rarely be accessed edge: Arc>, @@ -101,9 +104,14 @@ pub fn stream() -> (Out, In) ( Out { stream: None, + tag_stream: None, edge: edge.clone(), }, - In { stream: None, edge }, + In { + stream: None, + tag_stream: None, + edge, + }, ) } diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs new file mode 100644 index 0000000..68367da --- /dev/null +++ b/oxydsp-flowgraph/src/io.rs @@ -0,0 +1,142 @@ +use std::sync::{Arc, Mutex}; + +use crate::{ + edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex, Edge}, + stream::{self, StreamConsumer, StreamProducer, StreamReader, StreamWriter}, + tag::Tag, +}; + +pub struct In +{ + stream: Option>, + tag_stream: Option>, + + // Will rarely be accessed + edge: Arc>, +} + +pub struct Out +{ + stream: Option>, + tag_stream: Option>, + + // Will rarely be accessed + edge: Arc>, +} + +pub struct InReader<'a, T> +{ + data_reader: StreamReader<'a, T>, + tag_reader: StreamReader<'a, Tag>, +} + +pub struct OutWriter<'a, T> +{ + data_writer: StreamWriter<'a, T>, + tag_writer: StreamWriter<'a, Tag>, +} + +pub fn stream() -> (Out, In) +{ + let edge = Arc::new(Mutex::new(Edge::default())); + ( + Out { + stream: None, + tag_stream: None, + edge: edge.clone(), + }, + In { + stream: None, + tag_stream: None, + edge, + }, + ) +} + +impl In +{ + pub fn set_block_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().to = Some(index); + } + + pub fn get_producer_block(&self) -> Option + { + self.edge.lock().unwrap().from + } + + pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) + { + self.stream = Some(consumer.downcast::()) + } + + pub fn read<'a>(&'a mut self) -> InReader<'a, T> + { + InReader { + data_reader: self.stream.as_mut().unwrap().read(), + tag_reader: self.tag_stream.as_mut().unwrap().read(), + } + } +} + +impl Out +{ + pub fn set_block_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().from = Some(index); + } + + pub fn get_consumer_block(&self) -> Option + { + self.edge.lock().unwrap().to + } + + pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) + { + self.stream = Some(producer.downcast::()) + } + + // Delegate stream creation to Out object + // which knows the stream type + pub fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) + { + let (tx, rx) = stream::bounded_queue::(capacity); + (tx.into(), rx.into()) + } + + pub fn write<'a>(&'a mut self) -> OutWriter<'a, T> + { + OutWriter { + data_writer: self.stream.as_mut().unwrap().write(), + tag_writer: self.tag_stream.as_mut().unwrap().write(), + } + } + + pub fn push_iter>(&mut self, mut iter: I) -> bool + { + let writer = self.write(); + let len = writer.len(); + + for _ in 0..len + { + if let Some(elt) = iter.next() + { + let _ = writer.push(elt); + } + else + { + return false; + } + } + true + } + + // Meta information + pub fn get_type_name(&self) -> &'static str + { + std::any::type_name::() + } +} diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs index 20f4b59..27c2a23 100644 --- a/oxydsp-flowgraph/src/lib.rs +++ b/oxydsp-flowgraph/src/lib.rs @@ -4,5 +4,7 @@ pub mod block; pub mod edge; pub mod graph; +pub mod io; pub mod stream; +pub mod tag; pub use oxydsp_flowgraph_macros::{BlockIO, sync_block}; diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index df41584..1f2aedd 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -396,6 +396,48 @@ impl StreamConsumer } } } + + /// Creates a reader of contiguous elements that + /// satisfy the predicate + pub fn read_while(&mut self, predicate: F) -> StreamReader<'_, T> + where + F: Fn(&T) -> bool, + { + // Take a normal reader. This contains available elements to read. + let mut reader = self.read(); + + // We need to trim the slices to keep only the satified elements + + // First slice + let mut first_kept = 0; + for element in reader.first.get_mut() + { + // SAFETY + // + // If this element is in a reader returned by self.read + // with no pop called, we know it is initialized + let init_element = unsafe { element.assume_init_ref() }; + let sat = predicate(init_element); + if !sat + { + // Stop here + // Forget about second slice + reader.second_len = 0; + reader.second = None; + + reader.first_len = first_kept; + unsafe { + *reader.first = + std::mem::transmute::< + &mut [MaybeUninit], + &mut UnsafeCell<[MaybeUninit]>, + >(&mut (reader.first.get_mut())[0..first_kept]); + } + } + } + + todo!() + } } impl<'a, T> StreamWriter<'a, T> diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs new file mode 100644 index 0000000..b314996 --- /dev/null +++ b/oxydsp-flowgraph/src/tag.rs @@ -0,0 +1,101 @@ +use std::{ + any::Any, + collections::HashMap, + ops::{Deref, DerefMut}, + sync::{Arc, Mutex}, +}; + +// Tags a particular sample within a specific stream +#[derive(Clone)] +pub struct Tag +{ + // Position of the sample this tag is tied to. + // The position is in terms of the stream front index when the + // sample was added + pub position: usize, + + // TODO: Make it such that when a tag is duplicated, the data seems to be too: + // When adding on a duplicate, it should not replicate on others, but without + // requiring a deep copy. + pub data: Arc>>>, +} + +/// Represents a data, with a potential tag attached to it. +#[derive(Clone)] +pub struct Tagged +{ + inner: T, + tag: Option, +} + +impl Tagged +{ + pub fn new(inner: T, tag: Option) -> Self + { + Self { inner, tag } + } + + pub fn has_tag(&self) -> bool + { + self.tag.is_some() + } + + pub fn strip(&mut self) + { + self.tag = None; + } + + pub fn tag(&mut self, tag: Tag) -> Option + { + let t = self.tag.take(); + self.tag = Some(tag); + t + } +} + +impl Tagged +{ + pub fn stripped(&self) -> Self + { + self.inner.clone().into() + } + + pub fn tagged(&self, tag: Tag) -> Self + { + (self.inner.clone(), tag).into() + } +} + +impl From for Tagged +{ + fn from(value: T) -> Self + { + Self::new(value, None) + } +} + +impl From<(T, Tag)> for Tagged +{ + fn from((value, tag): (T, Tag)) -> Self + { + Self::new(value, Some(tag)) + } +} + +impl DerefMut for Tagged +{ + fn deref_mut(&mut self) -> &mut Self::Target + { + &mut self.inner + } +} + +impl Deref for Tagged +{ + type Target = T; + + fn deref(&self) -> &Self::Target + { + &self.inner + } +}