diff --git a/example/src/main.rs b/example/src/main.rs index a6c4804..bcc5507 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -35,6 +35,9 @@ pub struct Printer #[input] input: In, + #[output] + output: Out, + n: usize, } diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs index ede9223..17557f6 100644 --- a/oxydsp-dsp/src/blocks/math/basic.rs +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -1,15 +1,18 @@ -use std::ops::{Add, Mul}; +use std::ops::Add; +use std::ops::Mul; -use oxydsp_flowgraph::{ - BlockIO, - block::{Block, BlockResult, SyncBlock}, - io::{In, Out, PopIterable}, - sync_block, - tag::TagMergable, -}; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::sync_block; +use oxydsp_flowgraph::tag::TagMergable; #[derive(BlockIO)] -#[sync_block] +#[sync_block(tagged)] pub struct Adder where Ia: Add + 'static, diff --git a/oxydsp-dsp/src/blocks/utilities/adapters.rs b/oxydsp-dsp/src/blocks/utilities/adapters.rs index 930b218..1a17f42 100644 --- a/oxydsp-dsp/src/blocks/utilities/adapters.rs +++ b/oxydsp-dsp/src/blocks/utilities/adapters.rs @@ -1,9 +1,11 @@ -use oxydsp_flowgraph::{ - BlockIO, - block::{Block, BlockResult}, - io::{In, Out, PopIterable, stream}, - tag::Tag, -}; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::io::stream; +use oxydsp_flowgraph::tag::Tag; #[derive(BlockIO)] pub struct Map @@ -84,12 +86,12 @@ impl Block for Repeat { if self.remaining == 0 || self.current.is_none() { - self.current = Some(reader.pop_tagged().unwrap()); + self.current = Some(reader.pop().unwrap().into()); self.remaining = self.repetitions; } writer - .push(self.current.clone().unwrap()) + .push(self.current.clone().unwrap().into()) .unwrap_or_else(|_| panic!()); match &mut self.current diff --git a/oxydsp-dsp/src/blocks/utilities/channels.rs b/oxydsp-dsp/src/blocks/utilities/channels.rs index 67db127..369b29e 100644 --- a/oxydsp-dsp/src/blocks/utilities/channels.rs +++ b/oxydsp-dsp/src/blocks/utilities/channels.rs @@ -1,10 +1,14 @@ -use std::sync::mpsc::{Receiver, Sender, SyncSender}; +use std::sync::mpsc::Receiver; +use std::sync::mpsc::Sender; +use std::sync::mpsc::SyncSender; -use oxydsp_flowgraph::{ - BlockIO, - block::{Block, BlockResult}, - io::{In, Out, PopIterable, stream}, -}; +use oxydsp_flowgraph::BlockIO; +use oxydsp_flowgraph::block::Block; +use oxydsp_flowgraph::block::BlockResult; +use oxydsp_flowgraph::io::In; +use oxydsp_flowgraph::io::Out; +use oxydsp_flowgraph::io::PopIterable; +use oxydsp_flowgraph::io::stream; #[derive(BlockIO)] pub struct RxSource diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs index 21e0a7d..abfedbd 100644 --- a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/sync.rs @@ -5,7 +5,6 @@ use zyn::ast::at; use zyn::ext::AttrExt; use zyn::ext::FieldsExt; use zyn::ext::TypeExt; -use zyn::quote; use zyn::quote::quote; use zyn::syn::Field; use zyn::syn::GenericParam; @@ -217,9 +216,18 @@ fn sync_block_impl_block(item: zyn::syn::ItemStruct) -> zyn::TokenStream @for (out_field in output_fields.iter()) { let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write(); - max_len = max_len.min({{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len()); } + // Compute max_len + let max_len = [ + usize::MAX, + @for (out_field in output_fields.iter()) + { + {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(), + } + ].iter().min().unwrap(); + + @if (!input_fields.is_empty()) { @sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone()) @@ -289,25 +297,22 @@ fn sync_block_block_impl_with_inputs( &mut self.{{ in_field.ident }}, } ).pop_iter() - .zip(0..max_len) + .take(max_len) .for_each( // Deconstruct foreach arguments | - ( (@for (in_field in input_fields.iter()) { - ({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), - }), - _ // Ignore index - ) + }) | { // Create output tag - let tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([ + let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([ @for (in_field in input_fields.iter()) { - {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}, + {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(), } ]); @@ -319,26 +324,26 @@ fn sync_block_block_impl_with_inputs( } @else if (output_fields.len() == 1) { - let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}} + let oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}) } @else { let (@for (out_field in output_fields.iter()) { - {{out_field.ident.clone().unwrap() | ident:"{}_element"}}, + oxydsp_flowgraph::tag::Tagged({{out_field.ident.clone().unwrap() | ident:"{}_element"}}, {{out_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}), } ) } = ::sync_work(state, @if (input_fields.len() == 1) { - {{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, + oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()) } @else { (@for (in_field in input_fields.iter()) { - {{in_field.ident.clone().unwrap() | ident:"{}_element"}}, + oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()), } ) } @@ -348,9 +353,9 @@ fn sync_block_block_impl_with_inputs( @for (out_field in output_fields.iter()) { {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push( - ( + oxydsp_flowgraph::tag::Tagged( {{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, - tag.clone() + {{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(common_tag), ) ); } diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index cd2df74..3178470 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -144,7 +144,7 @@ impl Out { Some(tag) => { - let _ = writer.push_tagged(elt.0, tag); + let _ = writer.push(Tagged(elt.0, Some(tag))); } None => { @@ -174,6 +174,11 @@ impl InReader<'_, T> self.data_reader.len() } + pub fn is_empty(&self) -> bool + { + self.len() == 0 + } + pub fn pop(&self) -> Option> { let data = self.data_reader.pop_with_index(); @@ -208,7 +213,12 @@ impl OutWriter<'_, T> self.data_writer.len().min(self.tag_writer.len()) } - pub fn push(&self, data: Tagged) -> Result<(), (T, Option)> + pub fn is_empty(&self) -> bool + { + self.len() == 0 + } + + pub fn push(&self, data: Tagged) -> Result<(), Tagged> { let (data, tag) = data.into(); match self.data_writer.push(data) @@ -219,13 +229,13 @@ impl OutWriter<'_, T> Ok(()) } Ok(_) => Ok(()), - Err(data) => Err((data, tag)), + Err(data) => Err((data, tag).into()), } } pub fn push_no_tag(&self, data: T) -> Result<(), T> { - self.data_writer.push(data.into()) + self.data_writer.push(data) } }