This commit is contained in:
2026-03-20 23:12:46 +01:00
parent ac5c9eeaa0
commit 582d876abf
6 changed files with 70 additions and 43 deletions

View File

@ -35,6 +35,9 @@ pub struct Printer<T: 'static + Display>
#[input]
input: In<T>,
#[output]
output: Out<T>,
n: usize,
}

View File

@ -1,15 +1,18 @@
use std::ops::{Add, Mul};
use std::ops::Add;
use std::ops::Mul;
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult, SyncBlock},
io::{In, Out, PopIterable},
sync_block,
tag::TagMergable,
};
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::TagMergable;
#[derive(BlockIO)]
#[sync_block]
#[sync_block(tagged)]
pub struct Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,

View File

@ -1,9 +1,11 @@
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
io::{In, Out, PopIterable, stream},
tag::Tag,
};
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
use oxydsp_flowgraph::tag::Tag;
#[derive(BlockIO)]
pub struct Map<I: 'static, O: 'static, F>
@ -84,12 +86,12 @@ impl<T: Clone + 'static> Block for Repeat<T>
{
if self.remaining == 0 || self.current.is_none()
{
self.current = Some(reader.pop_tagged().unwrap());
self.current = Some(reader.pop().unwrap().into());
self.remaining = self.repetitions;
}
writer
.push(self.current.clone().unwrap())
.push(self.current.clone().unwrap().into())
.unwrap_or_else(|_| panic!());
match &mut self.current

View File

@ -1,10 +1,14 @@
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
io::{In, Out, PopIterable, stream},
};
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
pub struct RxSource<Rx, I: 'static>

View File

@ -5,7 +5,6 @@ use zyn::ast::at;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::TypeExt;
use zyn::quote;
use zyn::quote::quote;
use zyn::syn::Field;
use zyn::syn::GenericParam;
@ -217,9 +216,18 @@ fn sync_block_impl_block(item: zyn::syn::ItemStruct) -> zyn::TokenStream
@for (out_field in output_fields.iter())
{
let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write();
max_len = max_len.min({{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len());
}
// Compute max_len
let max_len = [
usize::MAX,
@for (out_field in output_fields.iter())
{
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(),
}
].iter().min().unwrap();
@if (!input_fields.is_empty())
{
@sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone())
@ -289,25 +297,22 @@ fn sync_block_block_impl_with_inputs(
&mut self.{{ in_field.ident }},
}
).pop_iter()
.zip(0..max_len)
.take(max_len)
.for_each(
// Deconstruct foreach arguments
|
(
(@for (in_field in input_fields.iter())
{
({{in_field.ident.clone().unwrap() | ident:"{}_element"}},
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}},
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
}),
_ // Ignore index
)
})
|
{
// Create output tag
let tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
@for (in_field in input_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}},
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(),
}
]);
@ -319,26 +324,26 @@ fn sync_block_block_impl_with_inputs(
}
@else if (output_fields.len() == 1)
{
let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
let oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
}
@else
{
let (@for (out_field in output_fields.iter())
{
{{out_field.ident.clone().unwrap() | ident:"{}_element"}},
oxydsp_flowgraph::tag::Tagged({{out_field.ident.clone().unwrap() | ident:"{}_element"}}, {{out_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
}
)
}
= <Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state,
@if (input_fields.len() == 1)
{
{{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}},
oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone())
}
@else
{
(@for (in_field in input_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_element"}},
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
}
)
}
@ -348,9 +353,9 @@ fn sync_block_block_impl_with_inputs(
@for (out_field in output_fields.iter())
{
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
(
oxydsp_flowgraph::tag::Tagged(
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
tag.clone()
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(common_tag),
)
);
}

View File

@ -144,7 +144,7 @@ impl<T: 'static> Out<T>
{
Some(tag) =>
{
let _ = writer.push_tagged(elt.0, tag);
let _ = writer.push(Tagged(elt.0, Some(tag)));
}
None =>
{
@ -174,6 +174,11 @@ impl<T> InReader<'_, T>
self.data_reader.len()
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn pop(&self) -> Option<Tagged<T>>
{
let data = self.data_reader.pop_with_index();
@ -208,7 +213,12 @@ impl<T> OutWriter<'_, T>
self.data_writer.len().min(self.tag_writer.len())
}
pub fn push(&self, data: Tagged<T>) -> Result<(), (T, Option<Tag>)>
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, tag) = data.into();
match self.data_writer.push(data)
@ -219,13 +229,13 @@ impl<T> OutWriter<'_, T>
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag)),
Err(data) => Err((data, tag).into()),
}
}
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data.into())
self.data_writer.push(data)
}
}