diff --git a/example/src/main.rs b/example/src/main.rs index b78cb70..377dbdf 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -20,31 +20,79 @@ use oxydsp_dsp::units::DigitalFrequency; use oxydsp_flowgraph::BlockIO; use oxydsp_flowgraph::block::Block; use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::block::SyncBlock; use oxydsp_flowgraph::block::SyncBlockIO; use oxydsp_flowgraph::flowgraph; use oxydsp_flowgraph::graph::FlowGraph; use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; use oxydsp_flowgraph::io::PopIterable; use oxydsp_flowgraph::sync_block; +use crate::printer_synchronous_block::PrinterView; + #[derive(BlockIO)] +#[sync_block] pub struct Printer { #[input] input: In, + #[input] + input_b: In, + + #[output] + output_a: Out, + n: usize, } -impl SyncBlockIO for Printer +impl<'view, T> SyncBlock<'view> for Printer { - type StateView = u32; - - type Input = T; - - type Output = T; + fn sync_work(state: Self::StateView, input: Self::Input) -> Option + { + None + } } +impl Block for Printer +{ + fn work(&mut self) -> BlockResult + { + let state = PrinterView { + n: &mut self.n, + _sync_block_phantom: Default::default(), + }; + + let output_a_write = self.output_a.write(); + (&mut self.input, &mut self.input_b).pop_iter().for_each( + |((input_el, input_tag), (input_b_el, input_b_tag))| { + let new_tag = Tag::from([input_tag, input_b_tag]); + let output_a_el = ::sync_work(state, (input_el, input_b_el)); + + output_a_write.push((output_a_el, new_tag)); + }, + ); + + todo!() + } +} + +// mod printer_synchronous_block +// { +// struct PrinterView<'view, T> +// { +// n: &'view mut usize, +// _sync_block_phantom: std::marker::PhantomData<'view, T>, +// } +// impl<'view, T: 'static> oxydsp_flowgraph::block::SyncBlockIO<'view> for super::Printer +// { +// type StateView = PrinterView<'view, T>; +// type Input = (); +// type Output = (); +// } +// } + impl Printer { pub fn new(input: In) -> Self diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs index 8a1a713..b3ff381 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -3,6 +3,7 @@ use zyn::ToTokens; use zyn::ext::AttrExt; use zyn::ext::FieldsExt; use zyn::format_ident; +use zyn::ident; use zyn::parse_input; use zyn::syn::GenericParam; use zyn::syn::Index; @@ -11,6 +12,17 @@ use zyn::syn::TypeGenerics; use zyn::syn::parse_quote; use zyn::syn::spanned::Spanned; +mod sync; + +#[zyn::attribute] +pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + use sync::SyncBlockImpl; + zyn::zyn!( + @sync_block_impl(item = item) + ) +} + #[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")] pub fn block_io( #[zyn(input)] ident: zyn::Extract, @@ -20,6 +32,7 @@ pub fn block_io( { let ident = ident.inner(); let (impl_generics, type_generics, where_clause) = generics.split_for_impl(); + zyn::zyn!( impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }} {{ where_clause }} @@ -229,46 +242,6 @@ fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream out_ty.unwrap() } -// Sync block - -#[zyn::attribute] -pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream -{ - let lifetime: GenericParam = parse_quote!('view); - let mut generics = item.generics.clone(); - generics.params.insert(0, lifetime); - let (_impl_generics, type_generics, where_clause) = generics.split_for_impl(); - let fields = &item.fields.as_named().unwrap().named; - - let mut state_fields = vec![]; - for field in fields.iter() - { - let mut f = field.clone(); - if f.attrs.iter().any(|x| x.is("input") || x.is("output")) - { - continue; - } - - let tk = field.ty.clone().into_token_stream(); - f.ty = parse_quote!(&'view mut #tk); - state_fields.push(f); - } - - zyn::zyn!( - {{ item }} - - struct {{ item.ident | ident:"{}View" }} {{ type_generics }} - {{ where_clause }} - { - @for (field in state_fields.iter()) - { - {{ field }}, - } - _sync_block_phantom: std::marker::PhantomData<'view>, - } - ) -} - // Generate #[proc_macro] pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs new file mode 100644 index 0000000..0a44e93 --- /dev/null +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs @@ -0,0 +1,436 @@ +use zyn::Fields; +use zyn::ToTokens; +use zyn::ast::at; +use zyn::ext::AttrExt; +use zyn::ext::FieldsExt; +use zyn::ext::TypeExt; +use zyn::quote; +use zyn::quote::quote; +use zyn::syn::Field; +use zyn::syn::GenericParam; +use zyn::syn::parse_quote; + +// Sync block + +#[zyn::element] +pub fn sync_block_impl(item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + zyn::zyn!( + {{ item }} + + // module to keep everything clean + mod {{ item.ident | snake | ident: "{}_synchronous_block" }} + { + @sync_block_view_struct(item = item.clone()) + } + + @sync_block_syncio_impl(item = item.clone()) + ) +} + +#[zyn::element] +fn sync_block_syncio_impl(item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + let view_lifetime: GenericParam = parse_quote!('view); + let mut view_generics = item.generics.clone(); + view_generics.params.insert(0, view_lifetime); + + let (view_impl_generics, view_type_generics, _view_where_clause) = + view_generics.split_for_impl(); + let (_impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); + + zyn::zyn!( + impl {{ view_impl_generics }} oxydsp_flowgraph::block::SyncBlockIO<'view> for {{ item.ident }} {{ type_generics }} + {{ where_clause }} + { + // Path within module + type StateView = {{ item.ident | snake | ident: "{}_synchronous_block" }}::{{ item.ident | ident:"{}View" }} {{ view_type_generics }}; + type Input = {{ sync_block_io_types(item.clone(), "input") }}; + type Output = {{ sync_block_io_types(item.clone(), "output") }}; + } + ) +} + +fn sync_block_io_types(item: zyn::syn::ItemStruct, io: &'static str) -> zyn::TokenStream +{ + let field_types = item + .fields + .as_named() + .unwrap() + .named + .iter() + .filter(|f| f.attrs.iter().any(|attr| attr.is(io))) + .map(|x| x.ty.clone()) + .map(|ty| { + match ty + .as_path() + .unwrap() + .segments + .last() + .unwrap() + .arguments + .clone() + { + zyn::syn::PathArguments::AngleBracketed(args) => args.args.to_token_stream(), + zyn::syn::PathArguments::None => panic!(), + zyn::syn::PathArguments::Parenthesized(_) => panic!(), + } + }) + .collect::>(); + + zyn::zyn!( + @if (field_types.is_empty()) + { + () + } @else if (field_types.len() == 1) + { + {{ field_types[0].clone() }} + } @else + { + ( + @for (x in field_types.iter()) + { + {{ x }}, + } + ) + } + ) + .into_token_stream() +} + +#[zyn::element] +fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + // Create view liftime to add to struct definition + let lifetime: GenericParam = parse_quote!('view); + let mut generics = item.generics.clone(); + generics.params.insert(0, lifetime); + + let (_impl_generics, type_generics, where_clause) = generics.split_for_impl(); + let fields = &item.fields.as_named().unwrap().named; + + let mut state_fields = vec![]; + for field in fields.iter() + { + let mut f = field.clone(); + if f.attrs.iter().any(|x| x.is("input") || x.is("output")) + { + continue; + } + + let tk = field.ty.clone().into_token_stream(); + f.ty = parse_quote!(&'view mut #tk); + state_fields.push(f); + } + + let phantom_types = generics + .params + .iter() + .map(|param| { + match param + { + GenericParam::Type(t) => + { + let ident = &t.ident; + quote!(#ident) + } + GenericParam::Lifetime(l) => + { + let lifetime = &l.lifetime; + // Lifetimes need to be wrapped in a reference or similar + quote!(& #lifetime ()) + } + GenericParam::Const(_) => + { + // Const generics generally don't need PhantomData + // as they don't affect variance or drop-check. + quote!() + } + } + }) + .filter(|tokens| !tokens.is_empty()); + + zyn::zyn!( + #[derive(Clone, Copy)] + pub struct {{ item.ident | ident:"{}View" }} {{ type_generics }} + {{ where_clause }} + { + @for (field in state_fields.iter()) + { + pub {{ field }}, + } + pub _sync_block_phantom: std::marker::PhantomData<( + @for (ty in phantom_types) + { + {{ ty }}, + } + )>, + } + ) +} + +#[zyn::element] +fn sync_block_impl_block(item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + let item2 = item.clone(); + let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl(); + let fields = &item.fields.as_named().unwrap().named; + + // Retrieve fields + let input_fields = fields + .iter() + .filter(|f| f.attrs.iter().any(|attr| attr.is("input"))) + .cloned() + .collect::>(); + let output_fields = fields + .iter() + .filter(|f| f.attrs.iter().any(|attr| attr.is("output"))) + .cloned() + .collect::>(); + + zyn::zyn!( + impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }} + {{ where_clause }} + { + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let state = {{ sync_block_make_view_struct(item.clone()) }}; + + // Get writers from outputs + let mut max_len = usize::MAX; + @for (out_field in output_fields.iter()) + { + let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write(); + max_len = max_len.min({{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len()); + } + + @if (!input_fields.is_empty()) + { + @sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone()) + } + + oxydsp_flowgraph::block::BlockResult::Ok + } + } + ) +} + +#[zyn::element] +fn sync_block_block_impl_with_inputs( + item: zyn::syn::ItemStruct, + output_fields: Vec, +) -> zyn::TokenStream +{ + zyn::zyn!( + + for _ in 0..max_len + { + @if (output_fields.len() == 1) + { + let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + } + @else + { + let (@for (out_field in output_fields.iter()) + { + {{out_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + } + = ::sync_work(state, ()).unwrap(); + } + + // Iterate on inputs + ( + @for (in_field in input_fields.iter()) + { + &mut self.{{ in_field.ident }}, + } + ).pop_iter() + .zip(0..max_len) + .for_each( + // Deconstruct foreach arguments + | + ( + (@for (in_field in input_fields.iter()) + { + ({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), + }), + _ // Ignore index + ) + | + { + // Create output tag + let tag = oxydsp_flowgraph::tag::merge_tag_opts([ + @for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}, + }, + ]); + + // Compute output sample + @if (output_fields.is_empty()) + { + let _ + } + @else if (output_fields.len() == 1) + { + let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + } + @else + { + let (@for (out_field in output_fields.iter()) + { + {{out_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + } + = ::sync_work(state, + @if (input_fields.len() == 1) + { + {{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, + } + @else + { + (@for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + } + ).unwrap(); + + // Now the output samples must be sent to their resepective outputs + @for (out_field in output_fields.iter()) + { + {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + ( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + tag.clone() + ) + ); + } + // + + } + ); + ) +} + +#[zyn::element] +fn sync_block_block_impl_with_inputs( + item: zyn::syn::ItemStruct, + input_fields: Vec, + output_fields: Vec, +) -> zyn::TokenStream +{ + zyn::zyn!( + + // Iterate on inputs + ( + @for (in_field in input_fields.iter()) + { + &mut self.{{ in_field.ident }}, + } + ).pop_iter() + .zip(0..max_len) + .for_each( + // Deconstruct foreach arguments + | + ( + (@for (in_field in input_fields.iter()) + { + ({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), + }), + _ // Ignore index + ) + | + { + // Create output tag + let tag = oxydsp_flowgraph::tag::merge_tag_opts([ + @for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}, + }, + ]); + + // Compute output sample + @if (output_fields.is_empty()) + { + let _ + } + @else if (output_fields.len() == 1) + { + let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + } + @else + { + let (@for (out_field in output_fields.iter()) + { + {{out_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + } + = ::sync_work(state, + @if (input_fields.len() == 1) + { + {{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, + } + @else + { + (@for (in_field in input_fields.iter()) + { + {{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + } + ) + } + ).unwrap(); + + // Now the output samples must be sent to their resepective outputs + @for (out_field in output_fields.iter()) + { + {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( + ( + {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, + tag.clone() + ) + ); + } + // + + } + ); + ) +} + +fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + let fields = &item.fields.as_named().unwrap().named; + + let mut state_fields = vec![]; + for field in fields.iter() + { + let mut f = field.clone(); + if f.attrs.iter().any(|x| x.is("input") || x.is("output")) + { + continue; + } + + let tk = field.ty.clone().into_token_stream(); + f.ty = parse_quote!(&'view mut #tk); + state_fields.push(f); + } + + zyn::zyn!( + {{ item.ident | ident:"{}View" }} { + @for (field in state_fields) + { + {{field.ident}}: &mut self.{{ field.ident }}, + } + _sync_block_phantom: Default::default(), + } + ) + .into_token_stream() +} diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index e7d169b..612b589 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -1,7 +1,6 @@ -use crate::{ - edge::BlockIOIndex, - io::{AnonymousStreamConsumer, AnonymousStreamProducer}, -}; +use crate::edge::BlockIOIndex; +use crate::io::AnonymousStreamConsumer; +use crate::io::AnonymousStreamProducer; pub enum BlockResult { @@ -51,14 +50,14 @@ pub trait Block // Represents the input, output, state types // that a SyncBlock will have to interacti with -pub trait SyncBlockIO +pub trait SyncBlockIO<'view> { type StateView; type Input; type Output; } -pub trait SyncBlock: SyncBlockIO +pub trait SyncBlock<'view>: SyncBlockIO<'view> { fn sync_work(state: Self::StateView, input: Self::Input) -> Option; } diff --git a/oxydsp-flowgraph/src/tag.rs b/oxydsp-flowgraph/src/tag.rs index e214d7c..e9e13b2 100644 --- a/oxydsp-flowgraph/src/tag.rs +++ b/oxydsp-flowgraph/src/tag.rs @@ -1,9 +1,9 @@ -use std::{ - any::Any, - collections::HashMap, - ops::{Deref, DerefMut}, - sync::{Arc, Mutex}, -}; +use std::any::Any; +use std::collections::HashMap; +use std::ops::Deref; +use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::Mutex; // Tags a particular sample within a specific stream #[derive(Clone)] @@ -20,6 +20,19 @@ pub struct Tag pub data: Arc>>>, } +impl Tag +{ + pub fn merge_tag_opts(tag_opts: [Option; N]) -> Option + { + let mut out_tag = None; + for tag in tag_opts.iter() + { + out_tag = out_tag.merge(tag); + } + out_tag + } +} + pub trait TagValue: Clone {} impl TagValue for T where T: Clone {}