Starting support for tags

This commit is contained in:
2026-03-18 16:38:23 +01:00
parent 3cdc0e613a
commit 4aef173c7c
5 changed files with 296 additions and 1 deletions

View File

@ -11,6 +11,7 @@ use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::tag::Tag;
#[derive(Default)]
pub struct Edge
@ -82,6 +83,7 @@ impl AnonymousStreamConsumer
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
tag_stream: Option<StreamConsumer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
@ -90,6 +92,7 @@ pub struct In<T>
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
tag_stream: Option<StreamProducer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
@ -101,9 +104,14 @@ pub fn stream<T>() -> (Out<T>, In<T>)
(
Out {
stream: None,
tag_stream: None,
edge: edge.clone(),
},
In { stream: None, edge },
In {
stream: None,
tag_stream: None,
edge,
},
)
}

142
oxydsp-flowgraph/src/io.rs Normal file
View File

@ -0,0 +1,142 @@
use std::sync::{Arc, Mutex};
use crate::{
edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex, Edge},
stream::{self, StreamConsumer, StreamProducer, StreamReader, StreamWriter},
tag::Tag,
};
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
tag_stream: Option<StreamConsumer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
tag_stream: Option<StreamProducer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct InReader<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, Tag>,
}
pub struct OutWriter<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, Tag>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
(
Out {
stream: None,
tag_stream: None,
edge: edge.clone(),
},
In {
stream: None,
tag_stream: None,
edge,
},
)
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
self.stream = Some(consumer.downcast::<T>())
}
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
{
InReader {
data_reader: self.stream.as_mut().unwrap().read(),
tag_reader: self.tag_stream.as_mut().unwrap().read(),
}
}
}
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
self.stream = Some(producer.downcast::<T>())
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
(tx.into(), rx.into())
}
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
{
OutWriter {
data_writer: self.stream.as_mut().unwrap().write(),
tag_writer: self.tag_stream.as_mut().unwrap().write(),
}
}
pub fn push_iter<I: Iterator<Item = T>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}

View File

@ -4,5 +4,7 @@
pub mod block;
pub mod edge;
pub mod graph;
pub mod io;
pub mod stream;
pub mod tag;
pub use oxydsp_flowgraph_macros::{BlockIO, sync_block};

View File

@ -396,6 +396,48 @@ impl<T> StreamConsumer<T>
}
}
}
/// Creates a reader of contiguous elements that
/// satisfy the predicate
pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
where
F: Fn(&T) -> bool,
{
// Take a normal reader. This contains available elements to read.
let mut reader = self.read();
// We need to trim the slices to keep only the satified elements
// First slice
let mut first_kept = 0;
for element in reader.first.get_mut()
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Forget about second slice
reader.second_len = 0;
reader.second = None;
reader.first_len = first_kept;
unsafe {
*reader.first =
std::mem::transmute::<
&mut [MaybeUninit<T>],
&mut UnsafeCell<[MaybeUninit<T>]>,
>(&mut (reader.first.get_mut())[0..first_kept]);
}
}
}
todo!()
}
}
impl<'a, T> StreamWriter<'a, T>

101
oxydsp-flowgraph/src/tag.rs Normal file
View File

@ -0,0 +1,101 @@
use std::{
any::Any,
collections::HashMap,
ops::{Deref, DerefMut},
sync::{Arc, Mutex},
};
// Tags a particular sample within a specific stream
#[derive(Clone)]
pub struct Tag
{
// Position of the sample this tag is tied to.
// The position is in terms of the stream front index when the
// sample was added
pub position: usize,
// TODO: Make it such that when a tag is duplicated, the data seems to be too:
// When adding on a duplicate, it should not replicate on others, but without
// requiring a deep copy.
pub data: Arc<Mutex<HashMap<String, Box<dyn Any>>>>,
}
/// Represents a data, with a potential tag attached to it.
#[derive(Clone)]
pub struct Tagged<T>
{
inner: T,
tag: Option<Tag>,
}
impl<T> Tagged<T>
{
pub fn new(inner: T, tag: Option<Tag>) -> Self
{
Self { inner, tag }
}
pub fn has_tag(&self) -> bool
{
self.tag.is_some()
}
pub fn strip(&mut self)
{
self.tag = None;
}
pub fn tag(&mut self, tag: Tag) -> Option<Tag>
{
let t = self.tag.take();
self.tag = Some(tag);
t
}
}
impl<T: Clone> Tagged<T>
{
pub fn stripped(&self) -> Self
{
self.inner.clone().into()
}
pub fn tagged(&self, tag: Tag) -> Self
{
(self.inner.clone(), tag).into()
}
}
impl<T> From<T> for Tagged<T>
{
fn from(value: T) -> Self
{
Self::new(value, None)
}
}
impl<T> From<(T, Tag)> for Tagged<T>
{
fn from((value, tag): (T, Tag)) -> Self
{
Self::new(value, Some(tag))
}
}
impl<T> DerefMut for Tagged<T>
{
fn deref_mut(&mut self) -> &mut Self::Target
{
&mut self.inner
}
}
impl<T> Deref for Tagged<T>
{
type Target = T;
fn deref(&self) -> &Self::Target
{
&self.inner
}
}