From 866a5dd50183e3336380c71d024f0d9d0107e60e Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Fri, 13 Mar 2026 21:57:10 +0100 Subject: [PATCH] Working base --- Cargo.lock | 11 + Cargo.toml | 2 +- example/Cargo.toml | 2 + example/out.dot | 15 + example/src/main.rs | 59 ++- oxydsp-dsp/Cargo.toml | 7 + oxydsp-dsp/src/blocks.rs | 2 + oxydsp-dsp/src/blocks/math.rs | 1 + oxydsp-dsp/src/blocks/math/basic.rs | 61 +++ oxydsp-dsp/src/blocks/utilities.rs | 1 + oxydsp-dsp/src/blocks/utilities/iter.rs | 46 ++ oxydsp-dsp/src/lib.rs | 1 + oxydsp-dsp/src/main.rs | 1 + oxydsp-flowgraph-macros/src/lib.rs | 224 -------- oxydsp-flowgraph/Cargo.toml | 2 +- .../oxydsp-flowgraph-macros}/Cargo.toml | 0 .../oxydsp-flowgraph-macros/src/lib.rs | 492 ++++++++++++++++++ oxydsp-flowgraph/src/block.rs | 16 +- oxydsp-flowgraph/src/edge.rs | 119 +++++ oxydsp-flowgraph/src/graph.rs | 139 +++++ oxydsp-flowgraph/src/lib.rs | 1 + oxydsp-flowgraph/src/main.rs | 49 -- oxydsp-flowgraph/src/stream.rs | 120 +++-- 23 files changed, 1050 insertions(+), 321 deletions(-) create mode 100644 example/out.dot create mode 100644 oxydsp-dsp/Cargo.toml create mode 100644 oxydsp-dsp/src/blocks.rs create mode 100644 oxydsp-dsp/src/blocks/math.rs create mode 100644 oxydsp-dsp/src/blocks/math/basic.rs create mode 100644 oxydsp-dsp/src/blocks/utilities.rs create mode 100644 oxydsp-dsp/src/blocks/utilities/iter.rs create mode 100644 oxydsp-dsp/src/lib.rs create mode 100644 oxydsp-dsp/src/main.rs delete mode 100644 oxydsp-flowgraph-macros/src/lib.rs rename {oxydsp-flowgraph-macros => oxydsp-flowgraph/oxydsp-flowgraph-macros}/Cargo.toml (100%) create mode 100644 oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 42ffa2c..e586521 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,6 +5,17 @@ version = 4 [[package]] name = "example" version = "0.1.0" +dependencies = [ + "oxydsp-dsp", + "oxydsp-flowgraph", +] + +[[package]] +name = "oxydsp-dsp" +version = "0.1.0" +dependencies = [ + "oxydsp-flowgraph", +] [[package]] name = "oxydsp-flowgraph" diff --git a/Cargo.toml b/Cargo.toml index 795a2b8..26be4aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "3" -members = ["example","oxydsp-flowgraph", "oxydsp-flowgraph-macros"] +members = ["example", "oxydsp-dsp","oxydsp-flowgraph"] diff --git a/example/Cargo.toml b/example/Cargo.toml index 5d4c622..eb0d5f1 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -4,3 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] +oxydsp-flowgraph = {path = "../oxydsp-flowgraph/"} +oxydsp-dsp = {path = "../oxydsp-dsp/"} diff --git a/example/out.dot b/example/out.dot new file mode 100644 index 0000000..bb8e40c --- /dev/null +++ b/example/out.dot @@ -0,0 +1,15 @@ + + digraph G { + node [shape=record]; + rankdir=TB; + IterSource_0 [label="{ IterSource |{ output} }"]; +IterSource_1 [label="{ IterSource |{ output} }"]; +Adder_2 [label="{ { input_a| input_b}| Adder |{ output} }"]; +Printer_3 [label="{ { input}| Printer }"]; + + IterSource_0:o0 -> Adder_2:i0 [label="usize"]; +IterSource_1:o0 -> Adder_2:i1 [label="usize"]; +Adder_2:o0 -> Printer_3:i0 [label="usize"]; + + } + \ No newline at end of file diff --git a/example/src/main.rs b/example/src/main.rs index a4f2e4f..a89e7d0 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -1,4 +1,61 @@ +use std::{fmt::Display, fs::File, io::Write}; + +use oxydsp_dsp::blocks::{math::basic::Adder, utilities::iter::IterSource}; +use oxydsp_flowgraph::{ + BlockIO, + block::{Block, BlockResult}, + edge::{In, PopIterable}, + flowgraph, + graph::FlowGraph, +}; + +#[derive(BlockIO)] +pub struct Printer +{ + #[input] + input: In, + + n: usize, +} + +impl Printer +{ + pub fn new(input: In) -> Self + { + Self { input, n: 0 } + } +} + +impl Block for Printer +where + T: Display, +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + for x in self.input.pop_iter() + { + if self.n.is_multiple_of(2usize.pow(20)) + { + println!("{}", x); + self.n = 0; + } + self.n += 1; + } + BlockResult::Ok + } +} + fn main() { - println!("Hello, world!"); + let (iter_a, a) = IterSource::new(0usize..); + let (iter_b, b) = IterSource::new(0usize..); + let (adder, added) = Adder::new(a, b); + let printer = Printer::new(added); + + let graph = flowgraph![iter_a, iter_b, adder, printer]; + File::create("out.dot") + .unwrap() + .write_all(graph.get_dot().as_bytes()) + .unwrap(); + graph.run(); } diff --git a/oxydsp-dsp/Cargo.toml b/oxydsp-dsp/Cargo.toml new file mode 100644 index 0000000..e13b74a --- /dev/null +++ b/oxydsp-dsp/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "oxydsp-dsp" +version = "0.1.0" +edition = "2024" + +[dependencies] +oxydsp-flowgraph = {path = "../oxydsp-flowgraph/"} diff --git a/oxydsp-dsp/src/blocks.rs b/oxydsp-dsp/src/blocks.rs new file mode 100644 index 0000000..ecff8e3 --- /dev/null +++ b/oxydsp-dsp/src/blocks.rs @@ -0,0 +1,2 @@ +pub mod math; +pub mod utilities; diff --git a/oxydsp-dsp/src/blocks/math.rs b/oxydsp-dsp/src/blocks/math.rs new file mode 100644 index 0000000..38883ee --- /dev/null +++ b/oxydsp-dsp/src/blocks/math.rs @@ -0,0 +1 @@ +pub mod basic; diff --git a/oxydsp-dsp/src/blocks/math/basic.rs b/oxydsp-dsp/src/blocks/math/basic.rs new file mode 100644 index 0000000..37b7c03 --- /dev/null +++ b/oxydsp-dsp/src/blocks/math/basic.rs @@ -0,0 +1,61 @@ +use std::ops::Add; + +use oxydsp_flowgraph::{ + BlockIO, + block::{Block, BlockResult}, + edge::{In, Out, PopIterable, stream}, +}; + +#[derive(BlockIO)] +pub struct Adder +where + Ia: Add + 'static, + Ib: 'static, + O: 'static, +{ + #[input] + input_a: In, + + #[input] + input_b: In, + + #[output] + output: Out, +} + +impl Adder +where + Ia: Add + 'static, + Ib: 'static, + O: 'static, +{ + pub fn new(input_a: In, input_b: In) -> (Self, In) + { + let (output, added) = stream(); + ( + Self { + input_a, + input_b, + output, + }, + added, + ) + } +} + +impl Block for Adder +where + Ia: Add + 'static, + Ib: 'static, + O: 'static, +{ + fn work(&mut self) -> BlockResult + { + self.output.push_iter( + (&mut self.input_a, &mut self.input_b) + .pop_iter() + .map(|(a, b)| a + b), + ); + BlockResult::Ok + } +} diff --git a/oxydsp-dsp/src/blocks/utilities.rs b/oxydsp-dsp/src/blocks/utilities.rs new file mode 100644 index 0000000..9708a97 --- /dev/null +++ b/oxydsp-dsp/src/blocks/utilities.rs @@ -0,0 +1 @@ +pub mod iter; diff --git a/oxydsp-dsp/src/blocks/utilities/iter.rs b/oxydsp-dsp/src/blocks/utilities/iter.rs new file mode 100644 index 0000000..c5d5e84 --- /dev/null +++ b/oxydsp-dsp/src/blocks/utilities/iter.rs @@ -0,0 +1,46 @@ +use oxydsp_flowgraph::{ + BlockIO, + block::{Block, BlockResult}, + edge::{In, Out, stream}, +}; + +#[derive(BlockIO)] +pub struct IterSource +where + I::Item: 'static, +{ + iter: I, + + #[output] + output: Out, +} + +impl IterSource +where + I: Iterator, + I::Item: 'static, +{ + pub fn new(iter: I) -> (Self, In) + { + let (output, items) = stream(); + (Self { iter, output }, items) + } +} + +impl Block for IterSource +where + I: Iterator, + I::Item: 'static, +{ + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + if self.output.push_iter(&mut self.iter) + { + BlockResult::Ok + } + else + { + BlockResult::Terminated + } + } +} diff --git a/oxydsp-dsp/src/lib.rs b/oxydsp-dsp/src/lib.rs new file mode 100644 index 0000000..049a8aa --- /dev/null +++ b/oxydsp-dsp/src/lib.rs @@ -0,0 +1 @@ +pub mod blocks; diff --git a/oxydsp-dsp/src/main.rs b/oxydsp-dsp/src/main.rs new file mode 100644 index 0000000..f328e4d --- /dev/null +++ b/oxydsp-dsp/src/main.rs @@ -0,0 +1 @@ +fn main() {} diff --git a/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph-macros/src/lib.rs deleted file mode 100644 index 6340d5b..0000000 --- a/oxydsp-flowgraph-macros/src/lib.rs +++ /dev/null @@ -1,224 +0,0 @@ -use core::panic; - -use proc_macro::Ident; -use proc_macro::Spacing; -use zyn::ToTokens; -use zyn::ext::AttrExt; -use zyn::ext::FieldsExt; -use zyn::syn::spanned::Spanned; - -#[zyn::derive("BlockIO", attributes(input, output))] -pub fn block_io( - #[zyn(input)] ident: zyn::Extract, - #[zyn(input)] generics: zyn::Extract, - #[zyn(input)] fields: zyn::Fields, -) -> zyn::TokenStream -{ - let (impl_generics, type_generics, where_clause) = generics.split_for_impl(); - zyn::zyn!( - impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.inner() }} {{ type_generics }} - where {{ where_clause }} - { - @block_io_set_index(fields = fields.clone()) - @block_io_get_successors(fields = fields.clone()) - @block_io_counts(fields = fields.clone()) - @block_io_set_streams(fields = fields.clone()) - @block_io_create_stream(fields = fields.clone()) - } - ) -} - -#[zyn::element] -fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - let fields = fields.as_named().unwrap().named.clone(); - zyn::zyn!( - fn set_index(&self, block_index: usize) - { - use oxydsp_flowgraph::edge::BlockIOIndex; - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); - } - - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); - } - } - ) -} - -#[zyn::element] -fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - let fields = fields.as_named().unwrap().named.clone(); - zyn::zyn!( - fn get_successors(&self) -> Vec - { - let mut output = vec![]; - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block() - { - output.push(block_index); - } - } - output - } - ) -} - -#[zyn::element] -fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - let fields = fields.as_named().unwrap().named.clone(); - let input_count = fields - .iter() - .filter(|x| x.attrs.iter().any(|x| x.is("input"))) - .count(); - let output_count = fields - .iter() - .filter(|x| x.attrs.iter().any(|x| x.is("output"))) - .count(); - zyn::zyn!( - fn input_count(&self) -> usize - { - return { { input_count } }; - } - - fn output_count(&self) -> usize - { - return { { output_count } }; - } - ) -} - -#[zyn::element] -fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - zyn::zyn!( - fn set_anonymous_out_stream( - &mut self, - output_index: usize, - producer: oxydsp_flowgraph::edge::AnonymousStreamProducer, - ) - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - self.{{field.1.ident}}.set_anonymous_stream(producer); - } - } - - fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer) - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - self.{{field.1.ident}}.set_anonymous_stream(consumer); - } - } - ) -} - -#[zyn::element] -fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream -{ - zyn::zyn!( - fn create_anonymous_stream_for( - &mut self, - output_index: usize, - capacity: usize - ) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) - { - let (tx, rx) = match output_index - { - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.0 }} => oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity), - } - _ => panic!("output_index out of bounds.") - }; - - (tx.into(), rx.into()) - } - ) -} - -#[zyn::element] -fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream -{ - let out_ty = match ty - { - zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments - { - zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap() - { - zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()), - _ => None, - }, - _ => None, - }, - _ => None, - }; - - if out_ty.is_none() - { - bail!("Output type must be a Out type."; span = ty.span()); - } - - out_ty.unwrap() -} - -// Sync block - -#[zyn::attribute] -pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream -{ - let strcut_item = item.clone(); - let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); - let fields = &item.fields.as_named().unwrap().named; - zyn::zyn!( - {{ strcut_item }} - - impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }} - where {{ where_clause }} - { - fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult - { - let mut len = usize::MAX; - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read(); - len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len()); - - } - - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write(); - len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len()); - } - - for _ in 0..len - { - let ( - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.1.ident.clone().unwrap() | ident: "{}_out" }}, - } - ) = self.sync_work(( - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) - { - {{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(), - } - )); - @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) - { - {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }}).unwrap(); - } - } - oxydsp_flowgraph::block::BlockResult::Ok - } - } - ) -} diff --git a/oxydsp-flowgraph/Cargo.toml b/oxydsp-flowgraph/Cargo.toml index 8bc1afc..ffe0d5d 100644 --- a/oxydsp-flowgraph/Cargo.toml +++ b/oxydsp-flowgraph/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" edition = "2024" [dependencies] -oxydsp-flowgraph-macros = { path = "../oxydsp-flowgraph-macros" } +oxydsp-flowgraph-macros = { path = "./oxydsp-flowgraph-macros" } diff --git a/oxydsp-flowgraph-macros/Cargo.toml b/oxydsp-flowgraph/oxydsp-flowgraph-macros/Cargo.toml similarity index 100% rename from oxydsp-flowgraph-macros/Cargo.toml rename to oxydsp-flowgraph/oxydsp-flowgraph-macros/Cargo.toml diff --git a/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs new file mode 100644 index 0000000..cbe97db --- /dev/null +++ b/oxydsp-flowgraph/oxydsp-flowgraph-macros/src/lib.rs @@ -0,0 +1,492 @@ +use proc_macro::TokenStream; +use zyn::ToTokens; +use zyn::ext::AttrExt; +use zyn::ext::FieldsExt; +use zyn::format_ident; +use zyn::parse_input; +use zyn::syn::Index; +use zyn::syn::Lit; +use zyn::syn::spanned::Spanned; + +#[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")] +pub fn block_io( + #[zyn(input)] ident: zyn::Extract, + #[zyn(input)] generics: zyn::Extract, + #[zyn(input)] fields: zyn::Fields, +) -> zyn::TokenStream +{ + let ident = ident.inner(); + let (impl_generics, type_generics, where_clause) = generics.split_for_impl(); + zyn::zyn!( + impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }} + {{ where_clause }} + { + @block_io_set_index(fields = fields.clone()) + @block_io_get_successors(fields = fields.clone()) + @block_io_counts(fields = fields.clone()) + @block_io_set_streams(fields = fields.clone()) + @block_io_create_stream(fields = fields.clone()) + @block_io_get_meta(ident = ident.clone(), fields = fields.clone()) + } + ) +} + +#[zyn::element] +fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + let fields = fields.as_named().unwrap().named.clone(); + zyn::zyn!( + fn set_index(&self, block_index: usize) + { + use oxydsp_flowgraph::edge::BlockIOIndex; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); + } + + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} }); + } + } + ) +} + +#[zyn::element] +fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + let fields = fields.as_named().unwrap().named.clone(); + zyn::zyn!( + fn get_successors(&self) -> Vec + { + let mut output = vec![]; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block() + { + output.push(block_index); + } + } + output + } + ) +} + +#[zyn::element] +fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::TokenStream +{ + let fields = fields.as_named().unwrap().named.clone(); + zyn::zyn!( + fn get_block_name(&self) -> &'static str + { + return {{ ident.to_string() }}; + } + + fn get_input_names(&self) -> Vec<&'static str> + { + let mut output = Vec::new(); + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + output.push({{ field.1.ident.clone().unwrap().to_string() }}); + } + return output; + } + fn get_output_names(&self) -> Vec<&'static str> + { + let mut output = Vec::new(); + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + output.push({{ field.1.ident.clone().unwrap().to_string() }}); + } + return output; + } + + fn get_output_type_names(&self) -> Vec<&'static str> + { + let mut output = Vec::new(); + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + output.push(self.{{ field.1.ident.clone() }}.get_type_name()); + } + return output; + } + ) +} + +#[zyn::element] +fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + let fields = fields.as_named().unwrap().named.clone(); + let input_count = fields + .iter() + .filter(|x| x.attrs.iter().any(|x| x.is("input"))) + .count(); + let output_count = fields + .iter() + .filter(|x| x.attrs.iter().any(|x| x.is("output"))) + .count(); + zyn::zyn!( + fn input_count(&self) -> usize + { + return { { input_count } }; + } + + fn output_count(&self) -> usize + { + return { { output_count } }; + } + ) +} + +#[zyn::element(debug = "pretty")] +fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + zyn::zyn!( + #[allow(unreachable_code)] + fn set_anonymous_out_stream( + &mut self, + output_index: usize, + producer: oxydsp_flowgraph::edge::AnonymousStreamProducer, + ) + { + match output_index + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer), + } + _ => panic!("output_index out of bounds.") + }; + } + + #[allow(unreachable_code)] + fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer) + { + match input_index + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + {{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer), + } + _ => panic!("output_index out of bounds.") + }; + } + ) +} + +#[zyn::element] +fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream +{ + zyn::zyn!( + #[allow(unreachable_code)] + fn create_anonymous_stream_for( + &mut self, + output_index: usize, + capacity: usize + ) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) + { + let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer) + = match output_index + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.0 }} => + { + let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity); + (tx.into(), rx.into()) + }, + } + _ => panic!("output_index out of bounds.") + }; + + (tx, rx) + } + ) +} + +#[zyn::element] +fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream +{ + let out_ty = match ty + { + zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments + { + zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap() + { + zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()), + _ => None, + }, + _ => None, + }, + _ => None, + }; + + if out_ty.is_none() + { + bail!("Output type must be a Out type."; span = ty.span()); + } + + out_ty.unwrap() +} + +// Sync block + +#[zyn::attribute] +pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream +{ + let mut strcut_item = item.clone(); + let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl(); + let fields = &item.fields.as_named().unwrap().named; + + // Get state fields + let mut state_fields = vec![]; + for field in strcut_item.fields.iter_mut() + { + let attr_index = field.attrs.iter().enumerate().find_map(|(i, attr)| { + if attr.is("sync_state") { Some(i) } else { None } + }); + + if let Some(state_field) = &field.ident + && let Some(attr_index) = attr_index + { + state_fields.push(state_field.clone()); + field.attrs.remove(attr_index); + } + } + + zyn::zyn!( + {{ strcut_item }} + + impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }} + where {{ where_clause }} + { + fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult + { + let mut len = usize::MAX; + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read(); + len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len()); + + } + + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write(); + len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len()); + } + + for _ in 0..len + { + if let Some(( + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + {{ field.1.ident.clone().unwrap() | ident: "{}_out" }}, + } + )) = Self::sync_work( + ( + @for (state_field in state_fields) + { + &mut self.{{ state_field }}, + } + ), + ( + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate()) + { + {{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(), + } + ) + ) + { + @for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate()) + { + let _ = {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }}); + } + } else + { + return oxydsp_flowgraph::block::BlockResult::Terminated; + } + } + oxydsp_flowgraph::block::BlockResult::Ok + } + } + ) +} + +// Generate +#[proc_macro] +pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream +{ + let count = parse_input!(input as Lit); + let count: usize = match count + { + Lit::Int(lit_int) => lit_int.base10_parse::().unwrap(), + _ => + { + return zyn::syn::Error::new(count.span(), "Must be an integer") + .to_compile_error() + .into(); + } + }; + let generics = [ + format_ident!("A"), + format_ident!("B"), + format_ident!("C"), + format_ident!("D"), + format_ident!("E"), + format_ident!("F"), + format_ident!("G"), + format_ident!("H"), + format_ident!("I"), + format_ident!("J"), + format_ident!("K"), + format_ident!("L"), + format_ident!("M"), + format_ident!("N"), + format_ident!("O"), + format_ident!("P"), + format_ident!("Q"), + format_ident!("R"), + format_ident!("S"), + format_ident!("T"), + format_ident!("U"), + format_ident!("V"), + format_ident!("W"), + format_ident!("X"), + format_ident!("Y"), + format_ident!("Z"), + ]; + + zyn::zyn!( + impl<'a, + @for (i in 0..count) + { + {{ generics[i] }}: 'static, + } + > PopIterable<'a> for ( + @for (i in 0..count) + { + &mut In<{{ generics[i] }}>, + } + ) + { + type Output = ( + @for (i in 0..count) + { + StreamReader<'a, {{ generics[i] }}>, + } + ); + fn pop_iter(&'a mut self) -> PopIter + { + @for (i in 0..count) + { + let {{ i | ident:"reader_{}" }} = self.{{ Index::from(i) }}.read(); + } + let len = [ + @for (i in 0..count) + { + {{ i | ident:"reader_{}" }}.len(), + } + ].into_iter().min().unwrap(); + PopIter { + popped: 0, + len, + reader: ( + @for (i in 0..count) + { + {{ i | ident:"reader_{}" }}, + } + ), + } + } + } + ) + .to_token_stream() + .into() +} + +#[proc_macro] +pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream +{ + let count = parse_input!(input as Lit); + let count: usize = match count + { + Lit::Int(lit_int) => lit_int.base10_parse::().unwrap(), + _ => + { + return zyn::syn::Error::new(count.span(), "Must be an integer") + .to_compile_error() + .into(); + } + }; + let generics = [ + format_ident!("A"), + format_ident!("B"), + format_ident!("C"), + format_ident!("D"), + format_ident!("E"), + format_ident!("F"), + format_ident!("G"), + format_ident!("H"), + format_ident!("I"), + format_ident!("J"), + format_ident!("K"), + format_ident!("L"), + format_ident!("M"), + format_ident!("N"), + format_ident!("O"), + format_ident!("P"), + format_ident!("Q"), + format_ident!("R"), + format_ident!("S"), + format_ident!("T"), + format_ident!("U"), + format_ident!("V"), + format_ident!("W"), + format_ident!("X"), + format_ident!("Y"), + format_ident!("Z"), + ]; + + zyn::zyn!( + impl<'a, + @for (i in 0..count) + { + {{ generics[i] }}: 'static, + } + > Iterator for PopIter<( + @for (i in 0..count) + { + StreamReader<'a, {{ generics[i] }}>, + } + )> + { + type Item = ( + @for (i in 0..count) + { + {{ generics[i] }}, + } + ); + fn next(&mut self) -> Option + { + if self.popped < self.len + { + self.popped += 1; + + Some(( + @for (i in 0..count) + { + self.reader.{{ Index::from(i) }}.pop().unwrap(), + } + )) + } + else + { + None + } + } + } + ) + .to_token_stream() + .into() +} diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs index 1eb949f..9c3c18f 100644 --- a/oxydsp-flowgraph/src/block.rs +++ b/oxydsp-flowgraph/src/block.rs @@ -2,7 +2,12 @@ use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex pub enum BlockResult { + // Signifies that the block can be scheduled again. Ok, + + // Signifies that the block finished its work + // Running it again would be useless + // This triggers the graph shutdown Terminated, } @@ -28,6 +33,12 @@ pub trait BlockIO output_index: usize, capacity: usize, ) -> (AnonymousStreamProducer, AnonymousStreamConsumer); + + // Meta information + fn get_block_name(&self) -> &'static str; + fn get_input_names(&self) -> Vec<&'static str>; + fn get_output_names(&self) -> Vec<&'static str>; + fn get_output_type_names(&self) -> Vec<&'static str>; } pub trait Block @@ -39,8 +50,11 @@ pub trait SyncBlock { type Input; type Output; + type State; - fn sync_work(&mut self, input: Self::Input) -> Self::Output; + fn sync_work(state: &mut Self::State, input: Self::Input) -> Option; } pub trait GraphableBlock: Block + BlockIO {} + +impl GraphableBlock for T where T: Block + BlockIO {} diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs index d49c3da..ce32d10 100644 --- a/oxydsp-flowgraph/src/edge.rs +++ b/oxydsp-flowgraph/src/edge.rs @@ -1,13 +1,18 @@ use std::any::Any; +use std::collections::binary_heap::Iter; use std::sync::Arc; use std::sync::Mutex; +use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; +use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; + use crate::stream; use crate::stream::StreamConsumer; use crate::stream::StreamProducer; use crate::stream::StreamReader; use crate::stream::StreamWriter; +#[derive(Default)] pub struct Edge { // Represents the index of the block owning the Out end in the graph @@ -90,6 +95,18 @@ pub struct Out edge: Arc>, } +pub fn stream() -> (Out, In) +{ + let edge = Arc::new(Mutex::new(Edge::default())); + ( + Out { + stream: None, + edge: edge.clone(), + }, + In { stream: None, edge }, + ) +} + impl In { pub fn set_block_index(&self, index: BlockIOIndex) @@ -145,4 +162,106 @@ impl Out { self.stream.as_mut().unwrap().write() } + + pub fn push_iter>(&mut self, mut iter: I) -> bool + { + let writer = self.write(); + let len = writer.len(); + + for _ in 0..len + { + if let Some(elt) = iter.next() + { + let _ = writer.push(elt); + } + else + { + return false; + } + } + true + } + + // Meta information + pub fn get_type_name(&self) -> &'static str + { + std::any::type_name::() + } } + +pub struct PopIter +{ + len: usize, + popped: usize, + reader: T, +} + +pub trait PopIterable<'a> +{ + type Output; + fn pop_iter(&'a mut self) -> PopIter; +} + +impl<'a, T: 'static> PopIterable<'a> for In +{ + type Output = StreamReader<'a, T>; + fn pop_iter(&'a mut self) -> PopIter> + { + let reader = self.read(); + PopIter { + popped: 0, + len: reader.len(), + reader, + } + } +} + +generate_pop_iterable_tuple_impl! {2} +generate_pop_iterable_tuple_impl! {3} +generate_pop_iterable_tuple_impl! {4} +generate_pop_iterable_tuple_impl! {5} +generate_pop_iterable_tuple_impl! {6} +generate_pop_iterable_tuple_impl! {7} +generate_pop_iterable_tuple_impl! {8} +generate_pop_iterable_tuple_impl! {9} +generate_pop_iterable_tuple_impl! {10} +generate_pop_iterable_tuple_impl! {11} +generate_pop_iterable_tuple_impl! {12} +generate_pop_iterable_tuple_impl! {13} +generate_pop_iterable_tuple_impl! {14} +generate_pop_iterable_tuple_impl! {15} +generate_pop_iterable_tuple_impl! {16} +generate_pop_iterable_tuple_impl! {17} +generate_pop_iterable_tuple_impl! {18} +generate_pop_iterable_tuple_impl! {19} +generate_pop_iterable_tuple_impl! {20} + +impl<'a, T> Iterator for PopIter> +{ + type Item = T; + + fn next(&mut self) -> Option + { + self.reader.pop() + } +} + +impl_iterator_for_pop_iter_tuple! {2} +impl_iterator_for_pop_iter_tuple! {3} +impl_iterator_for_pop_iter_tuple! {4} +impl_iterator_for_pop_iter_tuple! {5} +impl_iterator_for_pop_iter_tuple! {6} +impl_iterator_for_pop_iter_tuple! {7} +impl_iterator_for_pop_iter_tuple! {8} +impl_iterator_for_pop_iter_tuple! {9} +impl_iterator_for_pop_iter_tuple! {10} +impl_iterator_for_pop_iter_tuple! {11} +impl_iterator_for_pop_iter_tuple! {12} +impl_iterator_for_pop_iter_tuple! {13} +impl_iterator_for_pop_iter_tuple! {14} +impl_iterator_for_pop_iter_tuple! {15} +impl_iterator_for_pop_iter_tuple! {16} +impl_iterator_for_pop_iter_tuple! {17} +impl_iterator_for_pop_iter_tuple! {18} +impl_iterator_for_pop_iter_tuple! {19} +impl_iterator_for_pop_iter_tuple! {20} diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs index 16e4c83..fa9245e 100644 --- a/oxydsp-flowgraph/src/graph.rs +++ b/oxydsp-flowgraph/src/graph.rs @@ -1,5 +1,20 @@ use crate::block::GraphableBlock; +#[macro_export] +macro_rules! flowgraph +{ + ($($x:ident),* $(,)?) => + { + { + let mut flowgraph = FlowGraph::new(); + $( + flowgraph.add_block($x); + )* + flowgraph + } + } +} + pub struct FlowGraph { blocks: Vec>, @@ -17,6 +32,130 @@ impl FlowGraph block.set_index(self.blocks.len()); self.blocks.push(Box::new(block)); } + + pub fn run(mut self) + { + self.populate_edges(); + let mut k = vec![]; + for mut x in self.blocks.into_iter() + { + k.push(std::thread::spawn(move || { + loop + { + x.work(); + } + })); + } + k.into_iter().for_each(|j| { + let _ = j.join(); + }); + } + + fn populate_edges(&mut self) + { + for block_index in 0..self.blocks.len() + { + let successors = self.blocks[block_index].get_successors(); + for (output_index, succ_id) in successors.iter().enumerate() + { + let (tx, rx) = + self.blocks[block_index].create_anonymous_stream_for(output_index, 4096); + self.blocks[block_index].set_anonymous_out_stream(output_index, tx); + self.blocks[succ_id.block_index].set_anonymous_in_stream(succ_id.port_index, rx); + } + } + } + + pub fn get_dot(&self) -> String + { + let mut node_string = String::new(); + + for (i, block) in self.blocks.iter().enumerate() + { + // Block name + + // Input strings + let mut input_string = String::new(); + let inputs = block.get_input_names(); + let len = inputs.len(); + if !inputs.is_empty() + { + input_string.push('{'); + for (j, input) in inputs.iter().enumerate() + { + input_string.push_str(&format!(" {}", j, input)); + if j != len - 1 + { + input_string.push('|'); + } + } + input_string.push_str("}|"); + } + + // Output strings + let mut output_string = String::new(); + let outputs = block.get_output_names(); + let len = outputs.len(); + if !outputs.is_empty() + { + output_string.push_str("|{"); + for (j, output) in outputs.iter().enumerate() + { + output_string.push_str(&format!(" {}", j, output)); + if j != len - 1 + { + output_string.push('|'); + } + } + output_string.push('}'); + } + + node_string.push_str(&format!( + "{}_{} [label=\"{{ {} {} {} }}\"];\n", + block.get_block_name(), + i, + input_string, + block.get_block_name(), + output_string, + )); + } + + let mut edges_string = String::new(); + + for (i, block) in self.blocks.iter().enumerate() + { + let outputs = block.get_successors(); + let output_types = block.get_output_type_names(); + let block_name = block.get_block_name(); + for (j, (output, output_type)) in outputs.iter().zip(output_types.iter()).enumerate() + { + let destination_block = output.block_index; + let destination_block_name = self.blocks[destination_block].get_block_name(); + edges_string.push_str(&format!( + "{}_{}:o{} -> {}_{}:i{} [label=\"{}\"];\n", + block_name, + i, + j, + destination_block_name, + destination_block, + output.port_index, + output_type + )); + } + } + + format!( + " + digraph G {{ + node [shape=record]; + rankdir=TB; + {} + {} + }} + ", + node_string, edges_string + ) + } } impl Default for FlowGraph diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs index 09972fd..20f4b59 100644 --- a/oxydsp-flowgraph/src/lib.rs +++ b/oxydsp-flowgraph/src/lib.rs @@ -5,3 +5,4 @@ pub mod block; pub mod edge; pub mod graph; pub mod stream; +pub use oxydsp_flowgraph_macros::{BlockIO, sync_block}; diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs index 1382a61..f328e4d 100644 --- a/oxydsp-flowgraph/src/main.rs +++ b/oxydsp-flowgraph/src/main.rs @@ -1,50 +1 @@ -use oxydsp_flowgraph::block::SyncBlock; -use oxydsp_flowgraph::edge::In; -use oxydsp_flowgraph::edge::Out; -use oxydsp_flowgraph_macros::BlockIO; -use oxydsp_flowgraph_macros::sync_block; - -#[derive(BlockIO)] -//#[sync_block] -pub struct Test -{ - #[input] - input: In, - - #[output] - output: Out, -} - -impl oxydsp_flowgraph::block::Block for Test -{ - fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult - { - let mut len = usize::MAX; - let mut input_reader = self.input.read(); - len = len.min(input_reader.len()); - let mut output_writer = self.output.write(); - len = len.min(output_writer.len()); - //let input = input_reader.pop().unwrap(); - let input = 0; - for _ in 0..len - { - let (output_out,) = self.sync_work((input,)); - output_writer.push(output_out).unwrap(); - } - oxydsp_flowgraph::block::BlockResult::Ok - } -} - -impl SyncBlock for Test -{ - type Input = (u32,); - - type Output = (u32,); - - fn sync_work(&mut self, (num,): Self::Input) -> Self::Output - { - (num,) - } -} - fn main() {} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index fb3be77..ab4b2fc 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -1,3 +1,4 @@ +use std::cell::Cell; use std::cell::UnsafeCell; use std::mem::MaybeUninit; use std::ops::Deref; @@ -60,7 +61,7 @@ pub struct StreamWriter<'a, T> second: Option<&'a UnsafeCell<[MaybeUninit]>>, first_len: usize, second_len: usize, - written: usize, + written: Cell, } // Represents a read operation within a stream producer @@ -71,7 +72,7 @@ pub struct StreamReader<'a, T> second: Option<&'a UnsafeCell<[MaybeUninit]>>, first_len: usize, second_len: usize, - read: usize, + read: Cell, } pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer) @@ -158,15 +159,21 @@ impl StreamProducer // is thus the only on able to write or read the stream when it lives let first_len = head_to_end.len(); let second_len = start_to_tail.len(); - let first = std::mem::transmute(head_to_end); - let second = Some(std::mem::transmute(start_to_tail)); + let first = std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(head_to_end); + let second = Some(std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(start_to_tail)); StreamWriter { producer: self, first, second, first_len, second_len, - written: 0, + written: 0.into(), } } } @@ -206,15 +213,23 @@ impl StreamProducer let first_len = head_to_end.len(); let second_len = start_to_tail.len(); - let first = std::mem::transmute(head_to_end); - let second = Some(std::mem::transmute(start_to_tail)); + + let first = std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[std::mem::MaybeUninit]>, + >(head_to_end); + let second = Some(std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(start_to_tail)); + StreamWriter { producer: self, first, second, first_len, second_len, - written: 0, + written: 0.into(), } } } @@ -245,11 +260,14 @@ impl StreamProducer let k = &mut *self.inner.buffer.get(); StreamWriter { producer: self, - first_len: k.len(), + first_len: wrapped_tail - wrapped_head, second_len: 0, - first: std::mem::transmute(&k[wrapped_head..wrapped_tail]), + first: std::mem::transmute::< + &[MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(&k[wrapped_head..wrapped_tail]), second: None, - written: 0, + written: 0.into(), } } } @@ -278,11 +296,13 @@ impl StreamConsumer let k = &mut *self.inner.buffer.get(); StreamReader { producer: self, - first_len: k.len(), + first_len: wrapped_head - wrapped_tail, second_len: 0, - first: std::mem::transmute(&k[wrapped_tail..wrapped_head]), + first: std::mem::transmute::<&[MaybeUninit], &UnsafeCell<[MaybeUninit]>>( + &k[wrapped_tail..wrapped_head], + ), second: None, - read: 0, + read: 0.into(), } } } @@ -309,11 +329,14 @@ impl StreamConsumer let k = &mut *self.inner.buffer.get(); StreamReader { producer: self, - first_len: k.len(), + first_len: wrapped_head - wrapped_tail, second_len: 0, - first: std::mem::transmute(&k[wrapped_tail..wrapped_head]), + first: std::mem::transmute::< + &[MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(&k[wrapped_tail..wrapped_head]), second: None, - read: 0, + read: 0.into(), } } } @@ -351,15 +374,23 @@ impl StreamConsumer let first_len = tail_to_end.len(); let second_len = start_to_head.len(); - let first = std::mem::transmute(tail_to_end); - let second = Some(std::mem::transmute(start_to_head)); + + let first = std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(tail_to_end); + let second = Some(std::mem::transmute::< + &mut [MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(start_to_head)); + StreamReader { producer: self, first, second, first_len, second_len, - read: 0, + read: 0.into(), } } } @@ -379,23 +410,24 @@ impl<'a, T> StreamWriter<'a, T> self.len() == 0 } - pub fn push(&mut self, element: T) -> Result<(), T> + pub fn push(&self, element: T) -> Result<(), T> { - if self.written < self.first_len + if self.written.get() < self.first_len { unsafe { - (&mut *self.first.get())[self.written] = MaybeUninit::new(element); + (&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element); } - self.written += 1; + self.written.set(self.written.get() + 1); Ok(()) } - else if let Some(second) = &mut self.second - && self.written - self.first_len < self.second_len + else if let Some(second) = &self.second + && self.written.get() - self.first_len < self.second_len { unsafe { - (&mut *second.get())[self.written - self.first_len] = MaybeUninit::new(element); + (&mut *second.get())[self.written.get() - self.first_len] = + MaybeUninit::new(element); } - self.written += 1; + self.written.set(self.written.get() + 1); Ok(()) } else @@ -417,9 +449,9 @@ impl<'a, T> StreamReader<'a, T> self.len() == 0 } - pub fn pop(&mut self) -> Option + pub fn pop(&self) -> Option { - if self.read < self.first_len + if self.read.get() < self.first_len { // SAFETY: // @@ -427,25 +459,25 @@ impl<'a, T> StreamReader<'a, T> // We take it once since read increases let element = unsafe { std::mem::replace( - &mut (&mut *self.first.get())[self.read], + &mut (&mut *self.first.get())[self.read.get()], MaybeUninit::uninit(), ) .assume_init() }; - self.read += 1; + self.read.set(self.read.get() + 1); Some(element) } - else if let Some(second) = &mut self.second - && self.read - self.first_len < self.second_len + else if let Some(second) = &self.second + && self.read.get() - self.first_len < self.second_len { let element = unsafe { std::mem::replace( - &mut (&mut *second.get())[self.read - self.first_len], + &mut (&mut *second.get())[self.read.get() - self.first_len], MaybeUninit::uninit(), ) .assume_init() }; - self.read += 1; + self.read.set(self.read.get() + 1); Some(element) } else @@ -469,7 +501,7 @@ impl<'a, T> Drop for StreamWriter<'a, T> self.producer .inner .head - .store(head + self.written, Ordering::Release); + .store(head + self.written.get(), Ordering::Release); } } @@ -487,7 +519,7 @@ impl<'a, T> Drop for StreamReader<'a, T> self.producer .inner .tail - .store(tail + self.read, Ordering::Release); + .store(tail + self.read.get(), Ordering::Release); } } @@ -503,7 +535,7 @@ mod test let (mut tx, mut rx) = bounded_queue::(4); { - let mut writer = tx.write(); + let writer = tx.write(); assert_eq!(writer.len(), 4); @@ -515,7 +547,7 @@ mod test } { - let mut reader = rx.read(); + let reader = rx.read(); assert_eq!(reader.len(), 4); @@ -528,7 +560,7 @@ mod test // Put stream into weird situatino { - let mut writer = tx.write(); + let writer = tx.write(); assert_eq!(writer.push(1), Ok(())); assert_eq!(writer.push(2), Ok(())); assert_eq!(writer.push(3), Ok(())); @@ -536,20 +568,20 @@ mod test } { - let mut reader = rx.read(); + let reader = rx.read(); assert_eq!(reader.pop(), Some(1)); assert_eq!(reader.pop(), Some(2)); } { - let mut writer = tx.write(); + let writer = tx.write(); assert_eq!(writer.len(), 2); assert_eq!(writer.push(5), Ok(())); assert_eq!(writer.push(6), Ok(())); } { - let mut reader = rx.read(); + let reader = rx.read(); assert_eq!(reader.pop(), Some(3)); assert_eq!(reader.pop(), Some(4)); assert_eq!(reader.pop(), Some(5));