diff --git a/Cargo.lock b/Cargo.lock index 0e56a4e..42ffa2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,16 @@ dependencies = [ "zyn", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -57,9 +67,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "zyn" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1821d8c02040d27b081b0e3552933622411b5fdd62d3529ee1e693e37084afe" +checksum = "3e17ca216fea955924d26c55b32bc71d4e7ce24bdfe4790f649f3b34e9258edc" dependencies = [ "zyn-core", "zyn-derive", @@ -67,10 +77,11 @@ dependencies = [ [[package]] name = "zyn-core" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718adc314c1f0745d0c54a38226285338a7b800848c49529e282ee41cdbcdb31" +checksum = "f76574e3cb7a65a69e73359fc474f852e10484c033fc8ed149f9ab713d85c6e3" dependencies = [ + "prettyplease", "proc-macro2", "quote", "syn", @@ -78,9 +89,9 @@ dependencies = [ [[package]] name = "zyn-derive" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e592b35728519732b11b0b26d4c6feb9756dc1b805fc204a75c4348ef331ff3" +checksum = "8fd3a593e80434ab30e6402166faa1de4550aaf4019dd1aaa4f34d2aabf5cc57" dependencies = [ "zyn-core", ] diff --git a/oxydsp-flowgraph-macros/Cargo.toml b/oxydsp-flowgraph-macros/Cargo.toml index dcc3f20..9522172 100644 --- a/oxydsp-flowgraph-macros/Cargo.toml +++ b/oxydsp-flowgraph-macros/Cargo.toml @@ -7,4 +7,4 @@ edition = "2024" proc-macro = true [dependencies] -zyn = {version = "0.4.2", features = ["ext"]} +zyn = {version = "0.5", features = ["ext", "pretty"]} diff --git a/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph-macros/src/lib.rs index 609fe1c..6340d5b 100644 --- a/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph-macros/src/lib.rs @@ -1,5 +1,11 @@ +use core::panic; + +use proc_macro::Ident; +use proc_macro::Spacing; +use zyn::ToTokens; use zyn::ext::AttrExt; use zyn::ext::FieldsExt; +use zyn::syn::spanned::Spanned; #[zyn::derive("BlockIO", attributes(input, output))] pub fn block_io( @@ -16,6 +22,8 @@ pub fn block_io( @block_io_set_index(fields = fields.clone()) @block_io_get_successors(fields = fields.clone()) @block_io_counts(fields = fields.clone()) + @block_io_set_streams(fields = fields.clone()) + @block_io_create_stream(fields = fields.clone()) } ) } @@ -46,12 +54,15 @@ fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream { let fields = fields.as_named().unwrap().named.clone(); zyn::zyn!( - fn get_successors(&self, block_index: usize) -> Vec + fn get_successors(&self) -> Vec { let mut output = vec![]; @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) { - output.push(self.{{ field.1.ident }}.get_consumer_block()); + if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block() + { + output.push(block_index); + } } output } @@ -82,3 +93,132 @@ fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream } ) } + +#[zyn::element] +fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + zyn::zyn!( + fn set_anonymous_out_stream( + &mut self, + output_index: usize, + producer: oxydsp_flowgraph::edge::AnonymousStreamProducer, + ) + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + self.{{field.1.ident}}.set_anonymous_stream(producer); + } + } + + fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer) + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + self.{{field.1.ident}}.set_anonymous_stream(consumer); + } + } + ) +} + +#[zyn::element] +fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + zyn::zyn!( + fn create_anonymous_stream_for( + &mut self, + output_index: usize, + capacity: usize + ) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) + { + let (tx, rx) = match output_index + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.0 }} => oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity), + } + _ => panic!("output_index out of bounds.") + }; + + (tx.into(), rx.into()) + } + ) +} + +#[zyn::element] +fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream +{ + let out_ty = match ty + { + zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments + { + zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap() + { + zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()), + _ => None, + }, + _ => None, + }, + _ => None, + }; + + if out_ty.is_none() + { + bail!("Output type must be a Out type."; span = ty.span()); + } + + out_ty.unwrap() +} + +// Sync block + +#[zyn::attribute] +pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + let strcut_item = item.clone(); + let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); + let fields = &item.fields.as_named().unwrap().named; + zyn::zyn!( + {{ strcut_item }} + + impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }} + where {{ where_clause }} + { + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let mut len = usize::MAX; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read(); + len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len()); + + } + + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write(); + len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len()); + } + + for _ in 0..len + { + let ( + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.1.ident.clone().unwrap() | ident: "{}_out" }}, + } + ) = self.sync_work(( + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + {{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(), + } + )); + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }}).unwrap(); + } + } + oxydsp_flowgraph::block::BlockResult::Ok + } + } + ) +} diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index be2c2f9..1eb949f 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -1,4 +1,4 @@ -use crate::edge::BlockIOIndex; +use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex}; pub enum BlockResult { @@ -16,8 +16,18 @@ pub trait BlockIO fn set_index(&self, block_index: usize); // Number of input/output ports - fn input_count(&self); - fn output_count(&self); + fn input_count(&self) -> usize; + fn output_count(&self) -> usize; + + // Stream managment + fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer); + fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer); + + fn create_anonymous_stream_for( + &mut self, + output_index: usize, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); } pub trait Block @@ -25,4 +35,12 @@ pub trait Block fn work(&mut self) -> BlockResult; } +pub trait SyncBlock +{ + type Input; + type Output; + + fn sync_work(&mut self, input: Self::Input) -> Self::Output; +} + pub trait GraphableBlock: Block + BlockIO {} diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs index 14d6b15..d49c3da 100644 --- a/oxydsp-flowgraph/src/edge.rs +++ b/oxydsp-flowgraph/src/edge.rs @@ -1,8 +1,12 @@ +use std::any::Any; use std::sync::Arc; use std::sync::Mutex; +use crate::stream; use crate::stream::StreamConsumer; use crate::stream::StreamProducer; +use crate::stream::StreamReader; +use crate::stream::StreamWriter; pub struct Edge { @@ -22,15 +26,55 @@ pub struct BlockIOIndex pub port_index: usize, } -pub struct In +// Needed for graph to be able to manipulate +// stream endings without knowing the generic type +pub struct AnonymousStreamProducer { - stream: Option>, - - // Will rarely be accessed - edge: Arc>, + inner: Box, } -pub struct Out +pub struct AnonymousStreamConsumer +{ + inner: Box, +} + +impl From> for AnonymousStreamProducer +{ + fn from(value: StreamProducer) -> Self + { + AnonymousStreamProducer { + inner: Box::new(value), + } + } +} + +impl From> for AnonymousStreamConsumer +{ + fn from(value: StreamConsumer) -> Self + { + AnonymousStreamConsumer { + inner: Box::new(value), + } + } +} + +impl AnonymousStreamProducer +{ + pub fn downcast(self) -> StreamProducer + { + *self.inner.downcast::>().unwrap() + } +} + +impl AnonymousStreamConsumer +{ + pub fn downcast(self) -> StreamConsumer + { + *self.inner.downcast::>().unwrap() + } +} + +pub struct In { stream: Option>, @@ -38,7 +82,15 @@ pub struct Out edge: Arc>, } -impl In +pub struct Out +{ + stream: Option>, + + // Will rarely be accessed + edge: Arc>, +} + +impl In { pub fn set_block_index(&self, index: BlockIOIndex) { @@ -49,9 +101,19 @@ impl In { self.edge.lock().unwrap().from } + + pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) + { + self.stream = Some(consumer.downcast::()) + } + + pub fn read<'a>(&'a mut self) -> StreamReader<'a, T> + { + self.stream.as_mut().unwrap().read() + } } -impl Out +impl Out { pub fn set_block_index(&self, index: BlockIOIndex) { @@ -62,4 +124,25 @@ impl Out { self.edge.lock().unwrap().to } + + pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) + { + self.stream = Some(producer.downcast::()) + } + + // Delegate stream creation to Out object + // which knows the stream type + pub fn create_anonymous_stream( + &self, + capacity: usize, + ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) + { + let (tx, rx) = stream::bounded_queue::(capacity); + (tx.into(), rx.into()) + } + + pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> + { + self.stream.as_mut().unwrap().write() + } } diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs index 61623ed..1c8b0e4 100644 --- a/oxydsp-flowgraph/src/main.rs +++ b/oxydsp-flowgraph/src/main.rs @@ -1,7 +1,11 @@ -use oxydsp_flowgraph::edge::{In, Out}; -use oxydsp_flowgraph_macros::BlockIO; +use oxydsp_flowgraph::{ + block::SyncBlock, + edge::{In, Out}, +}; +use oxydsp_flowgraph_macros::{BlockIO, sync_block}; #[derive(BlockIO)] +//#[sync_block] pub struct Test { #[input] @@ -11,4 +15,35 @@ pub struct Test output: Out, } +impl oxydsp_flowgraph::block::Block for Test +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let mut len = usize::MAX; + let mut input_reader = self.input.read(); + len = len.min(input_reader.len()); + let mut output_writer = self.output.write(); + len = len.min(output_writer.len()); + for _ in 0..len + { + let input = input_reader.pop().unwrap(); + let (output_out,) = self.sync_work((input,)); + output_writer.push(output_out).unwrap(); + } + oxydsp_flowgraph::block::BlockResult::Ok + } +} + +impl SyncBlock for Test +{ + type Input = (u32,); + + type Output = (u32,); + + fn sync_work(&mut self, (num,): Self::Input) -> Self::Output + { + (num,) + } +} + fn main() {} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index 71a6780..e79ace6 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -56,8 +56,8 @@ unsafe impl Sync for StreamConsumer {} pub struct StreamWriter<'a, T> { producer: &'a StreamProducer, - first: &'a mut [MaybeUninit], - second: Option<&'a mut [MaybeUninit]>, + first: &'a UnsafeCell<[MaybeUninit]>, + second: Option<&'a UnsafeCell<[MaybeUninit]>>, written: usize, } @@ -144,6 +144,14 @@ impl StreamProducer let (start_to_tail, _tail_to_head) = start_to_head.split_at_mut_unchecked(wrapped_tail); + // Slices are wrapped into unsafe cells to provide interior mutability + // On the stream as it is much more convienient. + // + // SAFETY: + // + // This functions borrows the stream mutably. As such, only one instance + // of StreamWriter can exist for a given stream. The StreamWriter + // is thus the only on able to write or read the stream when it lives let first = head_to_end; let second = Some(start_to_tail); StreamWriter { @@ -354,7 +362,7 @@ impl<'a, T> StreamWriter<'a, T> self.len() == 0 } - pub fn push(&mut self, element: T) -> Result<(), T> + pub fn push(&self, element: T) -> Result<(), T> { if self.written < self.first.len() {