From 6d99d54f4a30a357b8d94d9d4d68977843fec3ad Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Wed, 11 Mar 2026 19:31:37 +0100 Subject: [PATCH] Blocks and flowgraph: added this time --- Cargo.lock | 47 +++++++++++++++++++++ Cargo.toml | 2 +- oxydsp-flowgraph/Cargo.toml | 1 + oxydsp-flowgraph/src/block.rs | 28 +++++++++++++ oxydsp-flowgraph/src/edge.rs | 63 ++++++++++++++++++++++++++++ oxydsp-flowgraph/src/graph.rs | 28 +++++++++++++ oxydsp-flowgraph/src/lib.rs | 3 ++ oxydsp-flowgraph/src/main.rs | 77 ++++------------------------------ oxydsp-flowgraph/src/stream.rs | 10 +---- 9 files changed, 181 insertions(+), 78 deletions(-) create mode 100644 oxydsp-flowgraph/src/block.rs create mode 100644 oxydsp-flowgraph/src/edge.rs create mode 100644 oxydsp-flowgraph/src/graph.rs diff --git a/Cargo.lock b/Cargo.lock index 2ccf4e4..0390bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,3 +9,50 @@ version = "0.1.0" [[package]] name = "oxydsp-flowgraph" version = "0.1.0" +dependencies = [ + "oxydsp-flowgraph-macros", +] + +[[package]] +name = "oxydsp-flowgraph-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" diff --git a/Cargo.toml b/Cargo.toml index 8f24bd8..795a2b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "3" -members = ["example","oxydsp-flowgraph"] +members = ["example","oxydsp-flowgraph", "oxydsp-flowgraph-macros"] diff --git a/oxydsp-flowgraph/Cargo.toml b/oxydsp-flowgraph/Cargo.toml index 9e5aa63..8bc1afc 100644 --- a/oxydsp-flowgraph/Cargo.toml +++ b/oxydsp-flowgraph/Cargo.toml @@ -4,3 +4,4 @@ version = "0.1.0" edition = "2024" [dependencies] +oxydsp-flowgraph-macros = { path = "../oxydsp-flowgraph-macros" } diff --git a/oxydsp-flowgraph/src/block.rs b/oxydsp-flowgraph/src/block.rs new file mode 100644 index 0000000..be2c2f9 --- /dev/null +++ b/oxydsp-flowgraph/src/block.rs @@ -0,0 +1,28 @@ +use crate::edge::BlockIOIndex; + +pub enum BlockResult +{ + Ok, + Terminated, +} + +pub trait BlockIO +{ + // Get all of the BlockIOIndices (block index + port) that + // this blocks can send data to. + fn get_successors(&self) -> Vec; + + // Sets the index of the current blocks on the shared edges + fn set_index(&self, block_index: usize); + + // Number of input/output ports + fn input_count(&self); + fn output_count(&self); +} + +pub trait Block +{ + fn work(&mut self) -> BlockResult; +} + +pub trait GraphableBlock: Block + BlockIO {} diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs new file mode 100644 index 0000000..19d0139 --- /dev/null +++ b/oxydsp-flowgraph/src/edge.rs @@ -0,0 +1,63 @@ +use std::sync::{Arc, Mutex}; + +use crate::stream::{StreamConsumer, StreamProducer}; + +pub struct Edge +{ + // Represents the index of the block owning the Out end in the graph + // And the the output index within that block + pub from: Option, + + // Represents the index of the block owning the In end in the graph + // And the the input index within that block + pub to: Option, +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct BlockIOIndex +{ + pub block_index: usize, + pub port_index: usize, +} + +pub struct In +{ + stream: Option>, + + // Will rarely be accessed + edge: Arc>, +} + +pub struct Out +{ + stream: Option>, + + // Will rarely be accessed + edge: Arc>, +} + +impl In +{ + pub fn set_block_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().to = Some(index); + } + + pub fn get_producer_block(&self) -> Option + { + self.edge.lock().unwrap().from + } +} + +impl Out +{ + pub fn set_block_index(&self, index: BlockIOIndex) + { + self.edge.lock().unwrap().from = Some(index); + } + + pub fn get_consumer_block(&self) -> Option + { + self.edge.lock().unwrap().to + } +} diff --git a/oxydsp-flowgraph/src/graph.rs b/oxydsp-flowgraph/src/graph.rs new file mode 100644 index 0000000..16e4c83 --- /dev/null +++ b/oxydsp-flowgraph/src/graph.rs @@ -0,0 +1,28 @@ +use crate::block::GraphableBlock; + +pub struct FlowGraph +{ + blocks: Vec>, +} + +impl FlowGraph +{ + pub fn new() -> Self + { + FlowGraph { blocks: vec![] } + } + + pub fn add_block(&mut self, block: T) + { + block.set_index(self.blocks.len()); + self.blocks.push(Box::new(block)); + } +} + +impl Default for FlowGraph +{ + fn default() -> Self + { + Self::new() + } +} diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs index c2134db..09972fd 100644 --- a/oxydsp-flowgraph/src/lib.rs +++ b/oxydsp-flowgraph/src/lib.rs @@ -1,4 +1,7 @@ // This crate manages the flowgraph datastructures and execution/scheduling // as well as the communication between the blocks +pub mod block; +pub mod edge; +pub mod graph; pub mod stream; diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs index 2030592..61623ed 100644 --- a/oxydsp-flowgraph/src/main.rs +++ b/oxydsp-flowgraph/src/main.rs @@ -1,73 +1,14 @@ -use std::time::Instant; +use oxydsp_flowgraph::edge::{In, Out}; +use oxydsp_flowgraph_macros::BlockIO; -use oxydsp_flowgraph::stream; - -fn main() +#[derive(BlockIO)] +pub struct Test { - transfer_test(); - return; - let (mut tx, mut rx) = stream::bounded_queue::(256); + #[input] + input: In, - std::thread::spawn(move || { - let mut i = 0; - loop - { - let mut writer = tx.write(); - while writer.push(i).is_ok() - { - i += 1; - std::thread::yield_now(); - } - } - }); - - loop - { - let mut reader = rx.read(); - let len = reader.len(); - while let Some(x) = reader.pop() - { - println!("{len}: {x}"); - std::thread::yield_now(); - } - } + #[output] + output: Out, } -fn transfer_test() -{ - let (mut tx, mut rx) = stream::bounded_queue::(256); - let count = 1_000_000_000; - - let start = Instant::now(); - std::thread::spawn(move || { - let mut i = 0; - while i <= count - { - let mut writer = tx.write(); - let mut batch_size = 0; - while i <= count && batch_size < 128 && writer.push(i).is_ok() - { - i += 1; - batch_size += 1; - } - } - }); - - let mut j = 0; - while j <= count - { - let mut reader = rx.read(); - while let Some(x) = reader.pop() - { - assert_eq!(x, j); - j += 1; - } - } - let end = Instant::now(); - let time = (end - start).as_secs_f32(); - println!( - "Transfer test: {:.2}s, {:.2} MT/s", - time, - count as f32 / (1_000_000. * time) - ); -} +fn main() {} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index daaf110..71a6780 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -84,19 +84,11 @@ pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer]>) }; - // DBG - // let buffer = unsafe { - // Box::from_raw(std::mem::transmute( - // cell as *mut UnsafeCell<[MaybeUninit]>, - // )) - // }; - // DBG - let queue = Arc::new(Stream { buffer, capacity_mask: capacity - 1,