/// A tag is an amount of data you can "attach" to a particular sample in a stream.
/// Indeed, you might need to mark a sample, or carry information along with
/// samples sparsely: tags are usually a rare occurrence.
///
/// Example:
///
/// Timing error detection: a sample is the center of a symbol.
/// There might be a tag:
///
/// ted_symbol_center: 0.011 (here the number might represent an error value)
///
/// But really, any kind of data could be referenced by a tag:
/// a float, a string such as a JSON structure, or even a binary blob.
///
/// A tag is formed this way:
///
/// The TagKey:
/// | A unique key allocated at the beginning,
/// | which is tied to a human-readable label configured
/// | ahead of runtime.
/// |
/// | And a type, which is used to constrain the type that the tag
/// | contains, to keep downcasting safe and less error-prone.
///
/// The TagData:
/// | Then, when it is time to tag a sample, a block uses that tag key
/// | to add a tag on a sample, bundling the tag key and the specific data.
///
///
/// Another block downstream could then check whether a sample has been tagged
/// with something of interest by using the tag key.
/// If so, the block can then retrieve the data on the tag.
///
/// Each sample can have multiple tags attached to it.
///
/// A TagKey backed by a usize identifier is used instead of the more
/// classic String-Value pairs to allow easier and faster
/// comparisons, as well as much tighter and cleaner management of tags
/// throughout the graph.
use std::any::Any;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;

/// Allocator of [`TagKey`] identifiers.
///
/// Every call to [`Tags::allocate_tag`] returns a key carrying the current
/// counter value and then increments the counter.
pub struct Tags {
    // Counter to uniquely identify allocated tags
    counter: usize,
    // Keeps readable tag type and label(s) for the tags
    // NOTE(review): nothing in this file reads or fills this map yet; the
    // `key id -> TagLabel` shape is an assumption — confirm.
    tag_data: HashMap<usize, TagLabel>,
}

/// Typed handle for one kind of tag.
///
/// `T` is only recorded through `PhantomData`; no value of type `T` is stored.
pub struct TagKey<T> {
    key: usize,
    _phantom: PhantomData<T>,
}

// Label for a tag like : "symbol", "packet_start", "error"
pub struct TagLabel {
    label: String,
}

// Tags a particular sample within a specific stream
//
// Cloning a `Tag` copies `position` but shares `data`: both clones point at
// the same mutex-protected map.
#[derive(Clone)]
pub struct Tag {
    // Position of the sample this tag is tied to.
    // The position is in terms of the stream front index when the
    // sample was added
    pub position: usize,
    // TODO: Make it such that when a tag is duplicated, the data seems to be too:
    // When adding on a duplicate, it should not replicate on others, but without
    // requiring a deep copy.
    // NOTE(review): the `Send + Sync` bound on the stored values is presumed
    // from the use of `Arc`/`Mutex` — confirm against callers.
    pub data: Arc<Mutex<HashMap<String, Arc<dyn Any + Send + Sync>>>>,
}

impl Tags {
    /// Creates an allocator whose first key will be `0`, with an empty label map.
    pub fn new() -> Self {
        Self {
            counter: 0,
            tag_data: HashMap::new(),
        }
    }

    /// Returns a new key holding the current counter value, then bumps the counter.
    pub fn allocate_tag<T>(&mut self) -> TagKey<T> {
        let key = self.counter;
        self.counter += 1;
        TagKey {
            key,
            _phantom: PhantomData,
        }
    }
}

impl Default for Tags {
    fn default() -> Self {
        Self::new()
    }
}

impl Tag {
    /// Creates a tag at position `0` with an empty data map.
    pub fn new() -> Self {
        Self {
            position: 0,
            data: Default::default(),
        }
    }

    /// Folds an array of optional tags into one, left to right.
    ///
    /// Returns `None` when every entry is `None`. Because each step is
    /// `accumulated.merge(next)`, the earliest `Some` tag supplies the
    /// position and wins on conflicting keys.
    pub fn merge_tag_opts<const N: usize>(tag_opts: [Option<Tag>; N]) -> Option<Tag> {
        tag_opts
            .iter()
            .fold(None, |merged: Option<Tag>, tag| merged.merge(tag))
    }

    /// Stores `value` under `key`, replacing any value already stored there.
    ///
    /// The write goes to the shared map, so it is visible through every clone
    /// of this tag.
    ///
    /// # Panics
    ///
    /// Panics if the data mutex is poisoned.
    pub fn tag<T: Any + Send + Sync>(&self, key: impl AsRef<str>, value: T) {
        self.data
            .lock()
            .unwrap()
            .insert(key.as_ref().to_owned(), Arc::new(value));
    }

    /// Returns the type-erased value stored under `key`, if any.
    ///
    /// The caller downcasts the result to the concrete type it expects.
    ///
    /// # Panics
    ///
    /// Panics if the data mutex is poisoned.
    pub fn retrieve(&self, key: impl AsRef<str>) -> Option<Arc<dyn Any + Send + Sync>> {
        self.data.lock().unwrap().get(key.as_ref()).cloned()
    }
}

impl Default for Tag {
    fn default() -> Self {
        Self::new()
    }
}

/// Marker trait implemented for every `Clone` type.
pub trait TagValue: Clone {}

impl<T> TagValue for T where T: Clone {}

/// Combines `self` with another value of type `T`, producing a new `Self`.
pub trait TagMergable<T> {
    fn merge(&self, other: &T) -> Self;
}

impl TagMergable<Tag> for Tag {
    /// Returns a tag at `self.position` holding the entries of both tags;
    /// on a key present in both, the value from `self` is kept.
    ///
    /// The result owns a fresh map, so neither `self` nor `other` is modified.
    /// Only one lock is held at a time, which keeps the call from blocking on
    /// itself when `self` and `other` share the same map (one is a clone of
    /// the other).
    ///
    /// # Panics
    ///
    /// Panics if either data mutex is poisoned.
    fn merge(&self, other: &Self) -> Self {
        // TODO: More performant merge
        // Snapshot `other`; the guard is released at the end of this statement.
        let mut merged = other.data.lock().unwrap().clone();
        // Overlay `self`'s entries so they take precedence.
        merged.extend(
            self.data
                .lock()
                .unwrap()
                .iter()
                .map(|(key, value)| (key.clone(), Arc::clone(value))),
        );
        Tag {
            position: self.position,
            data: Arc::new(Mutex::new(merged)),
        }
    }
}

impl TagMergable<Option<Tag>> for Option<Tag> {
    /// Merges two optional tags: both present -> merged tag, one present ->
    /// a clone of that tag, none present -> `None`.
    fn merge(&self, other: &Self) -> Self {
        match (self, other) {
            (Some(first), Some(second)) => Some(first.merge(second)),
            (Some(first), None) => Some(first.clone()),
            (None, second) => second.clone(),
        }
    }
}

impl<T: Clone> TagMergable<Option<Tag>> for Tagged<T> {
    /// Clones the wrapped value and merges its optional tag with `other`.
    fn merge(&self, other: &Option<Tag>) -> Self {
        Tagged::new(self.0.clone(), self.1.merge(other))
    }
}

/// Represents a data, with a potential tag attached to it.
#[derive(Clone)]
pub struct Tagged<T>(pub T, pub Option<Tag>);

impl<T> Tagged<T> {
    /// Wraps `inner` together with an optional tag.
    pub fn new(inner: T, tag: Option<Tag>) -> Self {
        Self(inner, tag)
    }

    /// Returns `true` when a tag is attached.
    pub fn has_tag(&self) -> bool {
        self.1.is_some()
    }

    /// Removes the tag, if any, in place.
    pub fn strip(&mut self) {
        self.1 = None;
    }

    /// Consumes the wrapper and returns the value, dropping the tag.
    pub fn into_inner(self) -> T {
        self.0
    }

    /// Attaches `tag` and returns the tag it replaced, if any.
    pub fn tag(&mut self, tag: Tag) -> Option<Tag> {
        self.1.replace(tag)
    }
}

impl<T: Clone> Tagged<T> {
    /// Returns a copy of the value with no tag attached.
    pub fn stripped(&self) -> Self {
        Self::new(self.0.clone(), None)
    }

    /// Returns a copy of the value with `tag` attached.
    pub fn tagged(&self, tag: Tag) -> Self {
        Self::new(self.0.clone(), Some(tag))
    }
}

impl<T> From<T> for Tagged<T> {
    fn from(value: T) -> Self {
        Self::new(value, None)
    }
}

impl<T> From<(T, Tag)> for Tagged<T> {
    fn from((value, tag): (T, Tag)) -> Self {
        Self::new(value, Some(tag))
    }
}

impl<T> From<(T, Option<Tag>)> for Tagged<T> {
    fn from((value, tag): (T, Option<Tag>)) -> Self {
        Self::new(value, tag)
    }
}

impl<T> From<Tagged<T>> for (T, Option<Tag>) {
    fn from(val: Tagged<T>) -> Self {
        (val.0, val.1)
    }
}

impl<T> DerefMut for Tagged<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T> Deref for Tagged<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}