Blocks and flowgraph: added this time

This commit is contained in:
2026-03-11 19:31:37 +01:00
parent 13d060776f
commit 6d99d54f4a
9 changed files with 181 additions and 78 deletions

View File

@ -4,3 +4,4 @@ version = "0.1.0"
edition = "2024"
[dependencies]
oxydsp-flowgraph-macros = { path = "../oxydsp-flowgraph-macros" }

View File

@ -0,0 +1,28 @@
use crate::edge::BlockIOIndex;
pub enum BlockResult
{
Ok,
Terminated,
}
pub trait BlockIO
{
// Get all of the BlockIOIndices (block index + port) that
// this blocks can send data to.
fn get_successors(&self) -> Vec<BlockIOIndex>;
// Sets the index of the current blocks on the shared edges
fn set_index(&self, block_index: usize);
// Number of input/output ports
fn input_count(&self);
fn output_count(&self);
}
pub trait Block
{
fn work(&mut self) -> BlockResult;
}
pub trait GraphableBlock: Block + BlockIO {}

View File

@ -0,0 +1,63 @@
use std::sync::{Arc, Mutex};
use crate::stream::{StreamConsumer, StreamProducer};
pub struct Edge
{
// Represents the index of the block owning the Out end in the graph
// And the the output index within that block
pub from: Option<BlockIOIndex>,
// Represents the index of the block owning the In end in the graph
// And the the input index within that block
pub to: Option<BlockIOIndex>,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BlockIOIndex
{
pub block_index: usize,
pub port_index: usize,
}
pub struct In<T>
{
stream: Option<StreamProducer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct Out<T>
{
stream: Option<StreamConsumer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
impl<T> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
}
impl<T> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
}

View File

@ -0,0 +1,28 @@
use crate::block::GraphableBlock;
pub struct FlowGraph
{
blocks: Vec<Box<dyn GraphableBlock + Send + 'static>>,
}
impl FlowGraph
{
pub fn new() -> Self
{
FlowGraph { blocks: vec![] }
}
pub fn add_block<T: GraphableBlock + Send + 'static>(&mut self, block: T)
{
block.set_index(self.blocks.len());
self.blocks.push(Box::new(block));
}
}
impl Default for FlowGraph
{
fn default() -> Self
{
Self::new()
}
}

View File

@ -1,4 +1,7 @@
// This crate manages the flowgraph datastructures and execution/scheduling
// as well as the communication between the blocks
pub mod block;
pub mod edge;
pub mod graph;
pub mod stream;

View File

@ -1,73 +1,14 @@
use std::time::Instant;
use oxydsp_flowgraph::edge::{In, Out};
use oxydsp_flowgraph_macros::BlockIO;
use oxydsp_flowgraph::stream;
fn main()
#[derive(BlockIO)]
pub struct Test
{
transfer_test();
return;
let (mut tx, mut rx) = stream::bounded_queue::<usize>(256);
#[input]
input: In<u32>,
std::thread::spawn(move || {
let mut i = 0;
loop
{
let mut writer = tx.write();
while writer.push(i).is_ok()
{
i += 1;
std::thread::yield_now();
}
}
});
loop
{
let mut reader = rx.read();
let len = reader.len();
while let Some(x) = reader.pop()
{
println!("{len}: {x}");
std::thread::yield_now();
}
}
#[output]
output: Out<u32>,
}
fn transfer_test()
{
let (mut tx, mut rx) = stream::bounded_queue::<usize>(256);
let count = 1_000_000_000;
let start = Instant::now();
std::thread::spawn(move || {
let mut i = 0;
while i <= count
{
let mut writer = tx.write();
let mut batch_size = 0;
while i <= count && batch_size < 128 && writer.push(i).is_ok()
{
i += 1;
batch_size += 1;
}
}
});
let mut j = 0;
while j <= count
{
let mut reader = rx.read();
while let Some(x) = reader.pop()
{
assert_eq!(x, j);
j += 1;
}
}
let end = Instant::now();
let time = (end - start).as_secs_f32();
println!(
"Transfer test: {:.2}s, {:.2} MT/s",
time,
count as f32 / (1_000_000. * time)
);
}
fn main() {}

View File

@ -84,19 +84,11 @@ pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T
let cell = UnsafeCell::from_mut(Box::leak(slice));
// SAFETY:
// Cell was created in a box, and UnsafeCell is a ZST,
// Cell was created in a box, and UnsafeCell is transparent,
// the memory layout pointed to by cell is the one a box would thus
// expect
let buffer = unsafe { Box::from_raw(cell as *mut UnsafeCell<[MaybeUninit<T>]>) };
// DBG
// let buffer = unsafe {
// Box::from_raw(std::mem::transmute(
// cell as *mut UnsafeCell<[MaybeUninit<usize>]>,
// ))
// };
// DBG
let queue = Arc::new(Stream {
buffer,
capacity_mask: capacity - 1,