Better tag interface, starting bpsk

This commit is contained in:
2026-03-25 13:27:46 +01:00
parent b13e846fa5
commit 7766d9b91d
13 changed files with 360 additions and 164 deletions

View File

@ -374,10 +374,10 @@ fn sync_block_block_impl_with_inputs(
|
{
// Create output tag
let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
let common_tag = oxydsp_flowgraph::tag::Tag::from_tag_opts([
@for (in_field in input_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(),
&{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}},
}
]);

View File

@ -8,7 +8,7 @@ macro_rules! flowgraph
($($x:ident),* $(,)?) =>
{
{
let mut flowgraph = FlowGraph::new();
let mut flowgraph = oxydsp_flowgraph::graph::FlowGraph::new();
$(
flowgraph.add_block($x);
)*

View File

@ -13,12 +13,13 @@ use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::Tag;
use crate::tag::TagSlot;
use crate::tag::Tagged;
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
tag_stream: Option<StreamConsumer<Tag>>,
tag_stream: Option<StreamConsumer<TagSlot>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
@ -27,7 +28,7 @@ pub struct In<T>
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
tag_stream: Option<StreamProducer<Tag>>,
tag_stream: Option<StreamProducer<TagSlot>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
@ -36,13 +37,13 @@ pub struct Out<T>
pub struct InReader<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, Tag>,
tag_reader: StreamReader<'a, TagSlot>,
}
pub struct OutWriter<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, Tag>,
tag_writer: StreamWriter<'a, TagSlot>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
@ -119,7 +120,7 @@ impl<T: 'static> Out<T>
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<Tag>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
@ -182,7 +183,7 @@ impl<T> InReader<'_, T>
{
tag = self.tag_reader.pop();
}
Some((data, tag).into())
Some((data, tag.map(|t| t.tag)).into())
}
else
{
@ -210,12 +211,9 @@ impl<T> OutWriter<'_, T>
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, mut tag) = data.into();
let (data, tag) = data.into();
let position = self.data_writer.next_index();
if let Some(tag) = &mut tag
{
tag.position = position
}
let tag = tag.map(|t| TagSlot { position, tag: t });
match self.data_writer.push(data)
{
@ -225,7 +223,7 @@ impl<T> OutWriter<'_, T>
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag).into()),
Err(data) => Err((data, tag.map(|t| t.tag)).into()),
}
}
@ -303,18 +301,18 @@ impl_iterator_for_pop_iter_tuple! {12}
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<Tag>,
inner_tag: StreamProducer<TagSlot>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<Tag>,
inner_tag: StreamConsumer<TagSlot>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<Tag>)> for AnonymousStreamProducer
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<Tag>)) -> Self
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
@ -323,9 +321,9 @@ impl<T: 'static> From<(StreamProducer<T>, StreamProducer<Tag>)> for AnonymousStr
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<Tag>)> for AnonymousStreamConsumer
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<Tag>)) -> Self
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
@ -336,7 +334,7 @@ impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<Tag>)> for AnonymousStr
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<Tag>)
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
@ -347,7 +345,7 @@ impl AnonymousStreamProducer
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<Tag>)
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),

View File

@ -46,32 +46,41 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
pub struct Tags
/// Object to allocate tags
struct TagAllocator
{
// Counter to uniquely identify allocated tags
counter: usize,
// Keeps readable tag type and label(s) for the tags
tag_data: HashMap<usize, (String, TagLabel)>,
}
pub struct TagKey<T>
{
key: usize,
_phantom: PhantomData<T>,
labels: HashMap<usize, (&'static str, TagLabel)>,
}
// Label for a tag like : "symbol", "packet_start", "error"
pub struct TagLabel
struct TagLabel
{
label: String,
}
// Front for tag allocator
pub struct Tags
{
allocator: Arc<RwLock<TagAllocator>>,
}
#[derive(Clone)]
pub struct TagKey<T>
{
key: usize,
owner: Arc<RwLock<TagAllocator>>,
_phantom: PhantomData<T>,
}
// Tags a particular sample within a specific stream
#[derive(Clone)]
pub struct Tag
pub(crate) struct TagSlot
{
// Position of the sample this tag is tied to.
// The position is in terms of the stream front index when the
@ -81,7 +90,14 @@ pub struct Tag
// TODO: Make it such that when a tag is duplicated, the data seems to be too:
// When adding on a duplicate, it should not replicate on others, but without
// requiring a deep copy.
pub data: Arc<Mutex<HashMap<String, Arc<dyn Any + Send + Sync>>>>,
pub tag: Tag,
}
// Tag key value pairs
#[derive(Clone)]
pub struct Tag
{
data: Arc<RwLock<HashMap<usize, Arc<dyn Any + Send + Sync + 'static>>>>,
}
impl Tags
@ -89,19 +105,40 @@ impl Tags
pub fn new() -> Self
{
Self {
counter: 0,
tag_data: HashMap::new(),
allocator: Arc::new(RwLock::new(TagAllocator {
counter: 0,
labels: HashMap::default(),
})),
}
}
pub fn allocate_tag<T>(&mut self) -> TagKey<T>
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> TagKey<T>
{
let new_tag = TagKey {
key: self.counter,
let k = self.allocator.write().unwrap().allocate_tag::<T>(label);
TagKey {
key: k,
owner: self.allocator.clone(),
_phantom: Default::default(),
};
}
}
}
impl TagAllocator
{
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> usize
{
let key = self.counter;
self.labels.insert(
self.counter,
(
std::any::type_name::<T>(),
TagLabel {
label: label.as_ref().to_owned(),
},
),
);
self.counter += 1;
new_tag
key
}
}
@ -118,32 +155,65 @@ impl Tag
pub fn new() -> Self
{
Self {
position: 0,
data: Default::default(),
}
}
pub fn merge_tag_opts<const N: usize>(tag_opts: [Option<Tag>; N]) -> Option<Tag>
pub fn with_entry<T: 'static + Send + Sync>(key: TagKey<T>, value: T) -> Self
{
let mut out_tag = None;
for tag in tag_opts.iter()
let new_tag = Self::default();
new_tag.add_entry(key, value);
new_tag
}
pub fn from_tags<const N: usize>(tag_opts: [&Tag; N]) -> Tag
{
let new_tag = Self::default();
{
out_tag = out_tag.merge(tag);
let mut writer = new_tag.data.write().unwrap();
for tag in tag_opts.iter()
{
let reader = tag.data.read().unwrap();
writer.extend(reader.iter().map(|x| (*x.0, x.1.clone())));
}
}
out_tag
new_tag
}
pub fn tag<T: 'static + Send + Sync>(&self, key: impl AsRef<str>, value: T)
pub fn from_tag_opts<const N: usize>(tag_opts: [&Option<Tag>; N]) -> Option<Tag>
{
self.data
.lock()
.unwrap()
.insert(key.as_ref().to_owned(), Arc::new(value));
if tag_opts.iter().all(|t| t.is_none())
{
return None;
}
let new_tag = Self::default();
{
let mut writer = new_tag.data.write().unwrap();
for tag in tag_opts.iter().filter(|t| t.is_some())
{
let reader = tag.as_ref().unwrap().data.read().unwrap();
writer.extend(reader.iter().map(|x| (*x.0, x.1.clone())));
}
}
Some(new_tag)
}
pub fn retrieve(&self, key: impl AsRef<str>) -> Option<Arc<dyn Any + Send + Sync>>
pub fn add_entry<T: 'static + Send + Sync>(&self, key: TagKey<T>, value: T)
{
self.data.lock().unwrap().get(key.as_ref()).cloned()
self.data.write().unwrap().insert(key.key, Arc::new(value));
}
pub fn retrieve<T: 'static + Send + Sync>(&self, key: &TagKey<T>) -> Option<Arc<T>>
{
let element = self.data.read().unwrap().get(&key.key).cloned();
// TODO: When available : downcast unchecked, the type should be guaranteed
// by the TagKey's generic
// (But there might be an issue if the key comes from somewhere else)
element.map(|x| x.downcast().unwrap())
}
}
@ -167,19 +237,7 @@ impl TagMergable<Tag> for Tag
{
fn merge(&self, other: &Self) -> Self
{
// TODO: More performant merge
let mut new = other.clone();
new.position = self.position;
{
let mut data_locked = new.data.lock().unwrap();
for (k, v) in self.data.lock().unwrap().iter()
{
data_locked.insert(k.clone(), v.clone());
}
}
new
Self::from_tags([self, other])
}
}
@ -187,15 +245,7 @@ impl TagMergable<Option<Tag>> for Option<Tag>
{
fn merge(&self, other: &Self) -> Self
{
match self
{
Some(first) => match other
{
Some(other) => Some(first.merge(other)),
None => Some(first.clone()),
},
None => other.clone(),
}
Tag::from_tag_opts([self, other])
}
}
@ -215,10 +265,6 @@ impl<T> Tagged<T>
{
pub fn new(inner: T, tag: Option<Tag>) -> Self
{
if tag.is_none()
{
//println!("data has no tag");
}
Self(inner, tag)
}
@ -243,6 +289,16 @@ impl<T> Tagged<T>
self.1 = Some(tag);
t
}
pub fn retrieve<D: Send + Sync + 'static>(&self, key: &TagKey<D>) -> Option<Arc<D>>
{
self.1.as_ref().and_then(|t| t.retrieve(key))
}
pub fn add_entry<D: Send + Sync + 'static>(&mut self, key: TagKey<D>, value: D)
{
self.1.get_or_insert(Tag::default()).add_entry(key, value);
}
}
impl<T: Clone> Tagged<T>