diff --git a/example/src/main.rs b/example/src/main.rs index eb8d9c8..b78cb70 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -20,10 +20,12 @@ use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::edge::In; -use oxydsp_flowgraph::edge::PopIterable; +use oxydsp_flowgraph::block::SyncBlockIO; use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::graph::FlowGraph; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::sync_block; #[derive(BlockIO)] pub struct Printer @@ -34,6 +36,15 @@ pub struct Printer n: usize, } +impl SyncBlockIO for Printer +{ + type StateView = u32; + + type Input = T; + + type Output = T; +} + impl Printer { pub fn new(input: In) -> Self diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs index bb5692b..772af36 100644 --- a/oxydsp-dsp/src/blocks/math/basic.rs +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -3,7 +3,8 @@ use std::ops::{Add, Mul}; use oxydsp_flowgraph::{ BlockIO, block::{Block, BlockResult}, - edge::{In, Out, PopIterable, stream}, + io::{In, Out, PopIterable}, + tag::TagMergable, }; #[derive(BlockIO)] @@ -31,7 +32,7 @@ where { pub fn new(input_a: In, input_b: In) -> (Self, In) { - let (output, added) = stream(); + let (output, added) = oxydsp_flowgraph::io::stream(); ( Self { input_a, @@ -54,7 +55,7 @@ where self.output.push_iter( (&mut self.input_a, &mut self.input_b) .pop_iter() - .map(|(a, b)| a + b), + .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))), ); BlockResult::Ok } @@ -85,7 +86,7 @@ where { pub fn new(input_a: In, input_b: In) -> (Self, In) { - let (output, added) = stream(); + let (output, added) = oxydsp_flowgraph::io::stream(); ( Self { input_a, @@ -108,7 +109,7 @@ where self.output.push_iter( (&mut self.input_a, &mut self.input_b) .pop_iter() - .map(|(a, b)| a * b), + .map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1))), ); BlockResult::Ok } diff --git a/oxydsp-dsp/src/blocks/synthesis.rs b/oxydsp-dsp/src/blocks/synthesis.rs index 1da4156..904ca9f 100644 --- a/oxydsp-dsp/src/blocks/synthesis.rs +++ b/oxydsp-dsp/src/blocks/synthesis.rs @@ -4,10 +4,10 @@ use num::Float; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; -use oxydsp_flowgraph::edge::In; -use oxydsp_flowgraph::edge::Out; -use oxydsp_flowgraph::edge::PopIterable; -use oxydsp_flowgraph::edge::stream; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::io::stream; #[derive(BlockIO)] pub struct OscillatorSource + 'static> @@ -31,7 +31,7 @@ impl + 'static> Block for OscillatorSource { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter(&mut self.nco); + self.output.push_iter((&mut self.nco).map(|x| (x, None))); BlockResult::Ok } } @@ -81,8 +81,8 @@ impl + 'static> Block for Nco { self.output .push_iter(&mut self.frequency.pop_iter().map(|f| { - self.nco.set_frequency(f); - self.nco.next().unwrap() + self.nco.set_frequency(f.0); + (self.nco.next().unwrap(), f.1) })); BlockResult::Ok } diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 060792c..930b218 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,7 +1,8 @@ use oxydsp_flowgraph::{ BlockIO, block::{Block, BlockResult}, - edge::{In, Out, PopIterable, stream}, + io::{In, Out, PopIterable, stream}, + tag::Tag, }; #[derive(BlockIO)] @@ -33,7 +34,8 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - self.output.push_iter(self.input.pop_iter().map(&self.map)); + self.output + .push_iter(self.input.pop_iter().map(|x| ((&self.map)(x.0), x.1))); BlockResult::Ok } } @@ -46,7 +48,7 @@ pub struct Repeat repetitions: usize, remaining: usize, - current: Option, + current: Option<(T, Option)>, #[output] output: Out, @@ -82,13 +84,24 @@ impl Block for Repeat { if self.remaining == 0 || self.current.is_none() { - self.current = Some(reader.pop().unwrap()); + self.current = Some(reader.pop_tagged().unwrap()); self.remaining = self.repetitions; } writer .push(self.current.clone().unwrap()) .unwrap_or_else(|_| panic!()); + + match &mut self.current + { + Some((_, tag)) => + { + *tag = None; + } + None => + {} + } + self.remaining -= 1; } diff --git a/oxydsp-dsp/src/blocks/utilities/channels.rs b/oxydsp-dsp/src/blocks/utilities/channels.rs index ac65b92..67db127 100644 --- a/oxydsp-dsp/src/blocks/utilities/channels.rs +++ b/oxydsp-dsp/src/blocks/utilities/channels.rs @@ -3,7 +3,7 @@ use std::sync::mpsc::{Receiver, Sender, SyncSender}; use oxydsp_flowgraph::{ BlockIO, block::{Block, BlockResult}, - edge::{In, Out, PopIterable, stream}, + io::{In, Out, PopIterable, stream}, }; #[derive(BlockIO)] @@ -45,7 +45,7 @@ impl Block for RxSource, I> { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if self.output.push_iter(self.input.iter()) + if self.output.push_iter(self.input.iter().map(|x| (x, None))) { BlockResult::Ok } @@ -63,7 +63,7 @@ impl Block for TxSink, I> if self .input .pop_iter() - .map(|x| self.output.send(x)) + .map(|x| self.output.send(x.0)) .any(|res| res.is_err()) { BlockResult::Terminated @@ -82,7 +82,7 @@ impl Block for TxSink, I> if self .input .pop_iter() - .map(|x| self.output.send(x)) + .map(|x| self.output.send(x.0)) .any(|res| res.is_err()) { BlockResult::Terminated diff --git a/oxydsp-dsp/src/blocks/utilities/iter.rs b/oxydsp-dsp/src/blocks/utilities/iter.rs index c5d5e84..ccaa179 100644 --- a/oxydsp-dsp/src/blocks/utilities/iter.rs +++ b/oxydsp-dsp/src/blocks/utilities/iter.rs @@ -1,7 +1,7 @@ use oxydsp_flowgraph::{ BlockIO, block::{Block, BlockResult}, - edge::{In, Out, stream}, + io::{In, Out, stream}, }; #[derive(BlockIO)] @@ -34,7 +34,7 @@ where { fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult { - if self.output.push_iter(&mut self.iter) + if self.output.push_iter((&mut self.iter).map(|x| (x, None))) { BlockResult::Ok } diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs index cbe97db..8a1a713 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -4,8 +4,11 @@ use zyn::ext::AttrExt; use zyn::ext::FieldsExt; use zyn::format_ident; use zyn::parse_input; +use zyn::syn::GenericParam; use zyn::syn::Index; use zyn::syn::Lit; +use zyn::syn::TypeGenerics; +use zyn::syn::parse_quote; use zyn::syn::spanned::Spanned; #[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")] @@ -146,7 +149,7 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream fn set_anonymous_out_stream( &mut self, output_index: usize, - producer: oxydsp_flowgraph::edge::AnonymousStreamProducer, + producer: oxydsp_flowgraph::io::AnonymousStreamProducer, ) { match output_index @@ -160,7 +163,7 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream } #[allow(unreachable_code)] - fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer) + fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::io::AnonymousStreamConsumer) { match input_index { @@ -178,29 +181,26 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream { zyn::zyn!( - #[allow(unreachable_code)] - fn create_anonymous_stream_for( + #[allow(unreachable_code)] + fn create_anonymous_stream_for( &mut self, output_index: usize, - capacity: usize - ) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) - { - let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) - = match output_index + capacity: usize, + ) -> ( + oxydsp_flowgraph::io::AnonymousStreamProducer, + oxydsp_flowgraph::io::AnonymousStreamConsumer, + ) { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + let output = match output_index { - {{ field.0 }} => + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) { - let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity); - (tx.into(), rx.into()) - }, - } - _ => panic!("output_index out of bounds.") - }; - - (tx, rx) - } + {{ field.0 }} => self.{{ field.1.ident }}.create_anonymous_stream(capacity), + } + _ => panic!("output_index out of bounds."), + }; + return output; + } ) } @@ -234,81 +234,37 @@ fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream #[zyn::attribute] pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream { - let mut strcut_item = item.clone(); - let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); + let lifetime: GenericParam = parse_quote!('view); + let mut generics = item.generics.clone(); + generics.params.insert(0, lifetime); + let (_impl_generics, type_generics, where_clause) = generics.split_for_impl(); let fields = &item.fields.as_named().unwrap().named; - // Get state fields let mut state_fields = vec![]; - for field in strcut_item.fields.iter_mut() + for field in fields.iter() { - let attr_index = field.attrs.iter().enumerate().find_map(|(i, attr)| { - if attr.is("sync_state") { Some(i) } else { None } - }); - - if let Some(state_field) = &field.ident - && let Some(attr_index) = attr_index + let mut f = field.clone(); + if f.attrs.iter().any(|x| x.is("input") || x.is("output")) { - state_fields.push(state_field.clone()); - field.attrs.remove(attr_index); + continue; } + + let tk = field.ty.clone().into_token_stream(); + f.ty = parse_quote!(&'view mut #tk); + state_fields.push(f); } zyn::zyn!( - {{ strcut_item }} + {{ item }} - impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }} - where {{ where_clause }} + struct {{ item.ident | ident:"{}View" }} {{ type_generics }} + {{ where_clause }} { - fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + @for (field in state_fields.iter()) { - let mut len = usize::MAX; - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read(); - len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len()); - - } - - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write(); - len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len()); - } - - for _ in 0..len - { - if let Some(( - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.1.ident.clone().unwrap() | ident: "{}_out" }}, - } - )) = Self::sync_work( - ( - @for (state_field in state_fields) - { - &mut self.{{ state_field }}, - } - ), - ( - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - {{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(), - } - ) - ) - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - let _ = {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }}); - } - } else - { - return oxydsp_flowgraph::block::BlockResult::Terminated; - } - } - oxydsp_flowgraph::block::BlockResult::Ok + {{ field }}, } + _sync_block_phantom: std::marker::PhantomData<'view>, } ) } @@ -373,7 +329,7 @@ pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream type Output = ( @for (i in 0..count) { - StreamReader<'a, {{ generics[i] }}>, + InReader<'a, {{ generics[i] }}>, } ); fn pop_iter(&'a mut self) -> PopIter @@ -457,14 +413,14 @@ pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream > Iterator for PopIter<( @for (i in 0..count) { - StreamReader<'a, {{ generics[i] }}>, + InReader<'a, {{ generics[i] }}>, } )> { type Item = ( @for (i in 0..count) { - {{ generics[i] }}, + ({{ generics[i] }}, Option), } ); fn next(&mut self) -> Option @@ -476,7 +432,7 @@ pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream Some(( @for (i in 0..count) { - self.reader.{{ Index::from(i) }}.pop().unwrap(), + self.reader.{{ Index::from(i) }}.pop_tagged().unwrap(), } )) } diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index 9c3c18f..e7d169b 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -1,4 +1,7 @@ -use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex}; +use crate::{ + edge::BlockIOIndex, + io::{AnonymousStreamConsumer, AnonymousStreamProducer}, +}; pub enum BlockResult { @@ -46,13 +49,18 @@ pub trait Block fn work(&mut self) -> BlockResult; } -pub trait SyncBlock +// Represents the input, output, state types +// that a SyncBlock will have to interacti with +pub trait SyncBlockIO { + type StateView; type Input; type Output; - type State; +} - fn sync_work(state: &mut Self::State, input: Self::Input) -> Option; +pub trait SyncBlock: SyncBlockIO +{ + fn sync_work(state: Self::StateView, input: Self::Input) -> Option; } pub trait GraphableBlock: Block + BlockIO {} diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs index f89f1a1..62924c5 100644 --- a/oxydsp-flowgraph/src/edge.rs +++ b/oxydsp-flowgraph/src/edge.rs @@ -1,17 +1,4 @@ use std::any::Any; -use std::collections::binary_heap::Iter; -use std::sync::Arc; -use std::sync::Mutex; - -use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; -use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; - -use crate::stream; -use crate::stream::StreamConsumer; -use crate::stream::StreamProducer; -use crate::stream::StreamReader; -use crate::stream::StreamWriter; -use crate::tag::Tag; #[derive(Default)] pub struct Edge @@ -31,128 +18,3 @@ pub struct BlockIOIndex pub block_index: usize, pub port_index: usize, } - -// Needed for graph to be able to manipulate -// stream endings without knowing the generic type -pub struct AnonymousStreamProducer -{ - inner: Box, -} - -pub struct AnonymousStreamConsumer -{ - inner: Box, -} - -impl From> for AnonymousStreamProducer -{ - fn from(value: StreamProducer) -> Self - { - AnonymousStreamProducer { - inner: Box::new(value), - } - } -} - -impl From> for AnonymousStreamConsumer -{ - fn from(value: StreamConsumer) -> Self - { - AnonymousStreamConsumer { - inner: Box::new(value), - } - } -} - -impl AnonymousStreamProducer -{ - pub fn downcast(self) -> StreamProducer - { - *self.inner.downcast::>().unwrap() - } -} - -impl AnonymousStreamConsumer -{ - pub fn downcast(self) -> StreamConsumer - { - *self.inner.downcast::>().unwrap() - } -} - -pub struct PopIter -{ - len: usize, - popped: usize, - reader: T, -} - -pub trait PopIterable<'a> -{ - type Output; - fn pop_iter(&'a mut self) -> PopIter; -} - -impl<'a, T: 'static> PopIterable<'a> for In -{ - type Output = StreamReader<'a, T>; - fn pop_iter(&'a mut self) -> PopIter> - { - let reader = self.read(); - PopIter { - popped: 0, - len: reader.len(), - reader, - } - } -} - -generate_pop_iterable_tuple_impl! {2} -generate_pop_iterable_tuple_impl! {3} -generate_pop_iterable_tuple_impl! {4} -generate_pop_iterable_tuple_impl! {5} -generate_pop_iterable_tuple_impl! {6} -generate_pop_iterable_tuple_impl! {7} -generate_pop_iterable_tuple_impl! {8} -generate_pop_iterable_tuple_impl! {9} -generate_pop_iterable_tuple_impl! {10} -generate_pop_iterable_tuple_impl! {11} -generate_pop_iterable_tuple_impl! {12} -generate_pop_iterable_tuple_impl! {13} -generate_pop_iterable_tuple_impl! {14} -generate_pop_iterable_tuple_impl! {15} -generate_pop_iterable_tuple_impl! {16} -generate_pop_iterable_tuple_impl! {17} -generate_pop_iterable_tuple_impl! {18} -generate_pop_iterable_tuple_impl! {19} -generate_pop_iterable_tuple_impl! {20} - -impl<'a, T> Iterator for PopIter> -{ - type Item = T; - - fn next(&mut self) -> Option - { - self.reader.pop() - } -} - -impl_iterator_for_pop_iter_tuple! {2} -impl_iterator_for_pop_iter_tuple! {3} -impl_iterator_for_pop_iter_tuple! {4} -impl_iterator_for_pop_iter_tuple! {5} -impl_iterator_for_pop_iter_tuple! {6} -impl_iterator_for_pop_iter_tuple! {7} -impl_iterator_for_pop_iter_tuple! {8} -impl_iterator_for_pop_iter_tuple! {9} -impl_iterator_for_pop_iter_tuple! {10} -impl_iterator_for_pop_iter_tuple! {11} -impl_iterator_for_pop_iter_tuple! {12} -impl_iterator_for_pop_iter_tuple! {13} -impl_iterator_for_pop_iter_tuple! {14} -impl_iterator_for_pop_iter_tuple! {15} -impl_iterator_for_pop_iter_tuple! {16} -impl_iterator_for_pop_iter_tuple! {17} -impl_iterator_for_pop_iter_tuple! {18} -impl_iterator_for_pop_iter_tuple! {19} -impl_iterator_for_pop_iter_tuple! {20} diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 68fd155..0e4935c 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -1,8 +1,10 @@ +use std::any::Any; use std::sync::Arc; use std::sync::Mutex; -use crate::edge::AnonymousStreamConsumer; -use crate::edge::AnonymousStreamProducer; +use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; +use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; + use crate::edge::BlockIOIndex; use crate::edge::Edge; use crate::stream::StreamConsumer; @@ -73,7 +75,9 @@ impl In pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) { - self.stream = Some(consumer.downcast::()) + let (stream, tag_stream) = consumer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); } pub fn read<'a>(&'a mut self) -> InReader<'a, T> @@ -101,7 +105,9 @@ impl Out pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) { - self.stream = Some(producer.downcast::()) + let (stream, tag_stream) = producer.downcast::(); + self.stream = Some(stream); + self.tag_stream = Some(tag_stream); } // Delegate stream creation to Out object @@ -112,7 +118,8 @@ impl Out ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) { let (tx, rx) = stream::bounded_queue::(capacity); - (tx.into(), rx.into()) + let (tx_tag, rx_tag) = stream::bounded_queue::(capacity); + ((tx, tx_tag).into(), (rx, rx_tag).into()) } pub fn write<'a>(&'a mut self) -> OutWriter<'a, T> @@ -123,7 +130,7 @@ impl Out } } - pub fn push_iter>(&mut self, mut iter: I) -> bool + pub fn push_iter)>>(&mut self, mut iter: I) -> bool { let writer = self.write(); let len = writer.len(); @@ -132,7 +139,17 @@ impl Out { if let Some(elt) = iter.next() { - let _ = writer.push(elt); + match elt.1 + { + Some(tag) => + { + let _ = writer.push_tagged(elt.0, tag); + } + None => + { + let _ = writer.push_no_tag(elt.0); + } + } } else { @@ -182,3 +199,179 @@ impl InReader<'_, T> self.pop_tagged().map(|(data, _)| data) } } + +impl OutWriter<'_, T> +{ + pub fn len(&self) -> usize + { + self.data_writer.len().min(self.tag_writer.len()) + } + + pub fn push(&self, (data, tag): (T, Option)) -> Result<(), (T, Option)> + { + match self.data_writer.push(data) + { + Ok(_) if tag.is_some() => + { + let _ = self.tag_writer.push(tag.unwrap()); + Ok(()) + } + Ok(_) => Ok(()), + Err(data) => Err((data, tag)), + } + } + + pub fn push_tagged(&self, data: T, tag: Tag) -> Result<(), (T, Tag)> + { + let res = self.data_writer.push(data); + match res + { + Ok(_) => + { + let _ = self.tag_writer.push(tag); + Ok(()) + } + Err(t) => Err((t, tag)), + } + } + + pub fn push_no_tag(&self, data: T) -> Result<(), T> + { + self.data_writer.push(data) + } +} + +pub struct PopIter +{ + len: usize, + popped: usize, + reader: T, +} + +pub trait PopIterable<'a> +{ + type Output; + fn pop_iter(&'a mut self) -> PopIter; +} + +impl<'a, T: 'static> PopIterable<'a> for In +{ + type Output = InReader<'a, T>; + fn pop_iter(&'a mut self) -> PopIter> + { + let reader = self.read(); + PopIter { + popped: 0, + len: reader.len(), + reader, + } + } +} + +generate_pop_iterable_tuple_impl! {2} +generate_pop_iterable_tuple_impl! {3} +generate_pop_iterable_tuple_impl! {4} +generate_pop_iterable_tuple_impl! {5} +generate_pop_iterable_tuple_impl! {6} +generate_pop_iterable_tuple_impl! {7} +generate_pop_iterable_tuple_impl! {8} +generate_pop_iterable_tuple_impl! {9} +generate_pop_iterable_tuple_impl! {10} +generate_pop_iterable_tuple_impl! {11} +generate_pop_iterable_tuple_impl! {12} +generate_pop_iterable_tuple_impl! {13} +generate_pop_iterable_tuple_impl! {14} +generate_pop_iterable_tuple_impl! {15} +generate_pop_iterable_tuple_impl! {16} +generate_pop_iterable_tuple_impl! {17} +generate_pop_iterable_tuple_impl! {18} +generate_pop_iterable_tuple_impl! {19} +generate_pop_iterable_tuple_impl! {20} + +impl<'a, T> Iterator for PopIter> +{ + type Item = (T, Option); + + fn next(&mut self) -> Option + { + self.reader.pop_tagged() + } +} + +impl_iterator_for_pop_iter_tuple! {2} +impl_iterator_for_pop_iter_tuple! {3} +impl_iterator_for_pop_iter_tuple! {4} +impl_iterator_for_pop_iter_tuple! {5} +impl_iterator_for_pop_iter_tuple! {6} +impl_iterator_for_pop_iter_tuple! {7} +impl_iterator_for_pop_iter_tuple! {8} +impl_iterator_for_pop_iter_tuple! {9} +impl_iterator_for_pop_iter_tuple! {10} +impl_iterator_for_pop_iter_tuple! {11} +impl_iterator_for_pop_iter_tuple! {12} +impl_iterator_for_pop_iter_tuple! {13} +impl_iterator_for_pop_iter_tuple! {14} +impl_iterator_for_pop_iter_tuple! {15} +impl_iterator_for_pop_iter_tuple! {16} +impl_iterator_for_pop_iter_tuple! {17} +impl_iterator_for_pop_iter_tuple! {18} +impl_iterator_for_pop_iter_tuple! {19} +impl_iterator_for_pop_iter_tuple! {20} + +// Needed for graph to be able to manipulate +// stream endings without knowing the generic type +pub struct AnonymousStreamProducer +{ + inner: Box, + inner_tag: StreamProducer, +} + +pub struct AnonymousStreamConsumer +{ + inner: Box, + inner_tag: StreamConsumer, +} + +impl From<(StreamProducer, StreamProducer)> for AnonymousStreamProducer +{ + fn from(value: (StreamProducer, StreamProducer)) -> Self + { + AnonymousStreamProducer { + inner: Box::new(value.0), + inner_tag: value.1, + } + } +} + +impl From<(StreamConsumer, StreamConsumer)> for AnonymousStreamConsumer +{ + fn from(value: (StreamConsumer, StreamConsumer)) -> Self + { + AnonymousStreamConsumer { + inner: Box::new(value.0), + inner_tag: value.1, + } + } +} + +impl AnonymousStreamProducer +{ + pub fn downcast(self) -> (StreamProducer, StreamProducer) + { + ( + *self.inner.downcast::>().unwrap(), + self.inner_tag, + ) + } +} + +impl AnonymousStreamConsumer +{ + pub fn downcast(self) -> (StreamConsumer, StreamConsumer) + { + ( + *self.inner.downcast::>().unwrap(), + self.inner_tag, + ) + } +} diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index b314996..e214d7c 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -17,7 +17,51 @@ pub struct Tag // TODO: Make it such that when a tag is duplicated, the data seems to be too: // When adding on a duplicate, it should not replicate on others, but without // requiring a deep copy. - pub data: Arc>>>, + pub data: Arc>>>, +} + +pub trait TagValue: Clone {} +impl TagValue for T where T: Clone {} + +pub trait TagMergable +{ + fn merge(&self, other: &T) -> Self; +} + +impl TagMergable for Tag +{ + fn merge(&self, other: &Self) -> Self + { + // TODO: More performant merge + let mut new = other.clone(); + + new.position = self.position; + { + let mut data_locked = new.data.lock().unwrap(); + for (k, v) in self.data.lock().unwrap().iter() + { + data_locked.insert(k.clone(), v.clone()); + } + } + + new + } +} + +impl TagMergable> for Option +{ + fn merge(&self, other: &Self) -> Self + { + match self + { + Some(first) => match other + { + Some(other) => Some(first.merge(other)), + None => Some(first.clone()), + }, + None => other.clone(), + } + } } /// Represents a data, with a potential tag attached to it.